You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2021/11/04 03:15:36 UTC
[iotdb] 01/01: replace vector measurement schema
This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch separate_insert_plan_xkf
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b246bb50a4fb464a64887b7c867f548d86194971
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Thu Nov 4 10:18:57 2021 +0800
replace vector measurement schema
---
.../iotdb/AlignedTimeseriesSessionExample.java | 55 +++++++--------------
.../iotdb/HybridTimeseriesSessionExample.java | 9 ++--
.../java/org/apache/iotdb/session/Session.java | 57 ++++++++++++++--------
.../org/apache/iotdb/session/pool/SessionPool.java | 30 ++++++++++++
.../session/IoTDBSessionVectorAggregationIT.java | 9 ++--
.../iotdb/session/IoTDBSessionVectorInsertIT.java | 9 ++--
.../apache/iotdb/tsfile/write/record/Tablet.java | 4 ++
.../write/schema/VectorMeasurementSchema.java | 23 +++++++++
8 files changed, 121 insertions(+), 75 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
index c7a7fa7..e012edd 100644
--- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
import java.security.SecureRandom;
import java.util.ArrayList;
@@ -266,11 +266,8 @@ public class AlignedTimeseriesSessionExample {
// The schema of measurements of one device
// only measurementId and data type in MeasurementSchema take effects in Tablet
List<IMeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(
- new VectorMeasurementSchema(
- "vector",
- new String[] {"s1", "s2"},
- new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+ schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+ schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR1, schemaList);
tablet.setAligned(true);
@@ -280,13 +277,8 @@ public class AlignedTimeseriesSessionExample {
int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, timestamp);
tablet.addValue(
- schemaList.get(0).getSubMeasurementsList().get(0),
- rowIndex,
- new SecureRandom().nextLong());
- tablet.addValue(
- schemaList.get(0).getSubMeasurementsList().get(1),
- rowIndex,
- new SecureRandom().nextInt());
+ schemaList.get(0).getMeasurementId(), rowIndex, new SecureRandom().nextLong());
+ tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, new SecureRandom().nextInt());
if (tablet.rowSize == tablet.getMaxRowNumber()) {
session.insertTablet(tablet, true);
@@ -309,11 +301,8 @@ public class AlignedTimeseriesSessionExample {
// The schema of measurements of one device
// only measurementId and data type in MeasurementSchema take effects in Tablet
List<IMeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(
- new VectorMeasurementSchema(
- "vector2",
- new String[] {"s1", "s2"},
- new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+ schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+ schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR2, schemaList);
tablet.setAligned(true);
@@ -349,11 +338,8 @@ public class AlignedTimeseriesSessionExample {
// The schema of measurements of one device
// only measurementId and data type in MeasurementSchema take effects in Tablet
List<IMeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(
- new VectorMeasurementSchema(
- "vector3",
- new String[] {"s1", "s2"},
- new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+ schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+ schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR3, schemaList);
tablet.setAligned(true);
@@ -515,23 +501,16 @@ public class AlignedTimeseriesSessionExample {
throws IoTDBConnectionException, StatementExecutionException {
List<IMeasurementSchema> schemaList1 = new ArrayList<>();
- schemaList1.add(
- new VectorMeasurementSchema(
- "vector6",
- new String[] {"s1", "s2"},
- new TSDataType[] {TSDataType.INT64, TSDataType.INT64}));
+ schemaList1.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+ schemaList1.add(new UnaryMeasurementSchema("s2", TSDataType.INT64));
+
List<IMeasurementSchema> schemaList2 = new ArrayList<>();
- schemaList2.add(
- new VectorMeasurementSchema(
- "vector7",
- new String[] {"s1", "s2"},
- new TSDataType[] {TSDataType.INT64, TSDataType.INT64}));
+ schemaList2.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+ schemaList2.add(new UnaryMeasurementSchema("s2", TSDataType.INT64));
+
List<IMeasurementSchema> schemaList3 = new ArrayList<>();
- schemaList3.add(
- new VectorMeasurementSchema(
- "vector8",
- new String[] {"s1", "s2"},
- new TSDataType[] {TSDataType.INT64, TSDataType.INT64}));
+ schemaList3.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+ schemaList3.add(new UnaryMeasurementSchema("s2", TSDataType.INT64));
Tablet tablet1 = new Tablet(ROOT_SG2_D1_VECTOR6, schemaList1, 100);
Tablet tablet2 = new Tablet(ROOT_SG2_D1_VECTOR7, schemaList2, 100);
diff --git a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
index 241e40e..7c13681 100644
--- a/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/HybridTimeseriesSessionExample.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
import java.util.ArrayList;
import java.util.List;
@@ -73,11 +73,8 @@ public class HybridTimeseriesSessionExample {
// The schema of measurements of one device
// only measurementId and data type in MeasurementSchema take effects in Tablet
List<IMeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(
- new VectorMeasurementSchema(
- "vector",
- new String[] {"s1", "s2"},
- new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+ schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+ schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR1, schemaList);
tablet.setAligned(true);
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 5ea17d2..d73be83 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -18,25 +18,6 @@
*/
package org.apache.iotdb.session;
-import java.nio.ByteBuffer;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RedirectException;
@@ -70,9 +51,31 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
@SuppressWarnings({"java:S107", "java:S1135"}) // need enough parameters, ignore todos
public class Session {
@@ -1435,6 +1438,9 @@ public class Session {
*/
public void insertAlignedTablet(Tablet tablet)
throws StatementExecutionException, IoTDBConnectionException {
+ tablet.setSchemas(
+ Collections.singletonList(
+ VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
insertTablet(tablet);
}
@@ -1446,6 +1452,9 @@ public class Session {
*/
public void insertAlignedTablet(Tablet tablet, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
+ tablet.setSchemas(
+ Collections.singletonList(
+ VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
insertTablet(tablet, sorted);
}
@@ -1530,6 +1539,11 @@ public class Session {
*/
public void insertAlignedTablets(Map<String, Tablet> tablets)
throws IoTDBConnectionException, StatementExecutionException {
+ for (Tablet tablet : tablets.values()) {
+ tablet.setSchemas(
+ Collections.singletonList(
+ VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
+ }
insertTablets(tablets, false);
}
@@ -1542,6 +1556,11 @@ public class Session {
*/
public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
+ for (Tablet tablet : tablets.values()) {
+ tablet.setSchemas(
+ Collections.singletonList(
+ VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
+ }
insertTablets(tablets, sorted);
}
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 9c075e5..ea193e3 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -27,11 +27,13 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.ZoneId;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -462,10 +464,28 @@ public class SessionPool {
* <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
*
* @param tablet a tablet data of one device
+ */
+ public void insertAlignedTablet(Tablet tablet)
+ throws IoTDBConnectionException, StatementExecutionException {
+ tablet.setSchemas(
+ Collections.singletonList(
+ VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
+ insertTablet(tablet, false);
+ }
+
+ /**
+ * insert the data of a device. For each timestamp, the number of measurements is the same.
+ *
+ * <p>Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
+ *
+ * @param tablet a tablet data of one device
* @param sorted whether times in Tablet are in ascending order
*/
public void insertAlignedTablet(Tablet tablet, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
+ tablet.setSchemas(
+ Collections.singletonList(
+ VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
insertTablet(tablet, sorted);
}
@@ -486,6 +506,11 @@ public class SessionPool {
*/
public void insertAlignedTablets(Map<String, Tablet> tablets)
throws IoTDBConnectionException, StatementExecutionException {
+ for (Tablet tablet : tablets.values()) {
+ tablet.setSchemas(
+ Collections.singletonList(
+ VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
+ }
insertTablets(tablets, false);
}
@@ -520,6 +545,11 @@ public class SessionPool {
*/
public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
+ for (Tablet tablet : tablets.values()) {
+ tablet.setSchemas(
+ Collections.singletonList(
+ VectorMeasurementSchema.buildFromSchemas(tablet.prefixPath, tablet.getSchemas())));
+ }
insertTablets(tablets, sorted);
}
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
index 8c3624d..c2e62f4 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorAggregationIT.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -217,11 +217,8 @@ public class IoTDBSessionVectorAggregationIT {
// The schema of measurements of one device
// only measurementId and data type in MeasurementSchema take effects in Tablet
List<IMeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(
- new VectorMeasurementSchema(
- "vector1",
- new String[] {"s1", "s2"},
- new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+ schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+ schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR1, schemaList);
tablet.setAligned(true);
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
index de35457..b4d1f46 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorInsertIT.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
import org.junit.After;
import org.junit.Before;
@@ -265,11 +265,8 @@ public class IoTDBSessionVectorInsertIT {
// The schema of measurements of one device
// only measurementId and data type in MeasurementSchema take effects in Tablet
List<IMeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(
- new VectorMeasurementSchema(
- "vector",
- new String[] {"s1", "s2"},
- new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
+ schemaList.add(new UnaryMeasurementSchema("s1", TSDataType.INT64));
+ schemaList.add(new UnaryMeasurementSchema("s2", TSDataType.INT32));
Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR1, schemaList);
tablet.setAligned(true);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
index 0442101..0a621ee 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
@@ -116,6 +116,10 @@ public class Tablet {
this.prefixPath = prefixPath;
}
+ public void setSchemas(List<IMeasurementSchema> schemas) {
+ this.schemas = schemas;
+ }
+
public void addTimestamp(int rowIndex, long timestamp) {
timestamps[rowIndex] = timestamp;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
index d2e97cd..ecf7b89 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
@@ -117,6 +117,29 @@ public class VectorMeasurementSchema
TSFileDescriptor.getInstance().getConfig().getCompressor());
}
+ /**
+ * Build a vector measurement schema from a list of measurement schemas.
+ *
+ * @param deviceId device id
+ * @param schemas the list of measurement schemas to combine
+ * @return vector measurement schema
+ */
+ public static VectorMeasurementSchema buildFromSchemas(
+ String deviceId, List<IMeasurementSchema> schemas) {
+ String[] subMeasurements = new String[schemas.size()];
+ TSDataType[] types = new TSDataType[schemas.size()];
+ TSEncoding[] encodings = new TSEncoding[schemas.size()];
+
+ for (int i = 0; i < schemas.size(); i++) {
+ IMeasurementSchema schema = schemas.get(i);
+ subMeasurements[i] = schema.getMeasurementId();
+ types[i] = schema.getType();
+ encodings[i] = schema.getEncodingType();
+ }
+
+ return new VectorMeasurementSchema(deviceId, subMeasurements, types, encodings);
+ }
+
@Override
public String getMeasurementId() {
return vectorMeasurementId;