Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/11/07 15:10:10 UTC

[GitHub] [iotdb] choubenson opened a new pull request #4331: [IOTDB-1626]TsFile API supports write and register on aligned timeseries

choubenson opened a new pull request #4331:
URL: https://github.com/apache/iotdb/pull/4331


   Supports writing and registering aligned timeseries (vectors) through the TsFile API.
   Adds four user examples of the TsFile write API.
   Upgrades the TsFileSequenceRead tool so that it reads both aligned and non-aligned timeseries in a TsFile sequentially.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] qiaojialin commented on pull request #4331: [IOTDB-1626]TsFile API supports write and register on aligned timeseries

Posted by GitBox <gi...@apache.org>.
qiaojialin commented on pull request #4331:
URL: https://github.com/apache/iotdb/pull/4331#issuecomment-962640124


   Please add UserGuide documentation for the TsFile API.





[GitHub] [iotdb] JackieTien97 commented on a change in pull request #4331: [IOTDB-1626]TsFile API supports write and register on aligned timeseries

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #4331:
URL: https://github.com/apache/iotdb/pull/4331#discussion_r744366713



##########
File path: example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
##########
@@ -85,33 +90,82 @@ public static void main(String[] args) throws IOException {
             Decoder valueDecoder =
                 Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
             int dataSize = header.getDataSize();
+            timeBatchIndex = 0;
+            if (header.getDataType() == TSDataType.VECTOR) {
+              timeBatch = new ArrayList<>();

Review comment:
       ```suggestion
                 timeBatch.clear();
   ```

##########
File path: server/src/test/java/org/apache/iotdb/db/writelog/recover/DeviceStringTest.java
##########
@@ -69,17 +69,19 @@ public void setup() throws IOException, WriteProcessException, MetadataException
 
     schema = new Schema();
     schema.registerTimeseries(
-        new Path(("root.sg.device99"), ("sensor4")),
-        new UnaryMeasurementSchema("sensor4", TSDataType.INT64, TSEncoding.PLAIN));
+        new Path("root.sg.device99"),
+        new UnaryMeasurementSchema("sensor4", TSDataType.INT64, TSEncoding.PLAIN),
+        false);

Review comment:
       It seems that the `Schema` class has no three-parameter `registerTimeseries` method.

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/TimePageReader.java
##########
@@ -53,12 +54,15 @@ public TimePageReader(PageHeader pageHeader, ByteBuffer pageData, Decoder timeDe
   }
 
   public long[] nexTimeBatch() throws IOException {
-    long[] timeBatch = new long[(int) pageHeader.getStatistics().getCount()];
-    int index = 0;
+    List<Long> timeList = new ArrayList<>();
+    // long[] timeBatch = new long[(int) pageHeader.getStatistics().getCount()]; //
+    // Todo: bug. When the TimePage has multiple pages, statistics will be null
+    // int index = 0;
     while (timeDecoder.hasNext(timeBuffer)) {
-      timeBatch[index++] = timeDecoder.readLong(timeBuffer);
+      timeList.add(timeDecoder.readLong(timeBuffer));
+      // timeBatch[index++] = timeDecoder.readLong(timeBuffer);
     }
-    return timeBatch;
+    return timeList.stream().mapToLong(t -> t.longValue()).toArray();

Review comment:
       Why did you change the previous code? Was anything wrong with it? If you want to handle the empty-page case, you can check for it at the beginning of the function and return a `long` array of length 0. There is no need to box the values as `Long`.
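
       One way to read this suggestion, as a minimal sketch: guard the empty-page case first, then fill a primitive `long[]` without any `List<Long>` boxing. The class `TimeBatchSketch`, the method `readTimeBatch`, and the fixed 8-bytes-per-timestamp layout are assumptions made for illustration; the real `TimePageReader` works with a `Decoder` and a `PageHeader`, which are not reproduced here.

```java
import java.nio.ByteBuffer;

final class TimeBatchSketch {

  /** Reads every remaining timestamp from the buffer (hypothetical plain 8-byte encoding). */
  static long[] readTimeBatch(ByteBuffer timeBuffer) {
    // Handle the empty page up front and return a zero-length array.
    if (timeBuffer == null || timeBuffer.remaining() < Long.BYTES) {
      return new long[0];
    }
    // The size is derived from the buffer itself, not from page statistics.
    long[] timeBatch = new long[timeBuffer.remaining() / Long.BYTES];
    int index = 0;
    while (timeBuffer.remaining() >= Long.BYTES) {
      timeBatch[index++] = timeBuffer.getLong();
    }
    return timeBatch;
  }
}
```

       With a variable-width time encoding the array size cannot be derived from the byte count, so the count would have to come from somewhere else; the sketch only shows the guard-then-fill shape.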

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
##########
@@ -328,7 +328,7 @@ public long estimateMaxSeriesMemSize() {
 
   @Override
   public long getSerializedChunkSize() {
-    if (pageBuffer.size() == 0) {
+    if (pageBuffer.size() == 0 || statistics.getCount() == 0) {

Review comment:
       In which case would `pageBuffer.size() != 0` and `statistics.getCount() == 0` both hold?

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
##########
@@ -153,6 +155,8 @@ public static ChunkHeader deserializeFrom(InputStream inputStream, byte chunkTyp
       throws IOException {
     // read measurementID
     String measurementID = ReadWriteIOUtils.readVarIntString(inputStream);
+    measurementID =
+        measurementID == null ? "" : measurementID; // measurementID in TimeChunk header is null

Review comment:
       `ReadWriteIOUtils.readVarIntString` returns an empty string if the length is 0, so this check is unnecessary.

##########
File path: example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
##########
@@ -85,33 +90,82 @@ public static void main(String[] args) throws IOException {
             Decoder valueDecoder =
                 Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
             int dataSize = header.getDataSize();
+            timeBatchIndex = 0;
+            if (header.getDataType() == TSDataType.VECTOR) {
+              timeBatch = new ArrayList<>();
+            }
             while (dataSize > 0) {
               valueDecoder.reset();
-              System.out.println("\t\t[Page]\n \t\tPage head position: " + reader.position());
+              System.out.println(
+                  "\t\t[Page"
+                      + timeBatchIndex
+                      + "]\n \t\tPage head position: "
+                      + reader.position());

Review comment:
       A common (non-aligned) chunk should not print these.

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
##########
@@ -74,7 +74,9 @@ public ChunkHeader(
       int mask) {
     this(
         (byte)
-            ((numOfPages <= 1 ? MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER : MetaMarker.CHUNK_HEADER)
+            ((numOfPages <= 1
+                    ? MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER
+                    : MetaMarker.CHUNK_HEADER) // Todo:what if timeChunk
                 | (byte) mask),

Review comment:
       Delete the TODO; whether it is a timeChunk or a valueChunk is determined by the `mask`.

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.chunk;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(ChunkGroupWriterImpl.class);
+
+  private final String deviceId;
+
+  // measurementID -> ValueChunkWriter
+  private Map<String, ValueChunkWriter> valueChunkWriterMap = new LinkedHashMap<>();
+
+  private TimeChunkWriter timeChunkWriter;
+
+  private Set<String> writenMeasurementSet = new HashSet<>();
+
+  private long lastTime = -1;
+
+  public AlignedChunkGroupWriterImpl(String deviceId) {
+    this.deviceId = deviceId;
+    String timeMeasurementId = "";
+    CompressionType compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
+    TSEncoding tsEncoding =
+        TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+    TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+    Encoder encoder = TSEncodingBuilder.getEncodingBuilder(tsEncoding).getEncoder(timeType);
+    timeChunkWriter = new TimeChunkWriter(timeMeasurementId, compressionType, tsEncoding, encoder);
+  }
+
+  @Override
+  public void tryToAddSeriesWriter(IMeasurementSchema measurementSchema) {
+    if (!valueChunkWriterMap.containsKey(measurementSchema.getMeasurementId())) {
+      ValueChunkWriter valueChunkWriter =
+          new ValueChunkWriter(
+              measurementSchema.getMeasurementId(),
+              measurementSchema.getCompressor(),
+              measurementSchema.getType(),
+              measurementSchema.getEncodingType(),
+              measurementSchema.getValueEncoder());
+      valueChunkWriterMap.put(measurementSchema.getMeasurementId(), valueChunkWriter);
+      tryToAddEmptyPageAndData(valueChunkWriter);
+    }
+  }
+
+  @Override
+  public void tryToAddSeriesWriter(List<IMeasurementSchema> measurementSchemas) {
+    for (IMeasurementSchema schema : measurementSchemas) {
+      if (!valueChunkWriterMap.containsKey(schema.getMeasurementId())) {
+        ValueChunkWriter valueChunkWriter =
+            new ValueChunkWriter(
+                schema.getMeasurementId(),
+                schema.getCompressor(),
+                schema.getType(),
+                schema.getEncodingType(),
+                schema.getValueEncoder());
+        valueChunkWriterMap.put(schema.getMeasurementId(), valueChunkWriter);
+        tryToAddEmptyPageAndData(valueChunkWriter);
+      }
+    }
+  }
+
+  @Override
+  public int write(long time, List<DataPoint> data) throws WriteProcessException, IOException {
+    if (checkIsHistoryData("", time)) {
+      return 0;
+    }
+    for (DataPoint point : data) {
+      writenMeasurementSet.add(point.getMeasurementId());
+      boolean isNull = point.getValue() == null;
+      ValueChunkWriter valueChunkWriter = valueChunkWriterMap.get(point.getMeasurementId());
+      switch (point.getType()) {
+        case BOOLEAN:
+          valueChunkWriter.write(time, (boolean) point.getValue(), isNull);
+          break;
+        case INT32:
+          valueChunkWriter.write(time, (int) point.getValue(), isNull);
+          break;
+        case INT64:
+          valueChunkWriter.write(time, (long) point.getValue(), isNull);
+          break;
+        case FLOAT:
+          valueChunkWriter.write(time, (float) point.getValue(), isNull);
+          break;
+        case DOUBLE:
+          valueChunkWriter.write(time, (double) point.getValue(), isNull);
+          break;
+        case TEXT:
+          valueChunkWriter.write(time, (Binary) point.getValue(), isNull);
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", point.getType()));
+      }
+    }
+    writeEmptyDataInOneRow(time);
+    timeChunkWriter.write(time);
+    lastTime = time;
+    if (checkPageSizeAndMayOpenANewPage()) {
+      writePageToPageBuffer();
+    }
+    return 1;
+  }
+
+  @Override
+  public int write(Tablet tablet) throws WriteProcessException, IOException {
+    int pointCount = 0;
+    List<IMeasurementSchema> measurementSchemas = tablet.getSchemas();
+    for (int row = 0; row < tablet.rowSize; row++) {
+      long time = tablet.timestamps[row];
+      if (checkIsHistoryData("", time)) {
+        continue;
+      }
+      for (int columnIndex = 0; columnIndex < measurementSchemas.size(); columnIndex++) {
+        writenMeasurementSet.add(measurementSchemas.get(columnIndex).getMeasurementId());
+        boolean isNull = false;
+        // check isNull by bitMap in tablet
+        if (tablet.bitMaps != null
+            && tablet.bitMaps[columnIndex] != null
+            && tablet.bitMaps[columnIndex].isMarked(row)) {

Review comment:
       ```suggestion
               && !tablet.bitMaps[columnIndex].isMarked(row)) {
   ```

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
##########
@@ -134,7 +139,8 @@ protected TsFileWriter(TsFileIOWriter fileWriter, Schema schema, TSFileConfig co
     this.fileWriter = fileWriter;
 
     if (fileWriter instanceof RestorableTsFileIOWriter) {
-      this.schema = new Schema(((RestorableTsFileIOWriter) fileWriter).getKnownSchema());
+      this.schema = null;
+      // this.schema = new Schema(((RestorableTsFileIOWriter) fileWriter).getKnownSchema()); #Todo

Review comment:
       Why was this line deleted? It is used to recover the schema from an existing TsFile.

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
##########
@@ -151,92 +157,205 @@ protected TsFileWriter(TsFileIOWriter fileWriter, Schema schema, TSFileConfig co
   }
 
   public void registerSchemaTemplate(
-      String templateName, Map<String, IMeasurementSchema> template) {
-    schema.registerSchemaTemplate(templateName, template);
+      String templateName, Map<String, UnaryMeasurementSchema> template, boolean isAligned) {
+    schema.registerSchemaTemplate(templateName, new MeasurementGroup(isAligned, template));
   }
 
-  public void registerDevice(String deviceId, String templateName) {
+  public void registerDevice(String deviceId, String templateName) throws WriteProcessException {
+    if (!schema.getSchemaTemplates().containsKey(templateName)) {
+      throw new WriteProcessException("given template is not existed! " + templateName);
+    }
+    if (schema.getRegisteredTimeseriesMap().containsKey(new Path(deviceId))) {
+      throw new WriteProcessException(
+          "this device "
+              + deviceId
+              + " has been registered, you can only use registerDevice method to register empty device.");
+    }
     schema.registerDevice(deviceId, templateName);
   }
 
-  public void registerTimeseries(Path path, IMeasurementSchema measurementSchema)
-      throws WriteProcessException {
-    if (schema.containsTimeseries(path)) {
-      throw new WriteProcessException("given timeseries has exists! " + path);
+  public void registerTimeseries(Path devicePath, List<UnaryMeasurementSchema> measurementSchemas) {
+    for (UnaryMeasurementSchema schema : measurementSchemas) {
+      try {
+        registerTimeseries(devicePath, schema);
+      } catch (WriteProcessException e) {
+        LOG.error(e.getMessage());
+      }
     }
-    schema.registerTimeseries(path, measurementSchema);
   }
 
-  /**
-   * Confirm whether the record is legal. If legal, add it into this RecordWriter.
-   *
-   * @param record - a record responding a line
-   * @return - whether the record has been added into RecordWriter legally
-   * @throws WriteProcessException exception
-   */
-  private boolean checkIsTimeSeriesExist(TSRecord record) throws WriteProcessException {
-    IChunkGroupWriter groupWriter;
-    if (!groupWriters.containsKey(record.deviceId)) {
-      groupWriter = new ChunkGroupWriterImpl(record.deviceId);
-      groupWriters.put(record.deviceId, groupWriter);
+  public void registerTimeseries(Path devicePath, UnaryMeasurementSchema measurementSchema)
+      throws WriteProcessException {
+    MeasurementGroup measurementGroup;
+    if (schema.containsTimeseries(devicePath)) {

Review comment:
       Rename the function to `containsDevice`.

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
##########
@@ -865,7 +865,7 @@ public ByteBuffer readPage(PageHeader header, CompressionType type) throws IOExc
     ByteBuffer buffer = readData(-1, header.getCompressedSize());
     IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
     ByteBuffer uncompressedBuffer = ByteBuffer.allocate(header.getUncompressedSize());
-    if (type == CompressionType.UNCOMPRESSED) {
+    if (type == CompressionType.UNCOMPRESSED || header.getUncompressedSize() == 0) {

Review comment:
       You should do this check at the beginning of the function; there is no need to construct an `IUnCompressor` and an `uncompressedBuffer` for an empty page.
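
       A minimal sketch of that ordering, assuming a simplified page reader. The `ReadPageSketch` class, its nested `Uncompressor` interface, the `newUncompressor` factory, and the `uncompressorsCreated` counter are all hypothetical stand-ins and not the TsFile API; the counter exists only to make the early return observable.

```java
import java.nio.ByteBuffer;

final class ReadPageSketch {

  /** Hypothetical stand-in for an uncompressor; not the TsFile interface. */
  interface Uncompressor {
    void uncompress(byte[] compressed, ByteBuffer target);
  }

  /** Counts how many uncompressors were created. */
  static int uncompressorsCreated = 0;

  static Uncompressor newUncompressor() {
    uncompressorsCreated++;
    // Identity "codec" for illustration: copies the bytes unchanged.
    return (compressed, target) -> target.put(compressed);
  }

  static ByteBuffer readPage(byte[] compressed, int uncompressedSize, boolean isCompressed) {
    ByteBuffer raw = ByteBuffer.wrap(compressed);
    // Decide first: an empty or uncompressed page needs no uncompressor and no second buffer.
    if (!isCompressed || uncompressedSize == 0) {
      return raw;
    }
    Uncompressor uncompressor = newUncompressor();
    ByteBuffer uncompressed = ByteBuffer.allocate(uncompressedSize);
    uncompressor.uncompress(compressed, uncompressed);
    uncompressed.flip();
    return uncompressed;
  }
}
```

       Calling `readPage(new byte[0], 0, true)` in this sketch returns the raw buffer and leaves `uncompressorsCreated` unchanged.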

##########
File path: example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
##########
@@ -85,33 +90,82 @@ public static void main(String[] args) throws IOException {
             Decoder valueDecoder =
                 Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
             int dataSize = header.getDataSize();
+            timeBatchIndex = 0;
+            if (header.getDataType() == TSDataType.VECTOR) {
+              timeBatch = new ArrayList<>();
+            }
             while (dataSize > 0) {
               valueDecoder.reset();
-              System.out.println("\t\t[Page]\n \t\tPage head position: " + reader.position());
+              System.out.println(
+                  "\t\t[Page"
+                      + timeBatchIndex
+                      + "]\n \t\tPage head position: "
+                      + reader.position());
               PageHeader pageHeader =
                   reader.readPageHeader(
-                      header.getDataType(), header.getChunkType() == MetaMarker.CHUNK_HEADER);
+                      header.getDataType(),
+                      header.getChunkType() == (byte) MetaMarker.CHUNK_HEADER
+                          || header.getChunkType()
+                              == (byte) (MetaMarker.CHUNK_HEADER | TsFileConstant.TIME_COLUMN_MASK)
+                          || header.getChunkType()
+                              == (byte)
+                                  (MetaMarker.CHUNK_HEADER | TsFileConstant.VALUE_COLUMN_MASK));

Review comment:
       ```suggestion
   (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER
   ```
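
       To illustrate why the single masked comparison can replace the three-way check, here is a small sketch. The constant values below are assumptions chosen for illustration (the column masks occupying the top two bits of the byte); check `MetaMarker` and `TsFileConstant` for the real definitions. `ChunkTypeSketch` and `isMultiPageChunk` are hypothetical names.

```java
final class ChunkTypeSketch {
  // Assumed values for illustration only; not copied from the TsFile sources.
  static final byte CHUNK_HEADER = 1;
  static final byte ONLY_ONE_PAGE_CHUNK_HEADER = 5;
  static final byte TIME_COLUMN_MASK = (byte) 0x80;
  static final byte VALUE_COLUMN_MASK = 0x40;

  /** True when the low six bits equal CHUNK_HEADER, whatever the two column-mask bits are. */
  static boolean isMultiPageChunk(byte chunkType) {
    // 0x3F clears the top two bits, so plain, time-column, and value-column headers compare equal.
    return (chunkType & 0x3F) == CHUNK_HEADER;
  }
}
```

       Under these assumed values, `CHUNK_HEADER`, `CHUNK_HEADER | TIME_COLUMN_MASK`, and `CHUNK_HEADER | VALUE_COLUMN_MASK` all pass the check, while `ONLY_ONE_PAGE_CHUNK_HEADER` with or without a mask does not.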

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
##########
@@ -151,92 +157,205 @@ protected TsFileWriter(TsFileIOWriter fileWriter, Schema schema, TSFileConfig co
   }
 
   public void registerSchemaTemplate(
-      String templateName, Map<String, IMeasurementSchema> template) {
-    schema.registerSchemaTemplate(templateName, template);
+      String templateName, Map<String, UnaryMeasurementSchema> template, boolean isAligned) {
+    schema.registerSchemaTemplate(templateName, new MeasurementGroup(isAligned, template));
   }
 
-  public void registerDevice(String deviceId, String templateName) {
+  public void registerDevice(String deviceId, String templateName) throws WriteProcessException {
+    if (!schema.getSchemaTemplates().containsKey(templateName)) {
+      throw new WriteProcessException("given template is not existed! " + templateName);
+    }
+    if (schema.getRegisteredTimeseriesMap().containsKey(new Path(deviceId))) {
+      throw new WriteProcessException(
+          "this device "
+              + deviceId
+              + " has been registered, you can only use registerDevice method to register empty device.");
+    }
     schema.registerDevice(deviceId, templateName);
   }
 
-  public void registerTimeseries(Path path, IMeasurementSchema measurementSchema)
-      throws WriteProcessException {
-    if (schema.containsTimeseries(path)) {
-      throw new WriteProcessException("given timeseries has exists! " + path);
+  public void registerTimeseries(Path devicePath, List<UnaryMeasurementSchema> measurementSchemas) {
+    for (UnaryMeasurementSchema schema : measurementSchemas) {
+      try {
+        registerTimeseries(devicePath, schema);
+      } catch (WriteProcessException e) {
+        LOG.error(e.getMessage());
+      }
     }
-    schema.registerTimeseries(path, measurementSchema);
   }
 
-  /**
-   * Confirm whether the record is legal. If legal, add it into this RecordWriter.
-   *
-   * @param record - a record responding a line
-   * @return - whether the record has been added into RecordWriter legally
-   * @throws WriteProcessException exception
-   */
-  private boolean checkIsTimeSeriesExist(TSRecord record) throws WriteProcessException {
-    IChunkGroupWriter groupWriter;
-    if (!groupWriters.containsKey(record.deviceId)) {
-      groupWriter = new ChunkGroupWriterImpl(record.deviceId);
-      groupWriters.put(record.deviceId, groupWriter);
+  public void registerTimeseries(Path devicePath, UnaryMeasurementSchema measurementSchema)
+      throws WriteProcessException {
+    MeasurementGroup measurementGroup;
+    if (schema.containsTimeseries(devicePath)) {
+      measurementGroup = schema.getSeriesSchema(devicePath);
+      if (measurementGroup.isAligned()) {
+        throw new WriteProcessException(
+            "given device " + devicePath + " has been registered for aligned timeseries.");
+      } else if (measurementGroup
+          .getMeasurementSchemaMap()
+          .containsKey(measurementSchema.getMeasurementId())) {
+        throw new WriteProcessException(
+            "given nonAligned timeseries "
+                + (devicePath + "." + measurementSchema.getMeasurementId())
+                + " has been registered.");
+      }
     } else {
-      groupWriter = groupWriters.get(record.deviceId);
+      measurementGroup = new MeasurementGroup(false);
     }
+    measurementGroup
+        .getMeasurementSchemaMap()
+        .put(measurementSchema.getMeasurementId(), measurementSchema);
+    schema.registerTimeseries(devicePath, measurementGroup);
+  }
 
-    // add all SeriesWriter of measurements in this TSRecord to this ChunkGroupWriter
-    for (DataPoint dp : record.dataPointList) {
-      String measurementId = dp.getMeasurementId();
-      Path path = new Path(record.deviceId, measurementId);
-      if (schema.containsTimeseries(path)) {
-        groupWriter.tryToAddSeriesWriter(schema.getSeriesSchema(path), pageSize);
-      } else if (schema.getSchemaTemplates() != null && schema.getSchemaTemplates().size() == 1) {
-        // use the default template without needing to register device
-        Map<String, IMeasurementSchema> template =
-            schema.getSchemaTemplates().entrySet().iterator().next().getValue();
-        if (template.containsKey(path.getMeasurement())) {
-          groupWriter.tryToAddSeriesWriter(template.get(path.getMeasurement()), pageSize);
-        }
+  public void registerAlignedTimeseries(
+      Path devicePath, List<UnaryMeasurementSchema> measurementSchemas)
+      throws WriteProcessException {
+    if (schema.containsTimeseries(devicePath)) {
+      if (schema.getSeriesSchema(devicePath).isAligned()) {
+        throw new WriteProcessException(
+            "given device "
+                + devicePath
+                + " has been registered for aligned timeseries and should not be expanded.");
       } else {
-        throw new NoMeasurementException("input path is invalid: " + path);
+        throw new WriteProcessException(
+            "given device " + devicePath + " has been registered for nonAligned timeseries.");
       }
     }
+    MeasurementGroup measurementGroup = new MeasurementGroup(true);
+    measurementSchemas.forEach(
+        measurementSchema -> {
+          measurementGroup
+              .getMeasurementSchemaMap()
+              .put(measurementSchema.getMeasurementId(), measurementSchema);
+        });
+    schema.registerTimeseries(devicePath, measurementGroup);

Review comment:
       Rename it to `registerMeasurementGroup`.

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
##########
@@ -180,14 +203,41 @@ public long estimateMaxSeriesMemSize() {
   }
 
   public long getCurrentChunkSize() {
-    if (pageBuffer.size() == 0) {
+    if (pageBuffer.size() == 0 || statistics.getCount() == 0) {

Review comment:
       Even if `statistics.getCount() == 0`, the chunk size is not 0.

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java
##########
@@ -45,150 +41,98 @@
 
   private final String deviceId;
 
-  /** Map(measurementID, ChunkWriterImpl). */
-  private Map<String, IChunkWriter> chunkWriters = new HashMap<>();
+  /** Map(measurementID, ChunkWriterImpl). Aligned measurementId is empty. */
+  private Map<String, ChunkWriterImpl> chunkWriters = new LinkedHashMap<>();
+
+  private Map<String, Long> lastTimeMap = new HashMap<>();
 
   public ChunkGroupWriterImpl(String deviceId) {
     this.deviceId = deviceId;
   }
 
   @Override
-  public void tryToAddSeriesWriter(IMeasurementSchema schema, int pageSizeThreshold) {
+  public void tryToAddSeriesWriter(IMeasurementSchema schema) {
     if (!chunkWriters.containsKey(schema.getMeasurementId())) {
-      IChunkWriter seriesWriter = null;
-      // initialize depend on schema type
-      if (schema instanceof VectorMeasurementSchema) {
-        seriesWriter = new AlignedChunkWriterImpl(schema);
-      } else if (schema instanceof UnaryMeasurementSchema) {
-        seriesWriter = new ChunkWriterImpl(schema);
-      }
-      this.chunkWriters.put(schema.getMeasurementId(), seriesWriter);
+      this.chunkWriters.put(schema.getMeasurementId(), new ChunkWriterImpl(schema));
     }
   }
 
   @Override
-  public void write(long time, List<DataPoint> data) throws WriteProcessException, IOException {
-    for (DataPoint point : data) {
-      String measurementId = point.getMeasurementId();
-      if (!chunkWriters.containsKey(measurementId)) {
-        throw new NoMeasurementException(
-            "time " + time + ", measurement id " + measurementId + " not found!");
+  public void tryToAddSeriesWriter(List<IMeasurementSchema> schemas) {
+    for (IMeasurementSchema schema : schemas) {
+      if (!chunkWriters.containsKey(schema.getMeasurementId())) {
+        this.chunkWriters.put(schema.getMeasurementId(), new ChunkWriterImpl(schema));
       }
-      point.writeTo(time, (ChunkWriterImpl) chunkWriters.get(measurementId));
     }
   }
 
   @Override
-  public void write(Tablet tablet) throws WriteProcessException {
-    List<IMeasurementSchema> timeseries = tablet.getSchemas();
-    for (int i = 0; i < timeseries.size(); i++) {
-      String measurementId = timeseries.get(i).getMeasurementId();
-      TSDataType dataType = timeseries.get(i).getType();
-      if (!chunkWriters.containsKey(measurementId)) {
-        throw new NoMeasurementException("measurement id" + measurementId + " not found!");
+  public int write(long time, List<DataPoint> data) throws IOException {
+    int pointCount = 0;
+    for (DataPoint point : data) {
+      if (checkIsHistoryData(point.getMeasurementId(), time)) {
+        continue;
       }
-      if (dataType.equals(TSDataType.VECTOR)) {
-        writeVectorDataType(tablet, measurementId, i);
-      } else {
-        writeByDataType(tablet, measurementId, dataType, i);
+      if (pointCount == 0) {
+        pointCount++;
       }
+      point.writeTo(
+          time, chunkWriters.get(point.getMeasurementId())); // write time and value to page
+      lastTimeMap.put(point.getMeasurementId(), time);
     }
+    return pointCount;
   }
 
-  /**
-   * write if data type is VECTOR this method write next n column values (belong to one vector), and
-   * return n to increase index
-   *
-   * @param tablet table
-   * @param measurement vector measurement
-   * @param index measurement start index
-   */
-  private void writeVectorDataType(Tablet tablet, String measurement, int index) {
-    // reference: MemTableFlushTask.java
-    int batchSize = tablet.rowSize;
-    VectorMeasurementSchema vectorMeasurementSchema =
-        (VectorMeasurementSchema) tablet.getSchemas().get(index);
-    List<TSDataType> valueDataTypes = vectorMeasurementSchema.getSubMeasurementsTSDataTypeList();
-    AlignedChunkWriterImpl vectorChunkWriter =
-        (AlignedChunkWriterImpl) chunkWriters.get(measurement);
-    for (int row = 0; row < batchSize; row++) {
+  @Override
+  public int write(Tablet tablet) {
+    int pointCount = 0;
+    List<IMeasurementSchema> timeseries = tablet.getSchemas();
+    for (int row = 0; row < tablet.rowSize; row++) {
       long time = tablet.timestamps[row];
-      for (int columnIndex = 0; columnIndex < valueDataTypes.size(); columnIndex++) {
+      boolean hasOneColumnWritten = false;
+      for (int column = 0; column < timeseries.size(); column++) {
+        String measurementId = timeseries.get(column).getMeasurementId();
+        if (checkIsHistoryData(measurementId, time)) {
+          continue;
+        }
+        hasOneColumnWritten = true;
         boolean isNull = false;
         // check isNull by bitMap in tablet
         if (tablet.bitMaps != null
-            && tablet.bitMaps[columnIndex] != null
-            && tablet.bitMaps[columnIndex].isMarked(row)) {
+            && tablet.bitMaps[column] != null
+            && tablet.bitMaps[column].isMarked(row)) {
           isNull = true;
         }

Review comment:
       `isNull` is useless here.

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
##########
@@ -457,4 +457,8 @@ public boolean isMerging() {
   public void setLastPoint(boolean isLastPoint) {
     this.isLastPoint = isLastPoint;
   }
+
+  public void setValueIndex(int valueIndex) {
+    throw new IllegalStateException("write time method is not implemented in common chunk writer");
+  }

Review comment:
       This method is no longer needed.







[GitHub] [iotdb] qiaojialin commented on a change in pull request #4331: [IOTDB-1626]TsFile API supports write and register on aligned timeseries

Posted by GitBox <gi...@apache.org>.
qiaojialin commented on a change in pull request #4331:
URL: https://github.com/apache/iotdb/pull/4331#discussion_r744278030



##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/Schema.java
##########
@@ -33,77 +33,84 @@
 public class Schema implements Serializable {
 
   /**
-   * Path (device + measurement) -> measurementSchema By default, use the LinkedHashMap to store the
-   * order of insertion
+   * Path (devicePath + DeviceInfo) -> measurementSchema By default, use the LinkedHashMap to store

Review comment:
       what is DeviceInfo?

##########
File path: example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteAlignedWithTablet.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile;
+
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TsFileWriteAlignedWithTablet {
+  private static final Logger logger = LoggerFactory.getLogger(TsFileWriteAlignedWithTablet.class);
+  private static String deviceId = "root.sg.d1";
+
+  public static void main(String[] args) throws IOException {
+    File f = FSFactoryProducer.getFSFactory().getFile("alignedTablet.tsfile");
+    if (f.exists() && !f.delete()) {
+      throw new RuntimeException("can not delete " + f.getAbsolutePath());
+    }
+    try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+      List<UnaryMeasurementSchema> measurementSchemas = new ArrayList<>();
+      measurementSchemas.add(new UnaryMeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN));
+      measurementSchemas.add(new UnaryMeasurementSchema("s2", TSDataType.TEXT, TSEncoding.PLAIN));
+      measurementSchemas.add(new UnaryMeasurementSchema("s3", TSDataType.TEXT, TSEncoding.PLAIN));
+      measurementSchemas.add(new UnaryMeasurementSchema("s4", TSDataType.INT64, TSEncoding.RLE));
+
+      // register align timeseries
+      tsFileWriter.registerAlignedTimeseries(new Path(deviceId), measurementSchemas);
+
+      List<IMeasurementSchema> writeMeasurementScheams = new ArrayList<>();
+      // example 1
+      writeMeasurementScheams.add(measurementSchemas.get(0));
+      writeAlignedWithTablet(tsFileWriter, deviceId, writeMeasurementScheams, 10, 0, 0);
+
+      // example 2
+      writeMeasurementScheams.clear();
+      writeMeasurementScheams.add(measurementSchemas.get(0));
+      writeMeasurementScheams.add(measurementSchemas.get(1));
+      writeAlignedWithTablet(tsFileWriter, deviceId, writeMeasurementScheams, 200000, 10, 0);
+
+      // example 3
+      writeMeasurementScheams.clear();
+      writeMeasurementScheams.add(measurementSchemas.get(2));
+      writeAlignedWithTablet(tsFileWriter, deviceId, writeMeasurementScheams, 10, 0, 0);
+
+      writeWithTablet(tsFileWriter); // write nonAligned timeseries
+    } catch (WriteProcessException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private static void writeAlignedWithTablet(
+      TsFileWriter tsFileWriter,
+      String deviceId,
+      List<IMeasurementSchema> schemas,
+      long rowNum,
+      long startTime,
+      long startValue)
+      throws IOException, WriteProcessException {
+    Tablet tablet = new Tablet(deviceId, schemas);
+    long[] timestamps = tablet.timestamps;
+    Object[] values = tablet.values;
+    long sensorNum = schemas.size();
+
+    for (long r = 0; r < rowNum; r++, startValue++) {
+      int row = tablet.rowSize++;
+      timestamps[row] = startTime++;
+      for (int i = 0; i < sensorNum; i++) {
+        Binary[] textSensor = (Binary[]) values[i];
+        textSensor[row] = new Binary("testString.........");
+      }
+      // write
+      if (tablet.rowSize == tablet.getMaxRowNumber()) {
+        tsFileWriter.writeAligned(tablet);
+        tablet.reset();
+      }
+    }
+    // write
+    if (tablet.rowSize != 0) {
+      tsFileWriter.writeAligned(tablet);
+      tablet.reset();
+    }
+  }
+
+  private static void writeWithTablet(TsFileWriter tsFileWriter)
+      throws WriteProcessException, IOException {
+    // register nonAlign timeseries
+    tsFileWriter.registerTimeseries(
+        new Path("root.sg.d2"), new UnaryMeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
+    tsFileWriter.registerTimeseries(
+        new Path("root.sg.d2"), new UnaryMeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
+    // construct Tablet
+    List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+    measurementSchemas.add(new UnaryMeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
+    measurementSchemas.add(new UnaryMeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
+    Tablet tablet = new Tablet("root.sg.d2", measurementSchemas);
+    long[] timestamps = tablet.timestamps;
+    Object[] values = tablet.values;
+    int rowNum = 100;
+    int sensorNum = measurementSchemas.size();
+    long timestamp = 1;
+    long value = 1000000L;
+    for (int r = 0; r < rowNum; r++, value++) {
+      int row = tablet.rowSize++;
+      timestamps[row] = timestamp++;
+      for (int i = 0; i < sensorNum; i++) {
+        long[] sensor = (long[]) values[i];
+        sensor[row] = value;
+      }
+      // write
+      if (tablet.rowSize == tablet.getMaxRowNumber()) {
+        tsFileWriter.write(tablet);

Review comment:
       ```suggestion
           tsFileWriter.writeNonAligned(tablet);
   ```
   To distinguish it from the aligned write.

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
##########
@@ -54,6 +54,9 @@ public static int estimateMaxPageHeaderSizeWithoutStatistics() {
   public static PageHeader deserializeFrom(
       InputStream inputStream, TSDataType dataType, boolean hasStatistic) throws IOException {
     int uncompressedSize = ReadWriteForEncodingUtils.readUnsignedVarInt(inputStream);
+    if (uncompressedSize == 0) { // Empty Page

Review comment:
       When would we get an empty page?

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
##########
@@ -288,7 +287,7 @@ public int getTotalValueOccupation() {
     int columnIndex = 0;
     for (int i = 0; i < schemas.size(); i++) {
       IMeasurementSchema schema = schemas.get(i);
-      if (schema instanceof UnaryMeasurementSchema) {
+      if (schema instanceof IMeasurementSchema) {

Review comment:
       why change this?
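
For context on the question above: when the list is declared with the interface as its element type, an `instanceof` check against that same interface is true for every non-null element, so it no longer filters anything. A minimal sketch, assuming hypothetical stand-in types (`SchemaLike`, `UnarySchemaLike`, `VectorSchemaLike`) rather than the real TsFile classes:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration; these are not the real TsFile classes.
interface SchemaLike {}

final class UnarySchemaLike implements SchemaLike {}

final class VectorSchemaLike implements SchemaLike {}

final class InstanceofDemo {
  // Counts elements that pass a check against the declared interface type.
  static int countByInterface(List<SchemaLike> schemas) {
    int n = 0;
    for (SchemaLike s : schemas) {
      if (s instanceof SchemaLike) { // true for every non-null element
        n++;
      }
    }
    return n;
  }

  // Counts elements that pass a check against one concrete implementation.
  static int countByConcreteType(List<SchemaLike> schemas) {
    int n = 0;
    for (SchemaLike s : schemas) {
      if (s instanceof UnarySchemaLike) { // filters out other implementations
        n++;
      }
    }
    return n;
  }

  public static void main(String[] args) {
    List<SchemaLike> schemas = Arrays.asList(new UnarySchemaLike(), new VectorSchemaLike());
    System.out.println("interface check matched: " + countByInterface(schemas));
    System.out.println("concrete check matched: " + countByConcreteType(schemas));
  }
}
```

If the intent is to treat all schema kinds the same way, dropping the check entirely would probably read more clearly than widening it.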

##########
File path: example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteAlignedWithTSRecord.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile;
+
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TsFileWriteAlignedWithTSRecord {
+
+  public static void main(String[] args) throws IOException {
+    File f = FSFactoryProducer.getFSFactory().getFile("alignedRecord.tsfile");
+    if (f.exists() && !f.delete()) {
+      throw new RuntimeException("can not delete " + f.getAbsolutePath());
+    }
+    try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+      List<UnaryMeasurementSchema> measurementSchemas = new ArrayList<>();
+      measurementSchemas.add(new UnaryMeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
+      measurementSchemas.add(new UnaryMeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
+      measurementSchemas.add(new UnaryMeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE));

Review comment:
       Since users only ever see `UnaryMeasurementSchema`, we could rename it to `MeasurementSchema`.

##########
File path: example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteAlignedWithTablet.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile;
+
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TsFileWriteAlignedWithTablet {
+  private static final Logger logger = LoggerFactory.getLogger(TsFileWriteAlignedWithTablet.class);
+  private static String deviceId = "root.sg.d1";
+
+  public static void main(String[] args) throws IOException {
+    File f = FSFactoryProducer.getFSFactory().getFile("alignedTablet.tsfile");
+    if (f.exists() && !f.delete()) {
+      throw new RuntimeException("can not delete " + f.getAbsolutePath());
+    }
+    try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+      List<UnaryMeasurementSchema> measurementSchemas = new ArrayList<>();
+      measurementSchemas.add(new UnaryMeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN));
+      measurementSchemas.add(new UnaryMeasurementSchema("s2", TSDataType.TEXT, TSEncoding.PLAIN));
+      measurementSchemas.add(new UnaryMeasurementSchema("s3", TSDataType.TEXT, TSEncoding.PLAIN));
+      measurementSchemas.add(new UnaryMeasurementSchema("s4", TSDataType.INT64, TSEncoding.RLE));
+
+      // register align timeseries
+      tsFileWriter.registerAlignedTimeseries(new Path(deviceId), measurementSchemas);
+
+      List<IMeasurementSchema> writeMeasurementScheams = new ArrayList<>();

Review comment:
       Registering N measurements and writing M < N of them is a special case; M = N is far more common.
   It would be better to give an example that simply uses `measurementSchemas` to write data.

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkGroupWriterImpl.java
##########
@@ -45,150 +41,98 @@
 

Review comment:
       Rename to NonAlignedChunkGroupWriterImpl

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/TimePageReader.java
##########
@@ -53,12 +54,15 @@ public TimePageReader(PageHeader pageHeader, ByteBuffer pageData, Decoder timeDe
   }
 
   public long[] nexTimeBatch() throws IOException {
-    long[] timeBatch = new long[(int) pageHeader.getStatistics().getCount()];
-    int index = 0;
+    List<Long> timeList = new ArrayList<>();
+    // long[] timeBatch = new long[(int) pageHeader.getStatistics().getCount()]; //
+    // TODO: bug. When the TimePage has multiple pages, statistics will be null

Review comment:
       These lines need to be removed.

##########
File path: example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTSRecord.java
##########
@@ -39,43 +44,58 @@
 
   public static void main(String[] args) {
     try {
-      String path = "test.tsfile";
+      String path = "Record.tsfile";
       File f = FSFactoryProducer.getFSFactory().getFile(path);
       if (f.exists()) {
         f.delete();
       }
 
       try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
-        // add measurements into file schema
-        for (int i = 0; i < 4; i++) {
-          // add measurements into file schema
-          tsFileWriter.registerTimeseries(
-              new Path(Constant.DEVICE_PREFIX + i, Constant.SENSOR_1),
-              new UnaryMeasurementSchema(Constant.SENSOR_1, TSDataType.INT64, TSEncoding.RLE));
-          tsFileWriter.registerTimeseries(
-              new Path(Constant.DEVICE_PREFIX + i, Constant.SENSOR_2),
-              new UnaryMeasurementSchema(Constant.SENSOR_2, TSDataType.INT64, TSEncoding.RLE));
-          tsFileWriter.registerTimeseries(
-              new Path(Constant.DEVICE_PREFIX + i, Constant.SENSOR_3),
-              new UnaryMeasurementSchema(Constant.SENSOR_3, TSDataType.INT64, TSEncoding.RLE));
-        }
+        List<UnaryMeasurementSchema> schemas = new ArrayList<>();
+        schemas.add(new UnaryMeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
+        schemas.add(new UnaryMeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
+        schemas.add(new UnaryMeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE));
 
-        // construct TSRecord
-        for (int i = 0; i < 100; i++) {
-          TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_PREFIX + (i % 4));
-          DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_1, i);
-          DataPoint dPoint2 = new LongDataPoint(Constant.SENSOR_2, i);
-          DataPoint dPoint3 = new LongDataPoint(Constant.SENSOR_3, i);
-          tsRecord.addTuple(dPoint1);
-          tsRecord.addTuple(dPoint2);
-          tsRecord.addTuple(dPoint3);
-          // write TSRecord
-          tsFileWriter.write(tsRecord);
-        }
+        // register timeseries
+        tsFileWriter.registerTimeseries(new Path("root.sg.d1"), schemas.get(0));
+        tsFileWriter.registerTimeseries(new Path("root.sg.d1"), schemas.get(1));
+        tsFileWriter.registerTimeseries(new Path("root.sg.d1"), schemas.get(2));
+
+        List<IMeasurementSchema> writeMeasurementScheams = new ArrayList<>();

Review comment:
       Same here: give the most common example.

##########
File path: example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteAlignedWithTablet.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile;
+
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TsFileWriteAlignedWithTablet {
+  private static final Logger logger = LoggerFactory.getLogger(TsFileWriteAlignedWithTablet.class);
+  private static String deviceId = "root.sg.d1";
+
+  public static void main(String[] args) throws IOException {
+    File f = FSFactoryProducer.getFSFactory().getFile("alignedTablet.tsfile");
+    if (f.exists() && !f.delete()) {
+      throw new RuntimeException("can not delete " + f.getAbsolutePath());
+    }
+    try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
+      List<UnaryMeasurementSchema> measurementSchemas = new ArrayList<>();
+      measurementSchemas.add(new UnaryMeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN));
+      measurementSchemas.add(new UnaryMeasurementSchema("s2", TSDataType.TEXT, TSEncoding.PLAIN));
+      measurementSchemas.add(new UnaryMeasurementSchema("s3", TSDataType.TEXT, TSEncoding.PLAIN));
+      measurementSchemas.add(new UnaryMeasurementSchema("s4", TSDataType.INT64, TSEncoding.RLE));
+
+      // register align timeseries
+      tsFileWriter.registerAlignedTimeseries(new Path(deviceId), measurementSchemas);
+
+      List<IMeasurementSchema> writeMeasurementScheams = new ArrayList<>();
+      // example 1
+      writeMeasurementScheams.add(measurementSchemas.get(0));
+      writeAlignedWithTablet(tsFileWriter, deviceId, writeMeasurementScheams, 10, 0, 0);
+
+      // example 2
+      writeMeasurementScheams.clear();
+      writeMeasurementScheams.add(measurementSchemas.get(0));
+      writeMeasurementScheams.add(measurementSchemas.get(1));
+      writeAlignedWithTablet(tsFileWriter, deviceId, writeMeasurementScheams, 200000, 10, 0);
+
+      // example 3
+      writeMeasurementScheams.clear();
+      writeMeasurementScheams.add(measurementSchemas.get(2));
+      writeAlignedWithTablet(tsFileWriter, deviceId, writeMeasurementScheams, 10, 0, 0);
+
+      writeWithTablet(tsFileWriter); // write nonAligned timeseries
+    } catch (WriteProcessException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private static void writeAlignedWithTablet(
+      TsFileWriter tsFileWriter,
+      String deviceId,
+      List<IMeasurementSchema> schemas,
+      long rowNum,
+      long startTime,
+      long startValue)
+      throws IOException, WriteProcessException {
+    Tablet tablet = new Tablet(deviceId, schemas);
+    long[] timestamps = tablet.timestamps;
+    Object[] values = tablet.values;
+    long sensorNum = schemas.size();
+
+    for (long r = 0; r < rowNum; r++, startValue++) {
+      int row = tablet.rowSize++;
+      timestamps[row] = startTime++;
+      for (int i = 0; i < sensorNum; i++) {
+        Binary[] textSensor = (Binary[]) values[i];
+        textSensor[row] = new Binary("testString.........");
+      }
+      // write
+      if (tablet.rowSize == tablet.getMaxRowNumber()) {
+        tsFileWriter.writeAligned(tablet);
+        tablet.reset();
+      }
+    }
+    // write
+    if (tablet.rowSize != 0) {
+      tsFileWriter.writeAligned(tablet);
+      tablet.reset();
+    }
+  }
+
+  private static void writeWithTablet(TsFileWriter tsFileWriter)

Review comment:
       ```suggestion
     private static void writeNonalignedWithTablet(TsFileWriter tsFileWriter)
   ```







[GitHub] [iotdb] JackieTien97 merged pull request #4331: [IOTDB-1626]TsFile API supports write and register on aligned timeseries

Posted by GitBox <gi...@apache.org>.
JackieTien97 merged pull request #4331:
URL: https://github.com/apache/iotdb/pull/4331


   





[GitHub] [iotdb] wangchao316 commented on a change in pull request #4331: [IOTDB-1626]TsFile API supports write and register on aligned timeseries

Posted by GitBox <gi...@apache.org>.
wangchao316 commented on a change in pull request #4331:
URL: https://github.com/apache/iotdb/pull/4331#discussion_r744480415



##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
##########
@@ -153,6 +153,8 @@ public static ChunkHeader deserializeFrom(InputStream inputStream, byte chunkTyp
       throws IOException {
     // read measurementID
     String measurementID = ReadWriteIOUtils.readVarIntString(inputStream);
+    measurementID =
+        measurementID == null ? "" : measurementID; // measurementID in TimeChunk header is null

Review comment:
       1. Comments should be placed on the line above the code they describe.
   2. Why is this set to "" when `measurementID` is null?
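
One way the first point could look, as a small sketch: the explanation sits on its own line above the statement rather than trailing it. The class and method names (`MeasurementIdNormalizer`, `normalize`) are hypothetical and not part of the PR; the reason given in the comment is taken from the comment in the hunk above.

```java
// Hypothetical helper for illustration; not part of the PR.
final class MeasurementIdNormalizer {

  static String normalize(String measurementId) {
    // The measurement ID in a time chunk header is null, so map null to the empty string.
    return measurementId == null ? "" : measurementId;
  }

  public static void main(String[] args) {
    System.out.println("[" + normalize(null) + "]");
    System.out.println("[" + normalize("s1") + "]");
  }
}
```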

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
##########
@@ -151,92 +189,205 @@ protected TsFileWriter(TsFileIOWriter fileWriter, Schema schema, TSFileConfig co
   }
 
   public void registerSchemaTemplate(
-      String templateName, Map<String, IMeasurementSchema> template) {
-    schema.registerSchemaTemplate(templateName, template);
+      String templateName, Map<String, UnaryMeasurementSchema> template, boolean isAligned) {
+    schema.registerSchemaTemplate(templateName, new MeasurementGroup(isAligned, template));
   }
 
-  public void registerDevice(String deviceId, String templateName) {
+  public void registerDevice(String deviceId, String templateName) throws WriteProcessException {
+    if (!schema.getSchemaTemplates().containsKey(templateName)) {
+      throw new WriteProcessException("given template is not existed! " + templateName);
+    }
+    if (schema.getRegisteredTimeseriesMap().containsKey(new Path(deviceId))) {
+      throw new WriteProcessException(
+          "this device "
+              + deviceId
+              + " has been registered, you can only use registerDevice method to register empty device.");
+    }
     schema.registerDevice(deviceId, templateName);
   }
 
-  public void registerTimeseries(Path path, IMeasurementSchema measurementSchema)
-      throws WriteProcessException {
-    if (schema.containsTimeseries(path)) {
-      throw new WriteProcessException("given timeseries has exists! " + path);
+  public void registerTimeseries(Path devicePath, List<UnaryMeasurementSchema> measurementSchemas) {
+    for (UnaryMeasurementSchema schema : measurementSchemas) {
+      try {
+        registerTimeseries(devicePath, schema);
+      } catch (WriteProcessException e) {
+        LOG.error(e.getMessage());

Review comment:
       If the exception is not handled, please log it at warn level.
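
A sketch of what this could look like in a batch registration loop: each failure is logged at warning level with its cause and also reported back to the caller, so nothing is dropped silently. Everything here is an assumption for illustration: `BatchRegistrar`, `register`, and `registerAll` are hypothetical names, and `java.util.logging` stands in for the project's logger only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical registrar for illustration; not the real TsFileWriter.
final class BatchRegistrar {
  private static final Logger LOG = Logger.getLogger(BatchRegistrar.class.getName());
  private final Set<String> registered = new HashSet<>();

  // Single-item registration that rejects duplicates.
  void register(String name) {
    if (!registered.add(name)) {
      throw new IllegalStateException("timeseries already exists: " + name);
    }
  }

  // Registers every name; failures are logged at WARNING and returned to the caller.
  List<String> registerAll(List<String> names) {
    List<String> failed = new ArrayList<>();
    for (String name : names) {
      try {
        register(name);
      } catch (IllegalStateException e) {
        LOG.log(Level.WARNING, "skipping " + name, e);
        failed.add(name);
      }
    }
    return failed;
  }

  public static void main(String[] args) {
    List<String> failed = new BatchRegistrar().registerAll(Arrays.asList("s1", "s2", "s1"));
    System.out.println("failed: " + failed);
  }
}
```

Returning the failed names is a design choice of this sketch; rethrowing after the loop would be an equally reasonable alternative.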

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/TimePageReader.java
##########
@@ -53,12 +54,11 @@ public TimePageReader(PageHeader pageHeader, ByteBuffer pageData, Decoder timeDe
   }
 
   public long[] nexTimeBatch() throws IOException {
-    long[] timeBatch = new long[(int) pageHeader.getStatistics().getCount()];
-    int index = 0;
+    List<Long> timeList = new ArrayList<>();
     while (timeDecoder.hasNext(timeBuffer)) {
-      timeBatch[index++] = timeDecoder.readLong(timeBuffer);
+      timeList.add(timeDecoder.readLong(timeBuffer));
     }
-    return timeBatch;
+    return timeList.stream().mapToLong(t -> t.longValue()).toArray();

Review comment:
       Why is this modification necessary? It will cause a performance loss, since the data is now iterated twice (two O(n) passes).
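
If the number of timestamps cannot be known up front, one possible alternative is a growable primitive buffer, which avoids boxing each value into a `Long` and needs only a final trim. This is a sketch under stated assumptions, not the PR's implementation: `LongBatchCollector` is a hypothetical name, and a `PrimitiveIterator.OfLong` stands in for the time decoder.

```java
import java.util.Arrays;
import java.util.PrimitiveIterator;
import java.util.stream.LongStream;

// Hypothetical collector for illustration; the iterator stands in for the decoder.
final class LongBatchCollector {

  // Drains a source of unknown length into a long[] without boxing.
  static long[] collect(PrimitiveIterator.OfLong source) {
    long[] buffer = new long[16];
    int size = 0;
    while (source.hasNext()) {
      if (size == buffer.length) {
        // Doubling keeps the amortized cost per element constant.
        buffer = Arrays.copyOf(buffer, buffer.length * 2);
      }
      buffer[size++] = source.nextLong();
    }
    // Trim to the exact number of values read.
    return Arrays.copyOf(buffer, size);
  }

  public static void main(String[] args) {
    long[] times = collect(LongStream.rangeClosed(1, 40).iterator());
    System.out.println(times.length + " values read");
  }
}
```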



