You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/06/05 17:12:26 UTC
[iotdb] 01/01: [IOTDB-1422] Support partial insert for new vector
interfaces
This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch vector_partial
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 489c7de6f6046fe8f00b3d4c222b6046e5279652
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Sun Jun 6 01:11:39 2021 +0800
[IOTDB-1422] Support partial insert for new vector interfaces
---
.../iotdb/AlignedTimeseriesSessionExample.java | 53 +++++++++++-----------
.../iotdb/db/engine/memtable/AbstractMemTable.java | 34 +++++++-------
.../org/apache/iotdb/db/metadata/MManager.java | 47 ++++++++++---------
.../iotdb/db/qp/physical/crud/InsertPlan.java | 18 ++++++++
.../iotdb/db/metadata/MManagerBasicTest.java | 4 +-
.../java/org/apache/iotdb/session/Session.java | 3 +-
6 files changed, 92 insertions(+), 67 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
index 37ea190..795ed18 100644
--- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java
@@ -39,7 +39,9 @@ import java.util.List;
public class AlignedTimeseriesSessionExample {
private static Session session;
- private static final String ROOT_SG1_D1_VECTOR = "root.sg_1.d1.vector";
+ private static final String ROOT_SG1_D1 = "root.sg_1.d1";
+ private static final String ROOT_SG1_D1_VECTOR1 = "root.sg_1.d1.vector";
+ private static final String ROOT_SG1_D1_VECTOR2 = "root.sg_1.d1.vector2";
public static void main(String[] args)
throws IoTDBConnectionException, StatementExecutionException {
@@ -55,30 +57,29 @@ public class AlignedTimeseriesSessionExample {
insertTabletWithAlignedTimeseriesMethod1();
insertTabletWithAlignedTimeseriesMethod2();
-
insertNullableTabletWithAlignedTimeseries();
+
selectTest();
selectWithValueFilterTest();
-
selectWithGroupByTest();
selectWithLastTest();
selectWithAggregationTest();
-
- selectWithAlignByDeviceTest();
+ // FIXME @Silver Narcissus
+ // selectWithAlignByDeviceTest();
session.close();
}
private static void selectTest() throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet = session.executeQueryStatement("select s1 from root.sg_1.d1");
+ SessionDataSet dataSet = session.executeQueryStatement("select s1 from root.sg_1.d1.vector");
System.out.println(dataSet.getColumnNames());
while (dataSet.hasNext()) {
System.out.println(dataSet.next());
}
dataSet.closeOperationHandle();
- dataSet = session.executeQueryStatement("select * from root.sg_1.d1");
+ dataSet = session.executeQueryStatement("select * from root.sg_1.d1.vector");
System.out.println(dataSet.getColumnNames());
while (dataSet.hasNext()) {
System.out.println(dataSet.next());
@@ -102,8 +103,7 @@ public class AlignedTimeseriesSessionExample {
private static void selectWithValueFilterTest()
throws StatementExecutionException, IoTDBConnectionException {
SessionDataSet dataSet =
- session.executeQueryStatement("select s1 from root.sg_1.d1 where s1 > 0");
- System.out.println(dataSet.getColumnNames());
+ session.executeQueryStatement("select s1 from root.sg_1.d1.vector where s1 > 0");
while (dataSet.hasNext()) {
System.out.println(dataSet.next());
}
@@ -111,7 +111,7 @@ public class AlignedTimeseriesSessionExample {
dataSet.closeOperationHandle();
dataSet =
session.executeQueryStatement(
- "select * from root.sg_1.d1 where time > 50 and s1 > 0 and s2 > 10000");
+ "select * from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000");
System.out.println(dataSet.getColumnNames());
while (dataSet.hasNext()) {
System.out.println(dataSet.next());
@@ -122,7 +122,8 @@ public class AlignedTimeseriesSessionExample {
private static void selectWithAggregationTest()
throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet = session.executeQueryStatement("select count(s1) from root.sg_1.d1");
+ SessionDataSet dataSet =
+ session.executeQueryStatement("select count(s1) from root.sg_1.d1.vector");
System.out.println(dataSet.getColumnNames());
while (dataSet.hasNext()) {
System.out.println(dataSet.next());
@@ -131,7 +132,7 @@ public class AlignedTimeseriesSessionExample {
dataSet.closeOperationHandle();
dataSet =
session.executeQueryStatement(
- "select sum(*) from root.sg_1.d1 where time > 50 and s1 > 0 and s2 > 10000");
+ "select sum(*) from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000");
System.out.println(dataSet.getColumnNames());
while (dataSet.hasNext()) {
System.out.println(dataSet.next());
@@ -144,7 +145,7 @@ public class AlignedTimeseriesSessionExample {
throws StatementExecutionException, IoTDBConnectionException {
SessionDataSet dataSet =
session.executeQueryStatement(
- "select count(s1) from root.sg_1.d1 GROUP BY ([1, 100), 20ms)");
+ "select count(s1) from root.sg_1.d1.vector GROUP BY ([1, 100), 20ms)");
System.out.println(dataSet.getColumnNames());
while (dataSet.hasNext()) {
System.out.println(dataSet.next());
@@ -153,7 +154,7 @@ public class AlignedTimeseriesSessionExample {
dataSet.closeOperationHandle();
dataSet =
session.executeQueryStatement(
- "select count(*) from root.sg_1.d1 where time > 50 and s1 > 0 and s2 > 10000"
+ "select count(*) from root.sg_1.d1.vector where time > 50 and s1 > 0 and s2 > 10000"
+ " GROUP BY ([50, 100), 10ms)");
System.out.println(dataSet.getColumnNames());
while (dataSet.hasNext()) {
@@ -165,14 +166,15 @@ public class AlignedTimeseriesSessionExample {
private static void selectWithLastTest()
throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet dataSet = session.executeQueryStatement("select last s1 from root.sg_1.d1");
+ SessionDataSet dataSet =
+ session.executeQueryStatement("select last s1 from root.sg_1.d1.vector");
System.out.println(dataSet.getColumnNames());
while (dataSet.hasNext()) {
System.out.println(dataSet.next());
}
dataSet.closeOperationHandle();
- dataSet = session.executeQueryStatement("select last * from root.sg_1.d1");
+ dataSet = session.executeQueryStatement("select last * from root.sg_1.d1.vector");
System.out.println(dataSet.getColumnNames());
while (dataSet.hasNext()) {
System.out.println(dataSet.next());
@@ -195,7 +197,7 @@ public class AlignedTimeseriesSessionExample {
encodings.add(TSEncoding.RLE);
}
session.createAlignedTimeseries(
- ROOT_SG1_D1_VECTOR, measurements, dataTypes, encodings, CompressionType.SNAPPY, null);
+ ROOT_SG1_D1_VECTOR1, measurements, dataTypes, encodings, CompressionType.SNAPPY, null);
}
// be sure template is coordinate with tablet
@@ -244,7 +246,7 @@ public class AlignedTimeseriesSessionExample {
new String[] {"s1", "s2"},
new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
- Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR, schemaList);
+ Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList);
tablet.setAligned(true);
long timestamp = System.currentTimeMillis();
@@ -283,11 +285,11 @@ public class AlignedTimeseriesSessionExample {
List<IMeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(
new VectorMeasurementSchema(
- "vector",
+ "vector2",
new String[] {"s1", "s2"},
new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
- Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR, schemaList);
+ Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList);
tablet.setAligned(true);
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
@@ -323,11 +325,11 @@ public class AlignedTimeseriesSessionExample {
List<IMeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(
new VectorMeasurementSchema(
- "vector",
+ "vector3",
new String[] {"s1", "s2"},
new TSDataType[] {TSDataType.INT64, TSDataType.INT32}));
- Tablet tablet = new Tablet(ROOT_SG1_D1_VECTOR, schemaList);
+ Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList);
tablet.setAligned(true);
long[] timestamps = tablet.timestamps;
@@ -373,17 +375,14 @@ public class AlignedTimeseriesSessionExample {
List<TSDataType> types = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
- measurements.add("s3");
types.add(TSDataType.INT64);
types.add(TSDataType.INT32);
- types.add(TSDataType.INT64);
- for (long time = 0; time < 100; time++) {
+ for (long time = 0; time < 1; time++) {
List<Object> values = new ArrayList<>();
values.add(1L);
values.add(2);
- values.add(3L);
- session.insertRecord(ROOT_SG1_D1_VECTOR, time, measurements, types, values, true);
+ session.insertRecord(ROOT_SG1_D1_VECTOR2, time, measurements, types, values, true);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index d00c2e8..d62f8fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -119,23 +119,25 @@ public abstract class AbstractMemTable implements IMemTable {
int columnIndex = 0;
if (insertRowPlan.isAligned()) {
MeasurementMNode measurementMNode = measurementMNodes[0];
- // write vector
- Object[] vectorValue =
- new Object[measurementMNode.getSchema().getValueTSDataTypeList().size()];
- for (int j = 0; j < vectorValue.length; j++) {
- vectorValue[j] = values[columnIndex];
- columnIndex++;
+ if (measurementMNode != null) {
+ // write vector
+ Object[] vectorValue =
+ new Object[measurementMNode.getSchema().getValueTSDataTypeList().size()];
+ for (int j = 0; j < vectorValue.length; j++) {
+ vectorValue[j] = values[columnIndex];
+ columnIndex++;
+ }
+ memSize +=
+ MemUtils.getVectorRecordSize(
+ measurementMNode.getSchema().getValueTSDataTypeList(),
+ vectorValue,
+ disableMemControl);
+ write(
+ insertRowPlan.getPrefixPath().getFullPath(),
+ measurementMNode.getSchema(),
+ insertRowPlan.getTime(),
+ vectorValue);
}
- memSize +=
- MemUtils.getVectorRecordSize(
- measurementMNode.getSchema().getValueTSDataTypeList(),
- vectorValue,
- disableMemControl);
- write(
- insertRowPlan.getPrefixPath().getFullPath(),
- measurementMNode.getSchema(),
- insertRowPlan.getTime(),
- vectorValue);
} else {
for (MeasurementMNode measurementMNode : measurementMNodes) {
if (values[columnIndex] == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 3e9af1b..3a94e9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -2201,9 +2201,7 @@ public class MManager {
}
// check type is match
- boolean mismatch = false;
TSDataType insertDataType;
- DataTypeMismatchException mismatchException = null;
if (plan instanceof InsertRowPlan || plan instanceof InsertTabletPlan) {
if (plan.isAligned()) {
TSDataType dataTypeInNode =
@@ -2213,16 +2211,27 @@ public class MManager {
insertDataType = dataTypeInNode;
}
if (dataTypeInNode != insertDataType) {
- mismatch = true;
logger.warn(
"DataType mismatch, Insert measurement {} in {} type {}, metadata tree type {}",
measurementMNode.getSchema().getValueMeasurementIdList().get(i),
measurementList[i],
insertDataType,
dataTypeInNode);
- mismatchException =
+ DataTypeMismatchException mismatchException =
new DataTypeMismatchException(measurementList[i], insertDataType, dataTypeInNode);
+ if (!config.isEnablePartialInsert()) {
+ throw mismatchException;
+ } else {
+ // mark failed measurement
+ plan.markFailedMeasurementAlignedInsertion(mismatchException);
+ for (int j = 0; j < i; j++) {
+ // all the measurementMNodes should be null
+ measurementMNodes[j] = null;
+ }
+ break;
+ }
}
+ measurementMNodes[i] = measurementMNode;
} else {
if (plan instanceof InsertRowPlan) {
if (!((InsertRowPlan) plan).isNeedInferType()) {
@@ -2234,34 +2243,28 @@ public class MManager {
} else {
insertDataType = getTypeInLoc(plan, i);
}
- mismatch = measurementMNode.getSchema().getType() != insertDataType;
- if (mismatch) {
+ if (measurementMNode.getSchema().getType() != insertDataType) {
logger.warn(
"DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
measurementList[i],
insertDataType,
measurementMNode.getSchema().getType());
- mismatchException =
+ DataTypeMismatchException mismatchException =
new DataTypeMismatchException(
measurementList[i], insertDataType, measurementMNode.getSchema().getType());
+ if (!config.isEnablePartialInsert()) {
+ throw mismatchException;
+ } else {
+ // mark failed measurement
+ plan.markFailedMeasurementInsertion(i, mismatchException);
+ continue;
+ }
}
+ measurementMNodes[i] = measurementMNode;
+ // set measurementName instead of alias
+ measurementList[i] = measurementMNode.getName();
}
}
-
- if (mismatch) {
- if (!config.isEnablePartialInsert()) {
- throw mismatchException;
- } else {
- // mark failed measurement
- plan.markFailedMeasurementInsertion(i, mismatchException);
- continue;
- }
- }
-
- measurementMNodes[i] = measurementMNode;
-
- // set measurementName instead of alias
- measurementList[i] = measurementMNode.getName();
} catch (MetadataException e) {
logger.warn(
"meet error when check {}.{}, message: {}",
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index c05d19a..8d5ad71 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -120,6 +120,24 @@ public abstract class InsertPlan extends PhysicalPlan {
measurements[index] = null;
}
+ public void markFailedMeasurementAlignedInsertion(Exception e) {
+ if (failedMeasurements == null) {
+ failedMeasurements = new ArrayList<>();
+ failedExceptions = new ArrayList<>();
+ failedIndices = new ArrayList<>();
+ }
+
+ for (int i = 0; i < measurements.length; i++) {
+ if (measurements[i] == null) {
+ continue;
+ }
+ failedMeasurements.add(measurements[i]);
+ failedExceptions.add(e);
+ failedIndices.add(i);
+ measurements[i] = null;
+ }
+ }
+
/**
* Reconstruct this plan with the failed measurements.
*
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
index 85422f4..5580b99 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
@@ -1370,8 +1370,10 @@ public class MManagerBasicTest {
// call getSeriesSchemasAndReadLockDevice
MNode mNode = manager.getSeriesSchemasAndReadLockDevice(insertRowPlan);
assertEquals(4, mNode.getMeasurementMNodeCount());
+ assertNull(insertRowPlan.getMeasurementMNodes()[0]);
assertNull(insertRowPlan.getMeasurementMNodes()[1]);
- assertEquals(1, insertRowPlan.getFailedMeasurementNumber());
+ assertNull(insertRowPlan.getMeasurementMNodes()[2]);
+ assertEquals(3, insertRowPlan.getFailedMeasurementNumber());
} catch (Exception e) {
e.printStackTrace();
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index dbdabc3..179b32e 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -1192,10 +1192,11 @@ public class Session {
TSInsertTabletReq request = new TSInsertTabletReq();
- if (request.isAligned) {
+ if (tablet.isAligned()) {
if (tablet.getSchemas().size() > 1) {
throw new BatchExecutionException("One tablet should only contain one aligned timeseries!");
}
+ request.setIsAligned(true);
IMeasurementSchema measurementSchema = tablet.getSchemas().get(0);
request.setPrefixPath(
tablet.deviceId + TsFileConstant.PATH_SEPARATOR + measurementSchema.getMeasurementId());