You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2021/11/04 03:15:35 UTC

[iotdb] branch separate_insert_plan_xkf created (now b246bb5)

This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a change to branch separate_insert_plan_xkf
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at b246bb5  replace vector measurement schema

This branch includes the following new commits:

     new b246bb5  replace vector measurement schema

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: replace vector measurement schema

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch separate_insert_plan_xkf
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b246bb50a4fb464a64887b7c867f548d86194971
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Thu Nov 4 10:18:57 2021 +0800

    replace vector measurement schema
---
 .../iotdb/AlignedTimeseriesSessionExample.java     | 55 +++++++--------------
 .../iotdb/HybridTimeseriesSessionExample.java      |  9 ++--
 .../java/org/apache/iotdb/session/Session.java     | 57 ++++++++++++++--------
 .../org/apache/iotdb/session/pool/SessionPool.java | 30 ++++++++++++
 .../session/IoTDBSessionVectorAggregationIT.java   |  9 ++--
 .../iotdb/session/IoTDBSessionVectorInsertIT.java  |  9 ++--
 .../apache/iotdb/tsfile/write/record/Tablet.java   |  4 ++
 .../write/schema/VectorMeasurementSchema.java      | 23 +++++++++
 8 files changed, 121 insertions(+), 75 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
index c7a7fa7..e012edd 100644
--- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
 
 import java.security.SecureRandom;
 import java.util.ArrayList;
