You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/27 01:10:02 UTC

[iotdb] branch master updated: Improve aligned write in tsfile api (#6021)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b535b278dc Improve aligned write in tsfile api (#6021)
b535b278dc is described below

commit b535b278dc0d71bde001c23e8eefdb3f60f6881c
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Fri May 27 09:09:57 2022 +0800

    Improve aligned write in tsfile api (#6021)
---
 .../write/chunk/AlignedChunkGroupWriterImpl.java   | 85 +++++++++++++++-------
 .../iotdb/tsfile/write/chunk/ValueChunkWriter.java |  2 +-
 2 files changed, 61 insertions(+), 26 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index 79d9ed271e..9a43b55b7b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -38,11 +38,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
   private static final Logger LOG = LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class);
@@ -50,11 +50,9 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
   private final String deviceId;
 
   // measurementID -> ValueChunkWriter
-  private Map<String, ValueChunkWriter> valueChunkWriterMap = new LinkedHashMap<>();
+  private final Map<String, ValueChunkWriter> valueChunkWriterMap = new LinkedHashMap<>();
 
-  private TimeChunkWriter timeChunkWriter;
-
-  private Set<String> writenMeasurementSet = new HashSet<>();
+  private final TimeChunkWriter timeChunkWriter;
 
   private long lastTime = -1;
 
@@ -103,10 +101,16 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
 
   @Override
   public int write(long time, List<DataPoint> data) throws WriteProcessException, IOException {
-    checkIsHistoryData("", time);
-
+    checkIsHistoryData(time);
+    List<ValueChunkWriter> emptyValueChunkWriters = new ArrayList<>();
+    Set<String> existingMeasurements =
+        data.stream().map(DataPoint::getMeasurementId).collect(Collectors.toSet());
+    for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
+      if (!existingMeasurements.contains(entry.getKey())) {
+        emptyValueChunkWriters.add(entry.getValue());
+      }
+    }
     for (DataPoint point : data) {
-      writenMeasurementSet.add(point.getMeasurementId());
       boolean isNull = point.getValue() == null;
       ValueChunkWriter valueChunkWriter = valueChunkWriterMap.get(point.getMeasurementId());
       switch (point.getType()) {
@@ -133,7 +137,9 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
               String.format("Data type %s is not supported.", point.getType()));
       }
     }
-    writeEmptyDataInOneRow(time);
+    if (!emptyValueChunkWriters.isEmpty()) {
+      writeEmptyDataInOneRow(emptyValueChunkWriters);
+    }
     timeChunkWriter.write(time);
     lastTime = time;
     if (checkPageSizeAndMayOpenANewPage()) {
@@ -146,18 +152,25 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
   public int write(Tablet tablet) throws WriteProcessException, IOException {
     int pointCount = 0;
     List<MeasurementSchema> measurementSchemas = tablet.getSchemas();
+    List<ValueChunkWriter> emptyValueChunkWriters = new ArrayList<>();
+    Set<String> existingMeasurements =
+        measurementSchemas.stream()
+            .map(MeasurementSchema::getMeasurementId)
+            .collect(Collectors.toSet());
+    for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
+      if (!existingMeasurements.contains(entry.getKey())) {
+        emptyValueChunkWriters.add(entry.getValue());
+      }
+    }
     for (int row = 0; row < tablet.rowSize; row++) {
       long time = tablet.timestamps[row];
-      checkIsHistoryData("", time);
+      checkIsHistoryData(time);
       for (int columnIndex = 0; columnIndex < measurementSchemas.size(); columnIndex++) {
-        writenMeasurementSet.add(measurementSchemas.get(columnIndex).getMeasurementId());
-        boolean isNull = false;
+        boolean isNull =
+            tablet.bitMaps != null
+                && tablet.bitMaps[columnIndex] != null
+                && tablet.bitMaps[columnIndex].isMarked(row);
         // check isNull by bitMap in tablet
-        if (tablet.bitMaps != null
-            && tablet.bitMaps[columnIndex] != null
-            && tablet.bitMaps[columnIndex].isMarked(row)) {
-          isNull = true;
-        }
         ValueChunkWriter valueChunkWriter =
             valueChunkWriterMap.get(measurementSchemas.get(columnIndex).getMeasurementId());
         switch (measurementSchemas.get(columnIndex).getType()) {
@@ -186,7 +199,9 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
                     measurementSchemas.get(columnIndex).getType()));
         }
       }
-      writeEmptyDataInOneRow(time);
+      if (!emptyValueChunkWriters.isEmpty()) {
+        writeEmptyDataInOneRow(emptyValueChunkWriters);
+      }
       timeChunkWriter.write(time);
       lastTime = time;
       if (checkPageSizeAndMayOpenANewPage()) {
@@ -241,13 +256,33 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
     }
   }
 
-  private void writeEmptyDataInOneRow(long time) {
-    for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
-      if (!writenMeasurementSet.contains(entry.getKey())) {
-        entry.getValue().write(time, 0, true);
+  private void writeEmptyDataInOneRow(List<ValueChunkWriter> valueChunkWriterList) {
+    for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+      TSDataType dataType = valueChunkWriter.getDataType();
+      switch (dataType) {
+        case BOOLEAN:
+          valueChunkWriter.write(-1, false, true);
+          break;
+        case INT32:
+          valueChunkWriter.write(-1, 0, true);
+          break;
+        case INT64:
+          valueChunkWriter.write(-1, 0L, true);
+          break;
+        case FLOAT:
+          valueChunkWriter.write(-1, 0.0f, true);
+          break;
+        case DOUBLE:
+          valueChunkWriter.write(-1, 0.0d, true);
+          break;
+        case TEXT:
+          valueChunkWriter.write(-1, null, true);
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", dataType));
       }
     }
-    writenMeasurementSet.clear();
   }
 
   /**
@@ -280,13 +315,13 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
     }
   }
 
-  private void checkIsHistoryData(String measurementId, long time) throws WriteProcessException {
+  private void checkIsHistoryData(long time) throws WriteProcessException {
     if (time <= lastTime) {
       throw new WriteProcessException(
           "Not allowed to write out-of-order data in timeseries "
               + deviceId
               + TsFileConstant.PATH_SEPARATOR
-              + measurementId
+              + ""
               + ", time should later than "
               + lastTime);
     }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
index 3ece00b403..e89edc4407 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
@@ -217,7 +217,7 @@ public class ValueChunkWriter {
     // Empty chunk, it may happen if pageBuffer stores empty bits and only chunk header will be
     // flushed.
     if (statistics.getCount() == 0) {
-      return ChunkHeader.getSerializedSize(measurementId, pageBuffer.size());
+      return ChunkHeader.getSerializedSize(measurementId, 0);
     }
 
     // return the serialized size of the chunk header + all pages