You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/01 12:43:35 UTC

[iotdb] branch NonAlignedTablet created (now 448ed1c)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch NonAlignedTablet
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 448ed1c  Support NonAlignedTablet

This branch includes the following new commits:

     new 448ed1c  Support NonAlignedTablet

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: Support NonAlignedTablet

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch NonAlignedTablet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 448ed1ceeaa9d6ef8171c488e2d793afb2f30953
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Mar 1 20:42:15 2022 +0800

    Support NonAlignedTablet
---
 .../apache/iotdb/tsfile/TsFileWriteWithTablet.java |  32 ++-
 .../apache/iotdb/tsfile/write/TsFileWriter.java    |  28 +++
 .../chunk/NonAlignedChunkGroupWriterImpl.java      |  72 ++++++
 .../tsfile/write/record/NonAlignedTablet.java      | 280 +++++++++++++++++++++
 4 files changed, 411 insertions(+), 1 deletion(-)

diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTablet.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTablet.java
index c454e0c..2bb19d8 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTablet.java
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTablet.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.NonAlignedTablet;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -69,7 +70,7 @@ public class TsFileWriteWithTablet {
         writeMeasurementScheams.add(measurementSchemas.get(0));
         writeMeasurementScheams.add(measurementSchemas.get(1));
         writeMeasurementScheams.add(measurementSchemas.get(2));
-        writeWithTablet(tsFileWriter, DEVICE_1, writeMeasurementScheams, 10000, 0, 0);
+        writeWithNonAlignedTablet(tsFileWriter, DEVICE_1, writeMeasurementScheams, 10000, 0, 0);
       }
     } catch (Exception e) {
       logger.error("meet error in TsFileWrite with tablet", e);
@@ -108,4 +109,33 @@ public class TsFileWriteWithTablet {
       tablet.reset();
     }
   }
+
+  /**
+   * Writes {@code rowNum} rounds of test values for {@code deviceId} through a {@link
+   * NonAlignedTablet}. In every round each schema receives one {@link Binary} value. The timestamp
+   * counter advances once per value rather than once per round, so every point gets its own time.
+   * The tablet is written and reset whenever its fullest column reaches capacity, and once more at
+   * the end for any partial batch.
+   *
+   * @param tsFileWriter the writer that receives each filled tablet
+   * @param deviceId the device the tablet belongs to
+   * @param schemas the measurements to fill, one tablet column each
+   * @param rowNum how many rounds of values to generate
+   * @param startTime the first timestamp handed out
+   * @param startValue currently unused by this method
+   */
+  private static void writeWithNonAlignedTablet(
+      TsFileWriter tsFileWriter,
+      String deviceId,
+      List<MeasurementSchema> schemas,
+      long rowNum,
+      long startTime,
+      long startValue)
+      throws IOException, WriteProcessException {
+    NonAlignedTablet tablet = new NonAlignedTablet(deviceId, schemas);
+    int capacity = tablet.getMaxRowNumber();
+    long time = startTime;
+
+    for (long round = 0; round < rowNum; round++) {
+      for (MeasurementSchema sensor : schemas) {
+        tablet.addValue(sensor.getMeasurementId(), time++, new Binary("testString........."));
+      }
+      // once the fullest column is at capacity, hand the batch over before anything overflows
+      if (tablet.maxRowSize == capacity) {
+        tsFileWriter.write(tablet);
+        tablet.reset();
+      }
+    }
+    // hand over whatever is left in a partially filled tablet
+    if (tablet.maxRowSize != 0) {
+      tsFileWriter.write(tablet);
+      tablet.reset();
+    }
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index f2ced03..5e8cdfb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.utils.MeasurementGroup;
 import org.apache.iotdb.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkGroupWriter;
 import org.apache.iotdb.tsfile.write.chunk.NonAlignedChunkGroupWriterImpl;
+import org.apache.iotdb.tsfile.write.record.NonAlignedTablet;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
@@ -380,6 +381,24 @@ public class TsFileWriter implements AutoCloseable {
     }
   }
 