@@ -266,11 +266,8 @@ public class AlignedTimeseriesSessionExample {
     // The schema of measurements of one device
     // only measurementId and data type in MeasurementSchema take effects in Tablet
     List<IMeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(
-        new VectorMeasurementSchema(
-            "vector",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+    schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
 
     Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR1, schemaList);
     tablet.setAligned(true);
@@ -280,13 +277,8 @@ public class AlignedTimeseriesSessionExample {
       int rowIndex = tablet.rowSize++;
       tablet.addTimestamp(rowIndex, timestamp);
       tablet.addValue(
-          schemaList.get(0).getSubMeasurementsList().get(0),
-          rowIndex,
-          new SecureRandom().nextLong());
-      tablet.addValue(
-          schemaList.get(0).getSubMeasurementsList().get(1),
-          rowIndex,
-          new SecureRandom().nextInt());
+          schemaList.get(0).getMeasurementId(), rowIndex, new SecureRandom().nextLong());
+      tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, new SecureRandom().nextInt());
 
       if (tablet.rowSize == tablet.getMaxRowNumber()) {
         session.insertTablet(tablet, true);
@@ -309,11 +301,8 @@ public class AlignedTimeseriesSessionExample {
     // The schema of measurements of one device
     // only measurementId and data type in MeasurementSchema take effects in Tablet
     List<IMeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(
-        new VectorMeasurementSchema(
-            "vector2",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+    schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
 
     Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR2, schemaList);
     tablet.setAligned(true);
@@ -349,11 +338,8 @@ public class AlignedTimeseriesSessionExample {
     // The schema of measurements of one device
     // only measurementId and data type in MeasurementSchema take effects in Tablet
     List<IMeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(
-        new VectorMeasurementSchema(
-            "vector3",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+    schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
 
     Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR3, schemaList);
     tablet.setAligned(true);
@@ -515,23 +501,16 @@ public class AlignedTimeseriesSessionExample {
       throws IoTDBConnectionException, StatementExecutionException {
 
     List<IMeasurementSchema> schemaList1 = new ArrayList<>();
-    schemaList1.add(
-        new VectorMeasurementSchema(
-            "vector6",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT64}));
+    schemaList1.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList1.add(new UnaryMeasurementSchema("s2", TSDataType.INT64));
+
     List<IMeasurementSchema> schemaList2 = new ArrayList<>();
-    schemaList2.add(
-        new VectorMeasurementSchema(
-            "vector7",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT64}));
+    schemaList2.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList2.add(new UnaryMeasurementSchema("s2", TSDataType.INT64));
+
     List<IMeasurementSchema> schemaList3 = new ArrayList<>();
-    schemaList3.add(
-        new VectorMeasurementSchema(
-            "vector8",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT64}));
+    schemaList3.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList3.add(new UnaryMeasurementSchema("s2", TSDataType.INT64));
 
     Tablet tablet1 = new Tablet(ROOT_SG2_D1_VECTOR6, schemaList1, 100);
     Tablet tablet2 = new Tablet(ROOT_SG2_D1_VECTOR7, schemaList2, 100);
diff --git a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
index 241e40e..7c13681 100644
--- a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.session.SessionDataSet;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -73,11 +73,8 @@ public class HybridTimeseriesSessionExample {
     // The schema of measurements of one device
     // only measurementId and data type in MeasurementSchema take effects in Tablet
     List<IMeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(
-        new VectorMeasurementSchema(
-            "vector",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+    schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
 
     Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR1, schemaList);
     tablet.setAligned(true);
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 5ea17d2..d73be83 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -18,25 +18,6 @@
  */
 package org.apache.iotdb.session;
 
-import java.nio.ByteBuffer;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RedirectException;
@@ -70,9 +51,31 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
 @SuppressWarnings({"java:S107", "java:S1135"}) // need enough parameters, ignore todos
 public class Session {
 
@@ -1435,6 +1438,9 @@ public class Session {
    */
   public void insertAlignedTablet(Tablet tablet)
       throws StatementExecutionException, IoTDBConnectionException {
+    tablet.setSchemas(
+        Collections.singletonList(
+            VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
     insertTablet(tablet);
   }
 
@@ -1446,6 +1452,9 @@ public class Session {
    */
   public void insertAlignedTablet(Tablet tablet, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
+    tablet.setSchemas(
+        Collections.singletonList(
+            VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
     insertTablet(tablet, sorted);
   }
 
@@ -1530,6 +1539,11 @@ public class Session {
    */
   public void insertAlignedTablets(Map<String, Tablet> tablets)
       throws IoTDBConnectionException, StatementExecutionException {
+    for (Tablet tablet : tablets.values()) {
+      tablet.setSchemas(
+          Collections.singletonList(
+              VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
+    }
     insertTablets(tablets, false);
   }
 
@@ -1542,6 +1556,11 @@ public class Session {
    */
   public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
+    for (Tablet tablet : tablets.values()) {
+      tablet.setSchemas(
+          Collections.singletonList(
+              VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
+    }
     insertTablets(tablets, sorted);
   }
 
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 9c075e5..ea193e3 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -27,11 +27,13 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -462,10 +464,28 @@ public class SessionPool {
    * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
    *
    * @param tablet a tablet data of one device
+   */
+  public void insertAlignedTablet(Tablet tablet)
+      throws IoTDBConnectionException, StatementExecutionException {
+    tablet.setSchemas(
+        Collections.singletonList(
+            VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
+    insertTablet(tablet, false);
+  }
+
+  /**
+   * insert the data of a device. For each timestamp, the number of measurements is the same.
+   *
+   * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
+   *
+   * @param tablet a tablet data of one device
    * @param sorted whether times in Tablet are in ascending order
    */
   public void insertAlignedTablet(Tablet tablet, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
+    tablet.setSchemas(
+        Collections.singletonList(
+            VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
     insertTablet(tablet, sorted);
   }
 
@@ -486,6 +506,11 @@ public class SessionPool {
    */
   public void insertAlignedTablets(Map<String, Tablet> tablets)
       throws IoTDBConnectionException, StatementExecutionException {
+    for (Tablet tablet : tablets.values()) {
+      tablet.setSchemas(
+          Collections.singletonList(
+              VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
+    }
     insertTablets(tablets, false);
   }
 
@@ -520,6 +545,11 @@ public class SessionPool {
    */
   public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
+    for (Tablet tablet : tablets.values()) {
+      tablet.setSchemas(
+          Collections.singletonList(
+              VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
+    }
     insertTablets(tablets, sorted);
   }
 
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
index 8c3624d..c2e62f4 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -217,11 +217,8 @@ public class IoTDBSessionVectorAggregationIT {
     // The schema of measurements of one device
     // only measurementId and data type in MeasurementSchema take effects in Tablet
     List<IMeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(
-        new VectorMeasurementSchema(
-            "vector1",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+    schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
 
     Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR1, schemaList);
     tablet.setAligned(true);
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
index de35457..b4d1f46 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
 
 import org.junit.After;
 import org.junit.Before;
@@ -265,11 +265,8 @@ public class IoTDBSessionVectorInsertIT {
     // The schema of measurements of one device
     // only measurementId and data type in MeasurementSchema take effects in Tablet
     List<IMeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(
-        new VectorMeasurementSchema(
-            "vector",
-            new String[] {"s1", "s2"},
-            new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+    schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
 
     Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR1, schemaList);
     tablet.setAligned(true);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
index 0442101..0a621ee 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
@@ -116,6 +116,10 @@ public class Tablet {
     this.prefixPath = prefixPath;
   }
 
+  public void setSchemas(List<IMeasurementSchema> schemas) {
+    this.schemas = schemas;
+  }
+
   public void addTimestamp(int rowIndex, long timestamp) {
     timestamps[rowIndex] = timestamp;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
index d2e97cd..ecf7b89 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
@@ -117,6 +117,29 @@ public class VectorMeasurementSchema
         TSFileDescriptor.getInstance().getConfig().getCompressor());
   }
 
+  /**
+   * build a vector measurement schema from list of measurement schema
+   *
+   * @param deviceId device id
+   * @param schemas list of measurement schema
+   * @return vector measurement schema
+   */
+  public static VectorMeasurementSchema buildFromSchemas(
+      String deviceId, List<IMeasurementSchema> schemas) {
+    String[] subMeasurements = new String[schemas.size()];
+    TSDataType[] types = new TSDataType[schemas.size()];
+    TSEncoding[] encodings = new TSEncoding[schemas.size()];
+
+    for (int i = 0; i < schemas.size(); i++) {
+      IMeasurementSchema schema = schemas.get(i);
+      subMeasurements[i] = schema.getMeasurementId();
+      types[i] = schema.getType();
+      encodings[i] = schema.getEncodingType();
+    }
+
+    return new VectorMeasurementSchema(deviceId, subMeasurements, types, encodings);
+  }
+
   @Override
   public String getMeasurementId() {
     return vectorMeasurementId;