You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/10/05 00:59:04 UTC
[iotdb] branch master updated: [IOTDB-4547] Fix error in routing batch insert plannode (#7481)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 68d8f52352 [IOTDB-4547] Fix error in routing batch insert plannode (#7481)
68d8f52352 is described below
commit 68d8f52352b0852019c61f136acff26dffb0e076
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Wed Oct 5 08:58:58 2022 +0800
[IOTDB-4547] Fix error in routing batch insert plannode (#7481)
---
.../iotdb/session/IoTDBSessionComplexIT.java | 220 +++++++++++++++++----
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 29 ++-
2 files changed, 207 insertions(+), 42 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java b/integration-test/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
index ef49cef0a7..d63840c28e 100644
--- a/integration-test/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.session;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
import org.apache.iotdb.db.exception.TriggerManagementException;
import org.apache.iotdb.db.metadata.idtable.trigger_example.Counter;
@@ -50,7 +51,10 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -89,18 +93,19 @@ public class IoTDBSessionComplexIT {
private void createTimeseries(ISession session)
throws StatementExecutionException, IoTDBConnectionException {
- session.createTimeseries(
- "root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
- session.createTimeseries(
- "root.sg1.d1.s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
- session.createTimeseries(
- "root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
- session.createTimeseries(
- "root.sg1.d2.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
- session.createTimeseries(
- "root.sg1.d2.s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
- session.createTimeseries(
- "root.sg1.d2.s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+ createTimeseries(session, Arrays.asList("root.sg1.d1", "root.sg1.d2"));
+ }
+
+ private void createTimeseries(ISession session, List<String> deviceIds)
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (String device : deviceIds) {
+ session.createTimeseries(
+ device + ".s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+ session.createTimeseries(
+ device + ".s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+ session.createTimeseries(
+ device + ".s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+ }
}
private void insertByStr(ISession session)
@@ -263,6 +268,127 @@ public class IoTDBSessionComplexIT {
}
}
+ private void insertRecords(ISession session, List<String> deviceIdList)
+ throws IoTDBConnectionException, StatementExecutionException {
+ long timePartitionForRouting =
+ IoTDBDescriptor.getInstance().getConfig().getTimePartitionIntervalForRouting();
+
+ List<String> measurements = new ArrayList<>();
+ measurements.add("s1");
+ measurements.add("s2");
+ measurements.add("s3");
+ List<String> deviceIds = new ArrayList<>();
+ List<List<String>> measurementsList = new ArrayList<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+ List<Long> timestamps = new ArrayList<>();
+ List<List<TSDataType>> typesList = new ArrayList<>();
+
+ for (long time = 0; time < 10 * timePartitionForRouting; time += timePartitionForRouting / 10) {
+ List<Object> values = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ values.add(1L);
+ values.add(2L);
+ values.add(3L);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+
+ deviceIds.addAll(deviceIdList);
+
+ measurementsList.add(measurements);
+ measurementsList.add(measurements);
+
+ valuesList.add(values);
+ valuesList.add(values);
+ typesList.add(types);
+ typesList.add(types);
+ timestamps.add(time);
+ timestamps.add(time);
+
+ if (time != 0 && time % (5 * timePartitionForRouting) == 0) {
+ session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+ deviceIds.clear();
+ measurementsList.clear();
+ valuesList.clear();
+ typesList.clear();
+ timestamps.clear();
+ }
+ }
+
+ session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+ }
+
+ private void insertMultiTablets(ISession session, List<String> deviceIdList)
+ throws IoTDBConnectionException, StatementExecutionException {
+ long timePartitionForRouting =
+ IoTDBDescriptor.getInstance().getConfig().getTimePartitionIntervalForRouting();
+ List<MeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+ schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+ schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
+
+ Map<String, Tablet> tabletMap = new HashMap<>();
+ for (String device : deviceIdList) {
+ tabletMap.put(device, new Tablet(device, schemaList, 100));
+ }
+
+ for (long time = 0; time < 10 * timePartitionForRouting; time += timePartitionForRouting / 10) {
+ for (Tablet tablet : tabletMap.values()) {
+ long value = 0;
+ tablet.addTimestamp(tablet.rowSize, time);
+ for (int s = 0; s < 3; s++) {
+ tablet.addValue(schemaList.get(s).getMeasurementId(), tablet.rowSize, value);
+ value++;
+ }
+ tablet.rowSize++;
+ }
+ }
+
+ session.insertTablets(tabletMap);
+ }
+
+ private void insertRecordsOfOneDevice(ISession session, String deviceId)
+ throws IoTDBConnectionException, StatementExecutionException {
+ long timePartitionForRouting =
+ IoTDBDescriptor.getInstance().getConfig().getTimePartitionIntervalForRouting();
+
+ List<String> measurements = new ArrayList<>();
+ measurements.add("s1");
+ measurements.add("s2");
+ measurements.add("s3");
+ List<List<String>> measurementsList = new ArrayList<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+ List<Long> timestamps = new ArrayList<>();
+ List<List<TSDataType>> typesList = new ArrayList<>();
+
+ for (long time = 0; time < 10 * timePartitionForRouting; time += timePartitionForRouting / 10) {
+ List<Object> values = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ values.add(1L);
+ values.add(2L);
+ values.add(3L);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+ types.add(TSDataType.INT64);
+
+ measurementsList.add(measurements);
+ valuesList.add(values);
+ typesList.add(types);
+ timestamps.add(time);
+
+ if (time != 0 && time % (5 * timePartitionForRouting) == 0) {
+ session.insertRecordsOfOneDevice(
+ deviceId, timestamps, measurementsList, typesList, valuesList);
+ measurementsList.clear();
+ valuesList.clear();
+ typesList.clear();
+ timestamps.clear();
+ }
+ }
+
+ session.insertRecordsOfOneDevice(deviceId, timestamps, measurementsList, typesList, valuesList);
+ }
+
@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void testBatchInsertSeqAndUnseq()
@@ -277,40 +403,41 @@ public class IoTDBSessionComplexIT {
session.executeNonQueryStatement("MERGE");
session.executeNonQueryStatement("FULL MERGE");
- queryForBatch();
+ List<String> deviceIds = new ArrayList<>();
+ queryForBatch(Collections.singletonList("root.sg1.d1"), 400);
+
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
- private void queryForBatch() throws SQLException {
- List<String> standards =
- Arrays.asList(
- "Time",
- "root.sg1.d1.s1",
- "root.sg1.d1.s2",
- "root.sg1.d1.s3",
- "root.sg1.d2.s1",
- "root.sg1.d2.s2",
- "root.sg1.d2.s3");
+ private void queryForBatch(List<String> deviceIds, int pointNumPerDevice) throws SQLException {
+ List<String> standards = new ArrayList<>();
+ standards.add("Time");
+ for (String device : deviceIds) {
+ standards.add(device + ".s1");
+ standards.add(device + ".s2");
+ standards.add(device + ".s3");
+ }
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
- ResultSet resultSet = statement.executeQuery("SELECT * FROM root.**");
- final ResultSetMetaData metaData = resultSet.getMetaData();
- final int colCount = metaData.getColumnCount();
- for (int i = 0; i < colCount; i++) {
- assertTrue(standards.contains(metaData.getColumnLabel(i + 1)));
- }
-
- int count = 0;
- while (resultSet.next()) {
- for (int i = 1; i <= colCount; i++) {
- count++;
+ for (String device : deviceIds) {
+ ResultSet resultSet = statement.executeQuery("SELECT * FROM " + device);
+ final ResultSetMetaData metaData = resultSet.getMetaData();
+ final int colCount = metaData.getColumnCount();
+ for (int i = 0; i < colCount; i++) {
+ assertTrue(standards.contains(metaData.getColumnLabel(i + 1)));
}
+ int count = 0;
+ while (resultSet.next()) {
+ for (int i = 1; i <= colCount; i++) {
+ count++;
+ }
+ }
+ assertEquals(pointNumPerDevice, count);
}
- assertEquals(700, count);
}
}
@@ -404,4 +531,27 @@ public class IoTDBSessionComplexIT {
assertEquals("NodeUrl Incorrect format", e.getMessage());
}
}
+
+ @Test
+ @Category({ClusterIT.class})
+ public void insertWithMultipleTimeSlotsTest() {
+
+ try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ createTimeseries(session, Arrays.asList("root.sg1.d1", "root.sg1.d2"));
+ insertRecords(session, Arrays.asList("root.sg1.d1", "root.sg1.d2"));
+ queryForBatch(Arrays.asList("root.sg1.d1", "root.sg1.d2"), 400);
+
+ createTimeseries(session, Arrays.asList("root.sg2.d1", "root.sg2.d2"));
+ insertMultiTablets(session, Arrays.asList("root.sg2.d1", "root.sg2.d2"));
+ queryForBatch(Arrays.asList("root.sg2.d1", "root.sg2.d2"), 400);
+
+ createTimeseries(session, Collections.singletonList("root.sg3.d1"));
+ insertRecordsOfOneDevice(session, "root.sg3.d1");
+ queryForBatch(Collections.singletonList("root.sg3.d1"), 400);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 5f51228abb..585cc5ff1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1267,11 +1267,19 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
- List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
+ Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>();
for (InsertRowStatement insertRowStatement : insertRowsStatement.getInsertRowStatementList()) {
+ Set<TTimePartitionSlot> timePartitionSlotSet =
+ dataPartitionQueryParamMap.computeIfAbsent(
+ insertRowStatement.getDevicePath().getFullPath(), k -> new HashSet());
+ timePartitionSlotSet.addAll(insertRowStatement.getTimePartitionSlots());
+ }
+
+ List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
+ for (Map.Entry<String, Set<TTimePartitionSlot>> entry : dataPartitionQueryParamMap.entrySet()) {
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
- dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
- dataPartitionQueryParam.setTimePartitionSlotList(insertRowStatement.getTimePartitionSlots());
+ dataPartitionQueryParam.setDevicePath(entry.getKey());
+ dataPartitionQueryParam.setTimePartitionSlotList(new ArrayList<>(entry.getValue()));
dataPartitionQueryParams.add(dataPartitionQueryParam);
}
@@ -1290,13 +1298,20 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
- List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
+ Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>();
for (InsertTabletStatement insertTabletStatement :
insertMultiTabletsStatement.getInsertTabletStatementList()) {
+ Set<TTimePartitionSlot> timePartitionSlotSet =
+ dataPartitionQueryParamMap.computeIfAbsent(
+ insertTabletStatement.getDevicePath().getFullPath(), k -> new HashSet());
+ timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots());
+ }
+
+ List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
+ for (Map.Entry<String, Set<TTimePartitionSlot>> entry : dataPartitionQueryParamMap.entrySet()) {
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
- dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
- dataPartitionQueryParam.setTimePartitionSlotList(
- insertTabletStatement.getTimePartitionSlots());
+ dataPartitionQueryParam.setDevicePath(entry.getKey());
+ dataPartitionQueryParam.setTimePartitionSlotList(new ArrayList<>(entry.getValue()));
dataPartitionQueryParams.add(dataPartitionQueryParam);
}