You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2021/11/04 03:15:36 UTC

[iotdb] 01/01: replace vector measurement schema

This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch separate_insert_plan_xkf
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b246bb50a4fb464a64887b7c867f548d86194971
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Thu Nov 4 10:18:57 2021 +0800

    replace vector measurement schema
---
 .../iotdb/AlignedTimeseriesSessionExample.java     | 55 +++++++--------------
 .../iotdb/HybridTimeseriesSessionExample.java      |  9 ++--
 .../java/org/apache/iotdb/session/Session.java     | 57 ++++++++++++++--------
 .../org/apache/iotdb/session/pool/SessionPool.java | 30 ++++++++++++
 .../session/IoTDBSessionVectorAggregationIT.java   |  9 ++--
 .../iotdb/session/IoTDBSessionVectorInsertIT.java  |  9 ++--
 .../apache/iotdb/tsfile/write/record/Tablet.java   |  4 ++
 .../write/schema/VectorMeasurementSchema.java      | 23 +++++++++
 8 files changed, 121 insertions(+), 75 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
index c7a7fa7..e012edd 100644
--- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
 
 import java.security.SecureRandom;
 import java.util.ArrayList;
@@ -266,11 +266,8 @@ public class AlignedTimeseriesSessionExample {
     // The schema of measurements of one device
     // only measurementId and data type in MeasurementSchema take effects in Tablet
     List<IMeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(
-        new VectorMeasurementSchema(
-            "vector",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+    schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
 
     Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR1, schemaList);
     tablet.setAligned(true);
@@ -280,13 +277,8 @@ public class AlignedTimeseriesSessionExample {
       int rowIndex = tablet.rowSize++;
       tablet.addTimestamp(rowIndex, timestamp);
       tablet.addValue(
-          schemaList.get(0).getSubMeasurementsList().get(0),
-          rowIndex,
-          new SecureRandom().nextLong());
-      tablet.addValue(
-          schemaList.get(0).getSubMeasurementsList().get(1),
-          rowIndex,
-          new SecureRandom().nextInt());
+          schemaList.get(0).getMeasurementId(), rowIndex, new SecureRandom().nextLong());
+      tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, new SecureRandom().nextInt());
 
       if (tablet.rowSize == tablet.getMaxRowNumber()) {
         session.insertTablet(tablet, true);
@@ -309,11 +301,8 @@ public class AlignedTimeseriesSessionExample {
     // The schema of measurements of one device
     // only measurementId and data type in MeasurementSchema take effects in Tablet
     List<IMeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(
-        new VectorMeasurementSchema(
-            "vector2",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+    schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
 
     Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR2, schemaList);
     tablet.setAligned(true);
@@ -349,11 +338,8 @@ public class AlignedTimeseriesSessionExample {
     // The schema of measurements of one device
     // only measurementId and data type in MeasurementSchema take effects in Tablet
     List<IMeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(
-        new VectorMeasurementSchema(
-            "vector3",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+    schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
 
     Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR3, schemaList);
     tablet.setAligned(true);
@@ -515,23 +501,16 @@ public class AlignedTimeseriesSessionExample {
       throws IoTDBConnectionException, StatementExecutionException {
 
     List<IMeasurementSchema> schemaList1 = new ArrayList<>();
-    schemaList1.add(
-        new VectorMeasurementSchema(
-            "vector6",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT64}));
+    schemaList1.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList1.add(new UnaryMeasurementSchema("s2", TSDataType.INT64));
+
     List<IMeasurementSchema> schemaList2 = new ArrayList<>();
-    schemaList2.add(
-        new VectorMeasurementSchema(
-            "vector7",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT64}));
+    schemaList2.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList2.add(new UnaryMeasurementSchema("s2", TSDataType.INT64));
+
     List<IMeasurementSchema> schemaList3 = new ArrayList<>();
-    schemaList3.add(
-        new VectorMeasurementSchema(
-            "vector8",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT64}));
+    schemaList3.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList3.add(new UnaryMeasurementSchema("s2", TSDataType.INT64));
 
     Tablet tablet1 = new Tablet(ROOT_SG2_D1_VECTOR6, schemaList1, 100);
     Tablet tablet2 = new Tablet(ROOT_SG2_D1_VECTOR7, schemaList2, 100);
diff --git a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
index 241e40e..7c13681 100644
--- a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.session.SessionDataSet;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -73,11 +73,8 @@ public class HybridTimeseriesSessionExample {
     // The schema of measurements of one device
     // only measurementId and data type in MeasurementSchema take effects in Tablet
     List<IMeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(
-        new VectorMeasurementSchema(
-            "vector",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+    schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
 
     Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR1, schemaList);
     tablet.setAligned(true);
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 5ea17d2..d73be83 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -18,25 +18,6 @@
  */
 package org.apache.iotdb.session;
 
-import java.nio.ByteBuffer;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RedirectException;
@@ -70,9 +51,31 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
 @SuppressWarnings({"java:S107", "java:S1135"}) // need enough parameters, ignore todos
 public class Session {
 
@@ -1435,6 +1438,9 @@ public class Session {
    */
   public void insertAlignedTablet(Tablet tablet)
       throws StatementExecutionException, IoTDBConnectionException {
+    tablet.setSchemas(
+        Collections.singletonList(
+            VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
     insertTablet(tablet);
   }
 
@@ -1446,6 +1452,9 @@ public class Session {
    */
   public void insertAlignedTablet(Tablet tablet, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
+    tablet.setSchemas(
+        Collections.singletonList(
+            VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
     insertTablet(tablet, sorted);
   }
 
@@ -1530,6 +1539,11 @@ public class Session {
    */
   public void insertAlignedTablets(Map<String, Tablet> tablets)
       throws IoTDBConnectionException, StatementExecutionException {
+    for (Tablet tablet : tablets.values()) {
+      tablet.setSchemas(
+          Collections.singletonList(
+              VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
+    }
     insertTablets(tablets, false);
   }
 
@@ -1542,6 +1556,11 @@ public class Session {
    */
   public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
+    for (Tablet tablet : tablets.values()) {
+      tablet.setSchemas(
+          Collections.singletonList(
+              VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
+    }
     insertTablets(tablets, sorted);
   }
 
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 9c075e5..ea193e3 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -27,11 +27,13 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -462,10 +464,28 @@ public class SessionPool {
    * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
    *
    * @param tablet a tablet data of one device
+   */
+  public void insertAlignedTablet(Tablet tablet)
+      throws IoTDBConnectionException, StatementExecutionException {
+    tablet.setSchemas(
+        Collections.singletonList(
+            VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
+    insertTablet(tablet, false);
+  }
+
+  /**
+   * insert the data of a device. For each timestamp, the number of measurements is the same.
+   *
+   * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
+   *
+   * @param tablet a tablet data of one device
    * @param sorted whether times in Tablet are in ascending order
    */
   public void insertAlignedTablet(Tablet tablet, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
+    tablet.setSchemas(
+        Collections.singletonList(
+            VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
     insertTablet(tablet, sorted);
   }
 
@@ -486,6 +506,11 @@ public class SessionPool {
    */
   public void insertAlignedTablets(Map<String, Tablet> tablets)
       throws IoTDBConnectionException, StatementExecutionException {
+    for (Tablet tablet : tablets.values()) {
+      tablet.setSchemas(
+          Collections.singletonList(
+              VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
+    }
     insertTablets(tablets, false);
   }
 
@@ -520,6 +545,11 @@ public class SessionPool {
    */
   public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
+    for (Tablet tablet : tablets.values()) {
+      tablet.setSchemas(
+          Collections.singletonList(
+              VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
+    }
     insertTablets(tablets, sorted);
   }
 
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
index 8c3624d..c2e62f4 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -217,11 +217,8 @@ public class IoTDBSessionVectorAggregationIT {
     // The schema of measurements of one device
     // only measurementId and data type in MeasurementSchema take effects in Tablet
     List<IMeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(
-        new VectorMeasurementSchema(
-            "vector1",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+    schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
 
     Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR1, schemaList);
     tablet.setAligned(true);
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
index de35457..b4d1f46 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
 
 import org.junit.After;
 import org.junit.Before;
@@ -265,11 +265,8 @@ public class IoTDBSessionVectorInsertIT {
     // The schema of measurements of one device
     // only measurementId and data type in MeasurementSchema take effects in Tablet
     List<IMeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(
-        new VectorMeasurementSchema(
-            "vector",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+    schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
 
     Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR1, schemaList);
     tablet.setAligned(true);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
index 0442101..0a621ee 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
@@ -116,6 +116,10 @@ public class Tablet {
     this.prefixPath = prefixPath;
   }
 
+  public void setSchemas(List<IMeasurementSchema> schemas) {
+    this.schemas = schemas;
+  }
+
   public void addTimestamp(int rowIndex, long timestamp) {
     timestamps[rowIndex] = timestamp;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
index d2e97cd..ecf7b89 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
@@ -117,6 +117,29 @@ public class VectorMeasurementSchema
         TSFileDescriptor.getInstance().getConfig().getCompressor());
   }
 
+  /**
+   * Builds one vector measurement schema whose sub-measurements mirror {@code schemas} in order.
+   *
+   * @param deviceId forwarded as the first constructor argument (presumably the vector's own id)
+   * @param schemas source schemas; each one's measurement id, data type and encoding are copied
+   * @return a new vector measurement schema with one sub-measurement per entry of {@code schemas}
+   */
+  public static VectorMeasurementSchema buildFromSchemas(
+      String deviceId, List<IMeasurementSchema> schemas) {
+    String[] subMeasurements = new String[schemas.size()];
+    TSDataType[] types = new TSDataType[schemas.size()];
+    TSEncoding[] encodings = new TSEncoding[schemas.size()];
+
+    for (int i = 0; i < schemas.size(); i++) {
+      IMeasurementSchema schema = schemas.get(i);
+      subMeasurements[i] = schema.getMeasurementId();
+      types[i] = schema.getType();
+      encodings[i] = schema.getEncodingType();
+    }
+
+    return new VectorMeasurementSchema(deviceId, subMeasurements, types, encodings);
+  }
+
   @Override
   public String getMeasurementId() {
     return vectorMeasurementId;