+  /**
+   * Ensures the device of a {@link NonAlignedTablet} has a non-aligned chunk group writer with
+   * series writers for the tablet's measurements.
+   *
+   * <p>If the device is registered in {@code schema}, the measurements are checked against the
+   * device's series schema. Otherwise, if exactly one schema template is registered, they are
+   * checked against that template. In every other case the device is rejected.
+   *
+   * @param tablet the tablet that is about to be written
+   * @throws WriteProcessException if the device is neither registered nor covered by a single
+   *     schema template, or if the measurement check fails
+   */
+  private void checkIsTimeseriesExist(NonAlignedTablet tablet) throws WriteProcessException {
+    // NOTE(review): this runs before the device is validated below. If tryToInitialGroupWriter
+    // registers a writer as a side effect, an invalid device leaves one behind; confirm intended.
+    IChunkGroupWriter groupWriter = tryToInitialGroupWriter(tablet.deviceId, false);
+
+    Path devicePath = new Path(tablet.deviceId);
+    // NOTE(review): getSchemas() returns the tablet's live list, not a copy. If
+    // checkIsAllMeasurementsInGroup removes unknown measurements for non-aligned writes (as its
+    // Javadoc says), this list no longer lines up with the tablet's per-column timestamps/values/
+    // rowSize arrays that NonAlignedChunkGroupWriterImpl.write indexes by position. Verify.
+    List<MeasurementSchema> schemas = tablet.getSchemas();
+    if (schema.containsDevice(devicePath)) {
+      // device registered explicitly: validate against its own series schema (false = non-aligned)
+      checkIsAllMeasurementsInGroup(schema.getSeriesSchema(devicePath), schemas, false);
+      groupWriter.tryToAddSeriesWriter(schemas);
+    } else if (schema.getSchemaTemplates() != null && schema.getSchemaTemplates().size() == 1) {
+      // no explicit registration: fall back to the one and only schema template
+      MeasurementGroup measurementGroup =
+          schema.getSchemaTemplates().entrySet().iterator().next().getValue();
+      checkIsAllMeasurementsInGroup(measurementGroup, schemas, false);
+      groupWriter.tryToAddSeriesWriter(schemas);
+    } else {
+      throw new NoMeasurementException("input devicePath is invalid: " + devicePath);
+    }
+  }
+
   /**
    * If it's aligned, then all measurementSchemas should be contained in the measurementGroup, or it
    * will throw exception. If it's nonAligned, then remove the measurementSchema that is not
@@ -513,6 +532,15 @@ public class TsFileWriter implements AutoCloseable {
     return checkMemorySizeAndMayFlushChunks();
   }
 
+  public boolean write(NonAlignedTablet tablet) throws IOException, WriteProcessException {
+    // make sure the ChunkGroupWriter for this Tablet exist
+    checkIsTimeseriesExist(tablet);
+    // get corresponding ChunkGroupWriter and write this Tablet
+    recordCount +=
+        ((NonAlignedChunkGroupWriterImpl) groupWriters.get(tablet.deviceId)).write(tablet);
+    return checkMemorySizeAndMayFlushChunks();
+  }
+
   public boolean writeAligned(Tablet tablet) throws IOException, WriteProcessException {
     // make sure the ChunkGroupWriter for this Tablet exist
     checkIsTimeseriesExist(tablet, true);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
index 8b6038e..9d65318 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/NonAlignedChunkGroupWriterImpl.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.NonAlignedTablet;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -129,6 +130,77 @@ public class NonAlignedChunkGroupWriterImpl implements IChunkGroupWriter {
     return pointCount;
   }
 
+  /**
+   * Drains a {@link NonAlignedTablet} into this group's chunk writers, one column at a time.
+   *
+   * <p>Each column uses its own timestamp array and its own row count from the tablet. For every
+   * point, the timestamp is first passed to {@code checkIsHistoryData}. The value is then written to
+   * the measurement's chunk writer, and finally the timestamp is recorded in {@code lastTimeMap}.
+   * The tablet itself is not reset.
+   *
+   * @param tablet the tablet to write; its schema list must line up index-by-index with its
+   *     {@code timestamps}, {@code values} and {@code rowSize} arrays
+   * @return the largest per-column row count, i.e. the number of rows in the fullest column
+   * @throws WriteProcessException if raised while checking or writing a point
+   * @throws UnSupportedDataTypeException if a column's type is not INT32, INT64, FLOAT, DOUBLE,
+   *     BOOLEAN or TEXT
+   */
+  public int write(NonAlignedTablet tablet) throws WriteProcessException {
+    List<MeasurementSchema> columnSchemas = tablet.getSchemas();
+    int fullestColumn = 0;
+    for (int col = 0; col < columnSchemas.size(); col++) {
+      MeasurementSchema columnSchema = columnSchemas.get(col);
+      String id = columnSchema.getMeasurementId();
+      int points = tablet.rowSize[col];
+      fullestColumn = Math.max(fullestColumn, points);
+      long[] times = tablet.timestamps[col];
+      Object columnValues = tablet.values[col];
+      switch (columnSchema.getType()) {
+        case INT32:
+          {
+            int[] ints = (int[]) columnValues;
+            for (int row = 0; row < points; row++) {
+              long t = times[row];
+              checkIsHistoryData(id, t);
+              chunkWriters.get(id).write(t, ints[row]);
+              lastTimeMap.put(id, t);
+            }
+            break;
+          }
+        case INT64:
+          {
+            long[] longs = (long[]) columnValues;
+            for (int row = 0; row < points; row++) {
+              long t = times[row];
+              checkIsHistoryData(id, t);
+              chunkWriters.get(id).write(t, longs[row]);
+              lastTimeMap.put(id, t);
+            }
+            break;
+          }
+        case FLOAT:
+          {
+            float[] floats = (float[]) columnValues;
+            for (int row = 0; row < points; row++) {
+              long t = times[row];
+              checkIsHistoryData(id, t);
+              chunkWriters.get(id).write(t, floats[row]);
+              lastTimeMap.put(id, t);
+            }
+            break;
+          }
+        case DOUBLE:
+          {
+            double[] doubles = (double[]) columnValues;
+            for (int row = 0; row < points; row++) {
+              long t = times[row];
+              checkIsHistoryData(id, t);
+              chunkWriters.get(id).write(t, doubles[row]);
+              lastTimeMap.put(id, t);
+            }
+            break;
+          }
+        case BOOLEAN:
+          {
+            boolean[] flags = (boolean[]) columnValues;
+            for (int row = 0; row < points; row++) {
+              long t = times[row];
+              checkIsHistoryData(id, t);
+              chunkWriters.get(id).write(t, flags[row]);
+              lastTimeMap.put(id, t);
+            }
+            break;
+          }
+        case TEXT:
+          {
+            Binary[] texts = (Binary[]) columnValues;
+            for (int row = 0; row < points; row++) {
+              long t = times[row];
+              checkIsHistoryData(id, t);
+              chunkWriters.get(id).write(t, texts[row]);
+              lastTimeMap.put(id, t);
+            }
+            break;
+          }
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", columnSchema.getType()));
+      }
+    }
+    return fullestColumn;
+  }
+
   @Override
   public long flushToFileWriter(TsFileIOWriter fileWriter) throws IOException {
     LOG.debug("start flush device id:{}", deviceId);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/NonAlignedTablet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/NonAlignedTablet.java
new file mode 100644
index 0000000..f0be58f
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/NonAlignedTablet.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.record;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.*;
+
+/**
+ * A write buffer for one device in which every measurement (column) keeps its own timestamps and
+ * its own row count, so different measurements may hold different numbers of points at different
+ * times.
+ *
+ * <p>Column {@code i} corresponds to {@code getSchemas().get(i)}. {@code timestamps[i]} holds its
+ * timestamps, {@code values[i]} holds its values, and {@code rowSize[i]} is the number of filled
+ * slots. Every column has capacity {@link #getMaxRowNumber()}. No synchronization is performed.
+ */
+public class NonAlignedTablet {
+
+  private static final int DEFAULT_SIZE = 1024;
+  private static final String NOT_SUPPORT_DATATYPE = "Data type %s is not supported.";
+
+  /** deviceId of this tablet */
+  public String deviceId;
+
+  /**
+   * the measurement schemas the tablet was created with; element {@code i} describes column {@code
+   * i} of {@link #timestamps}, {@link #values} and {@link #rowSize}
+   */
+  private List<MeasurementSchema> schemas;
+
+  /** measurementId -> index of its schema (and column) */
+  private final Map<String, Integer> measurementIndex;
+
+  /** per-column timestamps: {@code timestamps[i][j]} is the time of the j-th point of column i */
+  public long[][] timestamps;
+  /**
+   * per-column values: each element is a primitive array ({@code Binary[]} for TEXT) holding the
+   * values of one measurement
+   */
+  public Object[] values;
+
+  /** the number of points currently buffered in each column */
+  public int[] rowSize;
+  /** the row count of the fullest column, as maintained by {@link #addValue} and {@link #reset} */
+  public int maxRowSize;
+  /** the capacity of every column */
+  private final int maxRowNumber;
+
+  /**
+   * Creates a tablet whose columns each hold up to the default capacity of 1024 points.
+   *
+   * @param deviceId the name of the device to be written
+   * @param schemas the measurement schemas, one per column; only measurementId and type take
+   *     effect
+   */
+  public NonAlignedTablet(String deviceId, List<MeasurementSchema> schemas) {
+    this(deviceId, schemas, DEFAULT_SIZE);
+  }
+
+  /**
+   * Creates a tablet whose columns each hold up to {@code maxRowNumber} points.
+   *
+   * @param deviceId the name of the device to be written
+   * @param schemas the measurement schemas, one per column (the list is copied); only
+   *     measurementId and type take effect
+   * @param maxRowNumber the capacity of every column
+   * @throws UnSupportedDataTypeException if a schema's type is not INT32, INT64, FLOAT, DOUBLE,
+   *     BOOLEAN or TEXT
+   */
+  public NonAlignedTablet(String deviceId, List<MeasurementSchema> schemas, int maxRowNumber) {
+    this.deviceId = deviceId;
+    this.schemas = new ArrayList<>(schemas);
+    this.maxRowNumber = maxRowNumber;
+    measurementIndex = new HashMap<>();
+
+    int indexInSchema = 0;
+    for (MeasurementSchema schema : schemas) {
+      if (schema.getType() == TSDataType.VECTOR) {
+        for (String measurementId : schema.getSubMeasurementsList()) {
+          measurementIndex.put(measurementId, indexInSchema);
+        }
+      } else {
+        measurementIndex.put(schema.getMeasurementId(), indexInSchema);
+      }
+      indexInSchema++;
+    }
+
+    createColumns();
+
+    reset();
+  }
+
+  public void setDeviceId(String deviceId) {
+    this.deviceId = deviceId;
+  }
+
+  /**
+   * Replaces the schema list. This does NOT rebuild the measurement index or the timestamp, value
+   * and row-count columns. The new list must therefore describe the same columns (same size,
+   * order, measurement ids and data types) as the list the tablet was created with.
+   */
+  public void setSchemas(List<MeasurementSchema> schemas) {
+    this.schemas = schemas;
+  }
+
+  /**
+   * Appends one point to the column of {@code measurementId}. A {@code null} value is ignored.
+   *
+   * <p>The value must match the column's data type: {@link Binary} for TEXT, and {@code Integer},
+   * {@code Long}, {@code Float}, {@code Double} or {@code Boolean} for INT32, INT64, FLOAT, DOUBLE
+   * and BOOLEAN respectively. Row counters advance only after the point has been stored, so a
+   * failed call leaves the tablet's buffered points unchanged.
+   *
+   * @param measurementId id of a measurement this tablet was created with
+   * @param time timestamp of the point
+   * @param value the value to store, or {@code null} to skip
+   * @throws IllegalArgumentException if {@code measurementId} is not part of this tablet
+   * @throws ArrayIndexOutOfBoundsException if the column already holds {@link #getMaxRowNumber()}
+   *     points
+   * @throws ClassCastException if {@code value} does not match the column's data type
+   */
+  public void addValue(String measurementId, long time, Object value) {
+    if (value == null) {
+      return;
+    }
+    Integer boxedIndex = measurementIndex.get(measurementId);
+    if (boxedIndex == null) {
+      throw new IllegalArgumentException(
+          "Measurement " + measurementId + " is not part of the tablet of device " + deviceId);
+    }
+    int indexOfSchema = boxedIndex;
+    MeasurementSchema measurementSchema = schemas.get(indexOfSchema);
+    int rowIndex = rowSize[indexOfSchema];
+    addValueOfDataType(measurementSchema.getType(), rowIndex, indexOfSchema, time, value);
+    // advance the counters only once the point has actually been stored; incrementing first
+    // would leave an unfilled slot counted as data after an overflow or a type mismatch
+    rowSize[indexOfSchema] = rowIndex + 1;
+    maxRowSize = Math.max(maxRowSize, rowIndex + 1);
+  }
+
+  /**
+   * Stores {@code value} and {@code time} at {@code rowIndex} of column {@code indexOfSchema}.
+   * {@code value} is never null here because {@link #addValue} filters nulls out. The value is
+   * stored before the timestamp, so a type mismatch leaves both arrays untouched.
+   */
+  private void addValueOfDataType(
+      TSDataType dataType, int rowIndex, int indexOfSchema, long time, Object value) {
+    switch (dataType) {
+      case TEXT:
+        ((Binary[]) values[indexOfSchema])[rowIndex] = (Binary) value;
+        break;
+      case FLOAT:
+        ((float[]) values[indexOfSchema])[rowIndex] = (float) value;
+        break;
+      case INT32:
+        ((int[]) values[indexOfSchema])[rowIndex] = (int) value;
+        break;
+      case INT64:
+        ((long[]) values[indexOfSchema])[rowIndex] = (long) value;
+        break;
+      case DOUBLE:
+        ((double[]) values[indexOfSchema])[rowIndex] = (double) value;
+        break;
+      case BOOLEAN:
+        ((boolean[]) values[indexOfSchema])[rowIndex] = (boolean) value;
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.format(NOT_SUPPORT_DATATYPE, dataType));
+    }
+    timestamps[indexOfSchema][rowIndex] = time;
+  }
+
+  /**
+   * Returns the schema list backing this tablet (not a copy). Element {@code i} describes column
+   * {@code i}, so callers should not add, remove or reorder elements.
+   */
+  public List<MeasurementSchema> getSchemas() {
+    return schemas;
+  }
+
+  /** Return the maximum number of points each column of this tablet can hold */
+  public int getMaxRowNumber() {
+    return maxRowNumber;
+  }
+
+  /**
+   * Empties the tablet so it can be refilled by setting every column's row count and {@link
+   * #maxRowSize} to 0. The backing timestamp and value arrays are kept; later {@link #addValue}
+   * calls overwrite them. On the first call (from the constructor) the row-count array is
+   * allocated.
+   */
+  public void reset() {
+    maxRowSize = 0;
+    if (rowSize == null) {
+      rowSize = new int[schemas.size()];
+    } else {
+      Arrays.fill(rowSize, 0);
+    }
+  }
+
+  /** Allocates one timestamp array and one value array, each of capacity maxRowNumber, per schema. */
+  private void createColumns() {
+    timestamps = new long[schemas.size()][maxRowNumber];
+
+    values = new Object[schemas.size()];
+    int columnIndex = 0;
+    for (MeasurementSchema schema : schemas) {
+      values[columnIndex] = createValueColumnOfDataType(schema.getType());
+      columnIndex++;
+    }
+  }
+
+  /** Returns an empty value array of capacity maxRowNumber matching {@code dataType}. */
+  private Object createValueColumnOfDataType(TSDataType dataType) {
+    switch (dataType) {
+      case INT32:
+        return new int[maxRowNumber];
+      case INT64:
+        return new long[maxRowNumber];
+      case FLOAT:
+        return new float[maxRowNumber];
+      case DOUBLE:
+        return new double[maxRowNumber];
+      case BOOLEAN:
+        return new boolean[maxRowNumber];
+      case TEXT:
+        return new Binary[maxRowNumber];
+      default:
+        throw new UnSupportedDataTypeException(String.format(NOT_SUPPORT_DATATYPE, dataType));
+    }
+  }
+}