You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/11/18 11:32:31 UTC

[iotdb] 01/09: improve aligned tsfile write api

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch improvedAlign_for_expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 29325681d2b477fb6b23c5f6d26b3e2008c24177
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu May 26 10:04:47 2022 +0800

    improve aligned tsfile write api
---
 .../write/chunk/AlignedChunkGroupWriterImpl.java   | 220 ++++++++++++++++-----
 .../iotdb/tsfile/write/chunk/TimeChunkWriter.java  |   4 +
 .../iotdb/tsfile/write/chunk/ValueChunkWriter.java |   4 +
 3 files changed, 184 insertions(+), 44 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index 79d9ed271e..20e0e50c53 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -43,6 +43,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
   private static final Logger LOG = LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class);
@@ -144,57 +145,188 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
 
   @Override
   public int write(Tablet tablet) throws WriteProcessException, IOException {
-    int pointCount = 0;
-    List<MeasurementSchema> measurementSchemas = tablet.getSchemas();
+    // write time
     for (int row = 0; row < tablet.rowSize; row++) {
       long time = tablet.timestamps[row];
       checkIsHistoryData("", time);
-      for (int columnIndex = 0; columnIndex < measurementSchemas.size(); columnIndex++) {
-        writenMeasurementSet.add(measurementSchemas.get(columnIndex).getMeasurementId());
-        boolean isNull = false;
-        // check isNull by bitMap in tablet
-        if (tablet.bitMaps != null
-            && tablet.bitMaps[columnIndex] != null
-            && tablet.bitMaps[columnIndex].isMarked(row)) {
-          isNull = true;
-        }
-        ValueChunkWriter valueChunkWriter =
-            valueChunkWriterMap.get(measurementSchemas.get(columnIndex).getMeasurementId());
-        switch (measurementSchemas.get(columnIndex).getType()) {
-          case BOOLEAN:
-            valueChunkWriter.write(time, ((boolean[]) tablet.values[columnIndex])[row], isNull);
-            break;
-          case INT32:
-            valueChunkWriter.write(time, ((int[]) tablet.values[columnIndex])[row], isNull);
-            break;
-          case INT64:
-            valueChunkWriter.write(time, ((long[]) tablet.values[columnIndex])[row], isNull);
-            break;
-          case FLOAT:
-            valueChunkWriter.write(time, ((float[]) tablet.values[columnIndex])[row], isNull);
-            break;
-          case DOUBLE:
-            valueChunkWriter.write(time, ((double[]) tablet.values[columnIndex])[row], isNull);
-            break;
-          case TEXT:
-            valueChunkWriter.write(time, ((Binary[]) tablet.values[columnIndex])[row], isNull);
-            break;
-          default:
-            throw new UnSupportedDataTypeException(
-                String.format(
-                    "Data type %s is not supported.",
-                    measurementSchemas.get(columnIndex).getType()));
-        }
-      }
-      writeEmptyDataInOneRow(time);
       timeChunkWriter.write(time);
       lastTime = time;
-      if (checkPageSizeAndMayOpenANewPage()) {
-        writePageToPageBuffer();
+      if (timeChunkWriter.needANewPage()) {
+        timeChunkWriter.writePageToPageBuffer();
       }
-      pointCount++;
     }
-    return pointCount;
+
+    List<MeasurementSchema> measurementSchemas = tablet.getSchemas();
+    // write values for the measurements present in the tablet
+    for (int columnIndex = 0; columnIndex < measurementSchemas.size(); columnIndex++) {
+      ValueChunkWriter valueChunkWriter =
+          valueChunkWriterMap.get(measurementSchemas.get(columnIndex).getMeasurementId());
+      TSDataType dataType = measurementSchemas.get(columnIndex).getType();
+      switch (dataType) {
+        case BOOLEAN:
+          boolean[] booleanValues = (boolean[]) tablet.values[columnIndex];
+          for (int row = 0; row < tablet.rowSize; row++) {
+            long time = tablet.timestamps[row];
+            boolean isNull =
+                tablet.bitMaps != null
+                    && tablet.bitMaps[columnIndex] != null
+                    && tablet.bitMaps[columnIndex].isMarked(row);
+            // check isNull by bitMap in tablet
+            valueChunkWriter.write(time, booleanValues[row], isNull);
+            if (valueChunkWriter.needANewPage()) {
+              valueChunkWriter.writePageToPageBuffer();
+            }
+          }
+          break;
+        case INT32:
+          int[] intValues = (int[]) tablet.values[columnIndex];
+          for (int row = 0; row < tablet.rowSize; row++) {
+            long time = tablet.timestamps[row];
+            boolean isNull =
+                tablet.bitMaps != null
+                    && tablet.bitMaps[columnIndex] != null
+                    && tablet.bitMaps[columnIndex].isMarked(row);
+            // check isNull by bitMap in tablet
+            valueChunkWriter.write(time, intValues[row], isNull);
+            if (valueChunkWriter.needANewPage()) {
+              valueChunkWriter.writePageToPageBuffer();
+            }
+          }
+          break;
+        case INT64:
+          long[] longValues = (long[]) tablet.values[columnIndex];
+          for (int row = 0; row < tablet.rowSize; row++) {
+            long time = tablet.timestamps[row];
+            boolean isNull =
+                tablet.bitMaps != null
+                    && tablet.bitMaps[columnIndex] != null
+                    && tablet.bitMaps[columnIndex].isMarked(row);
+            // check isNull by bitMap in tablet
+            valueChunkWriter.write(time, longValues[row], isNull);
+            if (valueChunkWriter.needANewPage()) {
+              valueChunkWriter.writePageToPageBuffer();
+            }
+          }
+          break;
+        case FLOAT:
+          float[] floatValues = (float[]) tablet.values[columnIndex];
+          for (int row = 0; row < tablet.rowSize; row++) {
+            long time = tablet.timestamps[row];
+            boolean isNull =
+                tablet.bitMaps != null
+                    && tablet.bitMaps[columnIndex] != null
+                    && tablet.bitMaps[columnIndex].isMarked(row);
+            // check isNull by bitMap in tablet
+            valueChunkWriter.write(time, floatValues[row], isNull);
+            if (valueChunkWriter.needANewPage()) {
+              valueChunkWriter.writePageToPageBuffer();
+            }
+          }
+          break;
+        case DOUBLE:
+          double[] doubleValues = (double[]) tablet.values[columnIndex];
+          for (int row = 0; row < tablet.rowSize; row++) {
+            long time = tablet.timestamps[row];
+            boolean isNull =
+                tablet.bitMaps != null
+                    && tablet.bitMaps[columnIndex] != null
+                    && tablet.bitMaps[columnIndex].isMarked(row);
+            // check isNull by bitMap in tablet
+            valueChunkWriter.write(time, doubleValues[row], isNull);
+            if (valueChunkWriter.needANewPage()) {
+              valueChunkWriter.writePageToPageBuffer();
+            }
+          }
+          break;
+        case TEXT:
+          Binary[] binaryValues = (Binary[]) tablet.values[columnIndex];
+          for (int row = 0; row < tablet.rowSize; row++) {
+            long time = tablet.timestamps[row];
+            boolean isNull =
+                tablet.bitMaps != null
+                    && tablet.bitMaps[columnIndex] != null
+                    && tablet.bitMaps[columnIndex].isMarked(row);
+            // check isNull by bitMap in tablet
+            valueChunkWriter.write(time, binaryValues[row], isNull);
+            if (valueChunkWriter.needANewPage()) {
+              valueChunkWriter.writePageToPageBuffer();
+            }
+          }
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", dataType));
+      }
+    }
+
+    if (measurementSchemas.size() != valueChunkWriterMap.size()) {
+      Set<String> existingMeasurements =
+          measurementSchemas.stream()
+              .map(MeasurementSchema::getMeasurementId)
+              .collect(Collectors.toSet());
+      // write nulls for the measurements that are absent from the tablet
+      for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
+        if (!existingMeasurements.contains(entry.getKey())) {
+          ValueChunkWriter valueChunkWriter = entry.getValue();
+          TSDataType dataType = valueChunkWriter.getDataType();
+          switch (dataType) {
+            case BOOLEAN:
+              for (int row = 0; row < tablet.rowSize; row++) {
+                valueChunkWriter.write(-1, false, true);
+                if (valueChunkWriter.needANewPage()) {
+                  valueChunkWriter.writePageToPageBuffer();
+                }
+              }
+              break;
+            case INT32:
+              for (int row = 0; row < tablet.rowSize; row++) {
+                valueChunkWriter.write(-1, 0, true);
+                if (valueChunkWriter.needANewPage()) {
+                  valueChunkWriter.writePageToPageBuffer();
+                }
+              }
+              break;
+            case INT64:
+              for (int row = 0; row < tablet.rowSize; row++) {
+                valueChunkWriter.write(-1, 0L, true);
+                if (valueChunkWriter.needANewPage()) {
+                  valueChunkWriter.writePageToPageBuffer();
+                }
+              }
+              break;
+            case FLOAT:
+              for (int row = 0; row < tablet.rowSize; row++) {
+                valueChunkWriter.write(-1, 0.0f, true);
+                if (valueChunkWriter.needANewPage()) {
+                  valueChunkWriter.writePageToPageBuffer();
+                }
+              }
+              break;
+            case DOUBLE:
+              for (int row = 0; row < tablet.rowSize; row++) {
+                valueChunkWriter.write(-1, 0.0, true);
+                if (valueChunkWriter.needANewPage()) {
+                  valueChunkWriter.writePageToPageBuffer();
+                }
+              }
+              break;
+            case TEXT:
+              for (int row = 0; row < tablet.rowSize; row++) {
+                valueChunkWriter.write(-1, null, true);
+                if (valueChunkWriter.needANewPage()) {
+                  valueChunkWriter.writePageToPageBuffer();
+                }
+              }
+              break;
+            default:
+              throw new UnSupportedDataTypeException(
+                  String.format("Data type %s is not supported.", dataType));
+          }
+        }
+      }
+    }
+
+    return tablet.rowSize;
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
index b96f1a09ec..bd37840d79 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
@@ -136,6 +136,10 @@ public class TimeChunkWriter {
     return false;
   }
 
+  public boolean needANewPage() {
+    return pageWriter.getPointNumber() >= maxNumberOfPointsInPage;
+  }
+
   public void writePageToPageBuffer() {
     try {
       if (numOfPages == 0) { // record the firstPageStatistics
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
index 3ece00b403..7a143c9f95 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
@@ -252,6 +252,10 @@ public class ValueChunkWriter {
     return false;
   }
 
+  public boolean needANewPage() {
+    return pageWriter.getSize() >= maxNumberOfPointsInPage;
+  }
+
   public void sealCurrentPage() {
     // if the page contains no points, we still need to serialize it
     if (pageWriter != null && pageWriter.getSize() != 0) {