You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/11/08 08:32:44 UTC

[GitHub] [iotdb] JackieTien97 commented on a change in pull request #4331: [IOTDB-1626]TsFile API supports write and register on aligned timeseries

JackieTien97 commented on a change in pull request #4331:
URL: https://github.com/apache/iotdb/pull/4331#discussion_r744366713



##########
File path: example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
##########
@@ -85,33 +90,82 @@ public static void main(String[] args) throws IOException {
             Decoder valueDecoder =
                 Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
             int dataSize = header.getDataSize();
+            timeBatchIndex = 0;
+            if (header.getDataType() == TSDataType.VECTOR) {
+              timeBatch = new ArrayList<>();

Review comment:
       ```suggestion
                 timeBatch.clear();
   ```

##########
File path: server/src/test/java/org/apache/iotdb/db/writelog/recover/DeviceStringTest.java
##########
@@ -69,17 +69,19 @@ public void setup() throws IOException, WriteProcessException, MetadataException
 
     schema = new Schema();
     schema.registerTimeseries(
-        new Path(("root.sg.device99"), ("sensor4")),
-        new UnaryMeasurementSchema("sensor4", TSDataType.INT64, TSEncoding.PLAIN));
+        new Path("root.sg.device99"),
+        new UnaryMeasurementSchema("sensor4", TSDataType.INT64, TSEncoding.PLAIN),
+        false);

Review comment:
       It seems that there is no such three parameters `registerTimeseries` function in `Schema` class

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/TimePageReader.java
##########
@@ -53,12 +54,15 @@ public TimePageReader(PageHeader pageHeader, ByteBuffer pageData, Decoder timeDe
   }
 
   public long[] nexTimeBatch() throws IOException {
-    long[] timeBatch = new long[(int) pageHeader.getStatistics().getCount()];
-    int index = 0;
+    List<Long> timeList = new ArrayList<>();
+    // long[] timeBatch = new long[(int) pageHeader.getStatistics().getCount()]; //
+    // Todo:bug.当TimePage有多页,statistics就会为null
+    // int index = 0;
     while (timeDecoder.hasNext(timeBuffer)) {
-      timeBatch[index++] = timeDecoder.readLong(timeBuffer);
+      timeList.add(timeDecoder.readLong(timeBuffer));
+      // timeBatch[index++] = timeDecoder.readLong(timeBuffer);
     }
-    return timeBatch;
+    return timeList.stream().mapToLong(t -> t.longValue()).toArray();

Review comment:
       Why you change the previous code. Is there anything wrong with the previous codes? If you want to deal with the empty page case, you can do the judgement at the beginning of the function and return an long array with 0 length. There is no need to use Long to box that.

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
##########
@@ -328,7 +328,7 @@ public long estimateMaxSeriesMemSize() {
 
   @Override
   public long getSerializedChunkSize() {
-    if (pageBuffer.size() == 0) {
+    if (pageBuffer.size() == 0 || statistics.getCount() == 0) {

Review comment:
       In which case, `pageBuffer.size() != 0` and `statistics.getCount() == 0`?

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
##########
@@ -153,6 +155,8 @@ public static ChunkHeader deserializeFrom(InputStream inputStream, byte chunkTyp
       throws IOException {
     // read measurementID
     String measurementID = ReadWriteIOUtils.readVarIntString(inputStream);
+    measurementID =
+        measurementID == null ? "" : measurementID; // measurementID in TimeChunk header is null

Review comment:
       `ReadWriteIOUtils.readVarIntString` will return empty string if the length is 0. There is no need to do this check.

##########
File path: example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
##########
@@ -85,33 +90,82 @@ public static void main(String[] args) throws IOException {
             Decoder valueDecoder =
                 Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
             int dataSize = header.getDataSize();
+            timeBatchIndex = 0;
+            if (header.getDataType() == TSDataType.VECTOR) {
+              timeBatch = new ArrayList<>();
+            }
             while (dataSize > 0) {
               valueDecoder.reset();
-              System.out.println("\t\t[Page]\n \t\tPage head position: " + reader.position());
+              System.out.println(
+                  "\t\t[Page"
+                      + timeBatchIndex
+                      + "]\n \t\tPage head position: "
+                      + reader.position());

Review comment:
       Common Chunk should not print these.

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
##########
@@ -74,7 +74,9 @@ public ChunkHeader(
       int mask) {
     this(
         (byte)
-            ((numOfPages <= 1 ? MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER : MetaMarker.CHUNK_HEADER)
+            ((numOfPages <= 1
+                    ? MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER
+                    : MetaMarker.CHUNK_HEADER) // Todo:what if timeChunk
                 | (byte) mask),

Review comment:
       Delete the TODO, whether it is timeChunk or valueChunk is defined by the `mask`.

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.chunk;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(ChunkGroupWriterImpl.class);
+
+  private final String deviceId;
+
+  // measurementID -> ValueChunkWriter
+  private Map<String, ValueChunkWriter> valueChunkWriterMap = new LinkedHashMap<>();
+
+  private TimeChunkWriter timeChunkWriter;
+
+  private Set<String> writenMeasurementSet = new HashSet<>();
+
+  private long lastTime = -1;
+
+  public AlignedChunkGroupWriterImpl(String deviceId) {
+    this.deviceId = deviceId;
+    String timeMeasurementId = "";
+    CompressionType compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
+    TSEncoding tsEncoding =
+        TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+    TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+    Encoder encoder = TSEncodingBuilder.getEncodingBuilder(tsEncoding).getEncoder(timeType);
+    timeChunkWriter = new TimeChunkWriter(timeMeasurementId, compressionType, tsEncoding, encoder);
+  }
+
+  @Override
+  public void tryToAddSeriesWriter(IMeasurementSchema measurementSchema) {
+    if (!valueChunkWriterMap.containsKey(measurementSchema.getMeasurementId())) {
+      ValueChunkWriter valueChunkWriter =
+          new ValueChunkWriter(
+              measurementSchema.getMeasurementId(),
+              measurementSchema.getCompressor(),
+              measurementSchema.getType(),
+              measurementSchema.getEncodingType(),
+              measurementSchema.getValueEncoder());
+      valueChunkWriterMap.put(measurementSchema.getMeasurementId(), valueChunkWriter);
+      tryToAddEmptyPageAndData(valueChunkWriter);
+    }
+  }
+
+  @Override
+  public void tryToAddSeriesWriter(List<IMeasurementSchema> measurementSchemas) {
+    for (IMeasurementSchema schema : measurementSchemas) {
+      if (!valueChunkWriterMap.containsKey(schema.getMeasurementId())) {
+        ValueChunkWriter valueChunkWriter =
+            new ValueChunkWriter(
+                schema.getMeasurementId(),
+                schema.getCompressor(),
+                schema.getType(),
+                schema.getEncodingType(),
+                schema.getValueEncoder());
+        valueChunkWriterMap.put(schema.getMeasurementId(), valueChunkWriter);
+        tryToAddEmptyPageAndData(valueChunkWriter);
+      }
+    }
+  }
+
+  @Override
+  public int write(long time, List<DataPoint> data) throws WriteProcessException, IOException {
+    if (checkIsHistoryData("", time)) {
+      return 0;
+    }
+    for (DataPoint point : data) {
+      writenMeasurementSet.add(point.getMeasurementId());
+      boolean isNull = point.getValue() == null;
+      ValueChunkWriter valueChunkWriter = valueChunkWriterMap.get(point.getMeasurementId());
+      switch (point.getType()) {
+        case BOOLEAN:
+          valueChunkWriter.write(time, (boolean) point.getValue(), isNull);
+          break;
+        case INT32:
+          valueChunkWriter.write(time, (int) point.getValue(), isNull);
+          break;
+        case INT64:
+          valueChunkWriter.write(time, (long) point.getValue(), isNull);
+          break;
+        case FLOAT:
+          valueChunkWriter.write(time, (float) point.getValue(), isNull);
+          break;
+        case DOUBLE:
+          valueChunkWriter.write(time, (double) point.getValue(), isNull);
+          break;
+        case TEXT:
+          valueChunkWriter.write(time, (Binary) point.getValue(), isNull);
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", point.getType()));
+      }
+    }
+    writeEmptyDataInOneRow(time);
+    timeChunkWriter.write(time);
+    lastTime = time;
+    if (checkPageSizeAndMayOpenANewPage()) {
+      writePageToPageBuffer();
+    }
+    return 1;
+  }
+
+  @Override
+  public int write(Tablet tablet) throws WriteProcessException, IOException {
+    int pointCount = 0;
+    List<IMeasurementSchema> measurementSchemas = tablet.getSchemas();
+    for (int row = 0; row < tablet.rowSize; row++) {
+      long time = tablet.timestamps[row];
+      if (checkIsHistoryData("", time)) {
+        continue;
+      }
+      for (int columnIndex = 0; columnIndex < measurementSchemas.size(); columnIndex++) {
+        writenMeasurementSet.add(measurementSchemas.get(columnIndex).getMeasurementId());
+        boolean isNull = false;
+        // check isNull by bitMap in tablet
+        if (tablet.bitMaps != null
+            && tablet.bitMaps[columnIndex] != null
+            && tablet.bitMaps[columnIndex].isMarked(row)) {

Review comment:
       ```suggestion
               && !tablet.bitMaps[columnIndex].isMarked(row)) {
   ```

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
##########
@@ -134,7 +139,8 @@ protected TsFileWriter(TsFileIOWriter fileWriter, Schema schema, TSFileConfig co
     this.fileWriter = fileWriter;
 
     if (fileWriter instanceof RestorableTsFileIOWriter) {
-      this.schema = new Schema(((RestorableTsFileIOWriter) fileWriter).getKnownSchema());
+      this.schema = null;
+      // this.schema = new Schema(((RestorableTsFileIOWriter) fileWriter).getKnownSchema()); #Todo

Review comment:
       Why delete this line? The line of code is used to recover schema from an existing tsfile.

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
##########
@@ -151,92 +157,205 @@ protected TsFileWriter(TsFileIOWriter fileWriter, Schema schema, TSFileConfig co
   }
 
   public void registerSchemaTemplate(
-      String templateName, Map<String, IMeasurementSchema> template) {
-    schema.registerSchemaTemplate(templateName, template);
+      String templateName, Map<String, UnaryMeasurementSchema> template, boolean isAligned) {
+    schema.registerSchemaTemplate(templateName, new MeasurementGroup(isAligned, template));
   }
 
-  public void registerDevice(String deviceId, String templateName) {
+  public void registerDevice(String deviceId, String templateName) throws WriteProcessException {
+    if (!schema.getSchemaTemplates().containsKey(templateName)) {
+      throw new WriteProcessException("given template is not existed! " + templateName);
+    }
+    if (schema.getRegisteredTimeseriesMap().containsKey(new Path(deviceId))) {
+      throw new WriteProcessException(
+          "this device "
+              + deviceId
+              + " has been registered, you can only use registerDevice method to register empty device.");
+    }
     schema.registerDevice(deviceId, templateName);
   }
 
-  public void registerTimeseries(Path path, IMeasurementSchema measurementSchema)
-      throws WriteProcessException {
-    if (schema.containsTimeseries(path)) {
-      throw new WriteProcessException("given timeseries has exists! " + path);
+  public void registerTimeseries(Path devicePath, List<UnaryMeasurementSchema> measurementSchemas) {
+    for (UnaryMeasurementSchema schema : measurementSchemas) {
+      try {
+        registerTimeseries(devicePath, schema);
+      } catch (WriteProcessException e) {
+        LOG.error(e.getMessage());
+      }
     }
-    schema.registerTimeseries(path, measurementSchema);
   }
 
-  /**
-   * Confirm whether the record is legal. If legal, add it into this RecordWriter.
-   *
-   * @param record - a record responding a line
-   * @return - whether the record has been added into RecordWriter legally
-   * @throws WriteProcessException exception
-   */
-  private boolean checkIsTimeSeriesExist(TSRecord record) throws WriteProcessException {
-    IChunkGroupWriter groupWriter;
-    if (!groupWriters.containsKey(record.deviceId)) {
-      groupWriter = new ChunkGroupWriterImpl(record.deviceId);
-      groupWriters.put(record.deviceId, groupWriter);
+  public void registerTimeseries(Path devicePath, UnaryMeasurementSchema measurementSchema)
+      throws WriteProcessException {
+    MeasurementGroup measurementGroup;
+    if (schema.containsTimeseries(devicePath)) {

Review comment:
       rename the function to `containsDevice`

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
##########
@@ -865,7 +865,7 @@ public ByteBuffer readPage(PageHeader header, CompressionType type) throws IOExc
     ByteBuffer buffer = readData(-1, header.getCompressedSize());
     IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
     ByteBuffer uncompressedBuffer = ByteBuffer.allocate(header.getUncompressedSize());
-    if (type == CompressionType.UNCOMPRESSED) {
+    if (type == CompressionType.UNCOMPRESSED || header.getUncompressedSize() == 0) {

Review comment:
       You should do the judgement at the beginning of this function, there is no need to construct a `IUnCompressor` and `uncompressedBuffer` for empty page.

##########
File path: example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
##########
@@ -85,33 +90,82 @@ public static void main(String[] args) throws IOException {
             Decoder valueDecoder =
                 Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
             int dataSize = header.getDataSize();
+            timeBatchIndex = 0;
+            if (header.getDataType() == TSDataType.VECTOR) {
+              timeBatch = new ArrayList<>();
+            }
             while (dataSize > 0) {
               valueDecoder.reset();
-              System.out.println("\t\t[Page]\n \t\tPage head position: " + reader.position());
+              System.out.println(
+                  "\t\t[Page"
+                      + timeBatchIndex
+                      + "]\n \t\tPage head position: "
+                      + reader.position());
               PageHeader pageHeader =
                   reader.readPageHeader(
-                      header.getDataType(), header.getChunkType() == MetaMarker.CHUNK_HEADER);
+                      header.getDataType(),
+                      header.getChunkType() == (byte) MetaMarker.CHUNK_HEADER
+                          || header.getChunkType()
+                              == (byte) (MetaMarker.CHUNK_HEADER | TsFileConstant.TIME_COLUMN_MASK)
+                          || header.getChunkType()
+                              == (byte)
+                                  (MetaMarker.CHUNK_HEADER | TsFileConstant.VALUE_COLUMN_MASK));

Review comment:
       ```suggestion
   (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER
   ```

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
##########
@@ -151,92 +157,205 @@ protected TsFileWriter(TsFileIOWriter fileWriter, Schema schema, TSFileConfig co
   }
 
   public void registerSchemaTemplate(
-      String templateName, Map<String, IMeasurementSchema> template) {
-    schema.registerSchemaTemplate(templateName, template);
+      String templateName, Map<String, UnaryMeasurementSchema> template, boolean isAligned) {
+    schema.registerSchemaTemplate(templateName, new MeasurementGroup(isAligned, template));
   }
 
-  public void registerDevice(String deviceId, String templateName) {
+  public void registerDevice(String deviceId, String templateName) throws WriteProcessException {
+    if (!schema.getSchemaTemplates().containsKey(templateName)) {
+      throw new WriteProcessException("given template is not existed! " + templateName);
+    }
+    if (schema.getRegisteredTimeseriesMap().containsKey(new Path(deviceId))) {
+      throw new WriteProcessException(
+          "this device "
+              + deviceId
+              + " has been registered, you can only use registerDevice method to register empty device.");
+    }
     schema.registerDevice(deviceId, templateName);
   }
 
-  public void registerTimeseries(Path path, IMeasurementSchema measurementSchema)
-      throws WriteProcessException {
-    if (schema.containsTimeseries(path)) {
-      throw new WriteProcessException("given timeseries has exists! " + path);
+  public void registerTimeseries(Path devicePath, List<UnaryMeasurementSchema> measurementSchemas) {
+    for (UnaryMeasurementSchema schema : measurementSchemas) {
+      try {
+        registerTimeseries(devicePath, schema);
+      } catch (WriteProcessException e) {
+        LOG.error(e.getMessage());
+      }
     }
-    schema.registerTimeseries(path, measurementSchema);
   }
 
-  /**
-   * Confirm whether the record is legal. If legal, add it into this RecordWriter.
-   *
-   * @param record - a record responding a line
-   * @return - whether the record has been added into RecordWriter legally
-   * @throws WriteProcessException exception
-   */
-  private boolean checkIsTimeSeriesExist(TSRecord record) throws WriteProcessException {
-    IChunkGroupWriter groupWriter;
-    if (!groupWriters.containsKey(record.deviceId)) {
-      groupWriter = new ChunkGroupWriterImpl(record.deviceId);
-      groupWriters.put(record.deviceId, groupWriter);
+  public void registerTimeseries(Path devicePath, UnaryMeasurementSchema measurementSchema)
+      throws WriteProcessException {
+    MeasurementGroup measurementGroup;
+    if (schema.containsTimeseries(devicePath)) {
+      measurementGroup = schema.getSeriesSchema(devicePath);
+      if (measurementGroup.isAligned()) {
+        throw new WriteProcessException(
+            "given device " + devicePath + " has been registered for aligned timeseries.");
+      } else if (measurementGroup
+          .getMeasurementSchemaMap()
+          .containsKey(measurementSchema.getMeasurementId())) {
+        throw new WriteProcessException(
+            "given nonAligned timeseries "
+                + (devicePath + "." + measurementSchema.getMeasurementId())
+                + " has been registered.");
+      }
     } else {
-      groupWriter = groupWriters.get(record.deviceId);
+      measurementGroup = new MeasurementGroup(false);
     }
+    measurementGroup
+        .getMeasurementSchemaMap()
+        .put(measurementSchema.getMeasurementId(), measurementSchema);
+    schema.registerTimeseries(devicePath, measurementGroup);
+  }
 
-    // add all SeriesWriter of measurements in this TSRecord to this ChunkGroupWriter
-    for (DataPoint dp : record.dataPointList) {
-      String measurementId = dp.getMeasurementId();
-      Path path = new Path(record.deviceId, measurementId);
-      if (schema.containsTimeseries(path)) {
-        groupWriter.tryToAddSeriesWriter(schema.getSeriesSchema(path), pageSize);
-      } else if (schema.getSchemaTemplates() != null && schema.getSchemaTemplates().size() == 1) {
-        // use the default template without needing to register device
-        Map<String, IMeasurementSchema> template =
-            schema.getSchemaTemplates().entrySet().iterator().next().getValue();
-        if (template.containsKey(path.getMeasurement())) {
-          groupWriter.tryToAddSeriesWriter(template.get(path.getMeasurement()), pageSize);
-        }
+  public void registerAlignedTimeseries(
+      Path devicePath, List<UnaryMeasurementSchema> measurementSchemas)
+      throws WriteProcessException {
+    if (schema.containsTimeseries(devicePath)) {
+      if (schema.getSeriesSchema(devicePath).isAligned()) {
+        throw new WriteProcessException(
+            "given device "
+                + devicePath
+                + " has been registered for aligned timeseries and should not be expanded.");
       } else {
-        throw new NoMeasurementException("input path is invalid: " + path);
+        throw new WriteProcessException(
+            "given device " + devicePath + " has been registered for nonAligned timeseries.");
       }
     }
+    MeasurementGroup measurementGroup = new MeasurementGroup(true);
+    measurementSchemas.forEach(
+        measurementSchema -> {
+          measurementGroup
+              .getMeasurementSchemaMap()
+              .put(measurementSchema.getMeasurementId(), measurementSchema);
+        });
+    schema.registerTimeseries(devicePath, measurementGroup);

Review comment:
       rename it to `registerMeasurementGroup`

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
##########
@@ -180,14 +203,41 @@ public long estimateMaxSeriesMemSize() {
   }
 
   public long getCurrentChunkSize() {
-    if (pageBuffer.size() == 0) {
+    if (pageBuffer.size() == 0 || statistics.getCount() == 0) {

Review comment:
       Even if `statistics.getCount() == 0`, the chunk size is not 0.

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java
##########
@@ -45,150 +41,98 @@
 
   private final String deviceId;
 
-  /** Map(measurementID, ChunkWriterImpl). */
-  private Map<String, IChunkWriter> chunkWriters = new HashMap<>();
+  /** Map(measurementID, ChunkWriterImpl). Aligned measurementId is empty. */
+  private Map<String, ChunkWriterImpl> chunkWriters = new LinkedHashMap<>();
+
+  private Map<String, Long> lastTimeMap = new HashMap<>();
 
   public ChunkGroupWriterImpl(String deviceId) {
     this.deviceId = deviceId;
   }
 
   @Override
-  public void tryToAddSeriesWriter(IMeasurementSchema schema, int pageSizeThreshold) {
+  public void tryToAddSeriesWriter(IMeasurementSchema schema) {
     if (!chunkWriters.containsKey(schema.getMeasurementId())) {
-      IChunkWriter seriesWriter = null;
-      // initialize depend on schema type
-      if (schema instanceof VectorMeasurementSchema) {
-        seriesWriter = new AlignedChunkWriterImpl(schema);
-      } else if (schema instanceof UnaryMeasurementSchema) {
-        seriesWriter = new ChunkWriterImpl(schema);
-      }
-      this.chunkWriters.put(schema.getMeasurementId(), seriesWriter);
+      this.chunkWriters.put(schema.getMeasurementId(), new ChunkWriterImpl(schema));
     }
   }
 
   @Override
-  public void write(long time, List<DataPoint> data) throws WriteProcessException, IOException {
-    for (DataPoint point : data) {
-      String measurementId = point.getMeasurementId();
-      if (!chunkWriters.containsKey(measurementId)) {
-        throw new NoMeasurementException(
-            "time " + time + ", measurement id " + measurementId + " not found!");
+  public void tryToAddSeriesWriter(List<IMeasurementSchema> schemas) {
+    for (IMeasurementSchema schema : schemas) {
+      if (!chunkWriters.containsKey(schema.getMeasurementId())) {
+        this.chunkWriters.put(schema.getMeasurementId(), new ChunkWriterImpl(schema));
       }
-      point.writeTo(time, (ChunkWriterImpl) chunkWriters.get(measurementId));
     }
   }
 
   @Override
-  public void write(Tablet tablet) throws WriteProcessException {
-    List<IMeasurementSchema> timeseries = tablet.getSchemas();
-    for (int i = 0; i < timeseries.size(); i++) {
-      String measurementId = timeseries.get(i).getMeasurementId();
-      TSDataType dataType = timeseries.get(i).getType();
-      if (!chunkWriters.containsKey(measurementId)) {
-        throw new NoMeasurementException("measurement id" + measurementId + " not found!");
+  public int write(long time, List<DataPoint> data) throws IOException {
+    int pointCount = 0;
+    for (DataPoint point : data) {
+      if (checkIsHistoryData(point.getMeasurementId(), time)) {
+        continue;
       }
-      if (dataType.equals(TSDataType.VECTOR)) {
-        writeVectorDataType(tablet, measurementId, i);
-      } else {
-        writeByDataType(tablet, measurementId, dataType, i);
+      if (pointCount == 0) {
+        pointCount++;
       }
+      point.writeTo(
+          time, chunkWriters.get(point.getMeasurementId())); // write time and value to page
+      lastTimeMap.put(point.getMeasurementId(), time);
     }
+    return pointCount;
   }
 
-  /**
-   * write if data type is VECTOR this method write next n column values (belong to one vector), and
-   * return n to increase index
-   *
-   * @param tablet table
-   * @param measurement vector measurement
-   * @param index measurement start index
-   */
-  private void writeVectorDataType(Tablet tablet, String measurement, int index) {
-    // reference: MemTableFlushTask.java
-    int batchSize = tablet.rowSize;
-    VectorMeasurementSchema vectorMeasurementSchema =
-        (VectorMeasurementSchema) tablet.getSchemas().get(index);
-    List<TSDataType> valueDataTypes = vectorMeasurementSchema.getSubMeasurementsTSDataTypeList();
-    AlignedChunkWriterImpl vectorChunkWriter =
-        (AlignedChunkWriterImpl) chunkWriters.get(measurement);
-    for (int row = 0; row < batchSize; row++) {
+  @Override
+  public int write(Tablet tablet) {
+    int pointCount = 0;
+    List<IMeasurementSchema> timeseries = tablet.getSchemas();
+    for (int row = 0; row < tablet.rowSize; row++) {
       long time = tablet.timestamps[row];
-      for (int columnIndex = 0; columnIndex < valueDataTypes.size(); columnIndex++) {
+      boolean hasOneColumnWritten = false;
+      for (int column = 0; column < timeseries.size(); column++) {
+        String measurementId = timeseries.get(column).getMeasurementId();
+        if (checkIsHistoryData(measurementId, time)) {
+          continue;
+        }
+        hasOneColumnWritten = true;
         boolean isNull = false;
         // check isNull by bitMap in tablet
         if (tablet.bitMaps != null
-            && tablet.bitMaps[columnIndex] != null
-            && tablet.bitMaps[columnIndex].isMarked(row)) {
+            && tablet.bitMaps[column] != null
+            && tablet.bitMaps[column].isMarked(row)) {
           isNull = true;
         }

Review comment:
       `isNUll` is useless

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
##########
@@ -457,4 +457,8 @@ public boolean isMerging() {
   public void setLastPoint(boolean isLastPoint) {
     this.isLastPoint = isLastPoint;
   }
+
+  public void setValueIndex(int valueIndex) {
+    throw new IllegalStateException("write time method is not implemented in common chunk writer");
+  }

Review comment:
       This method is useless any more.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org