You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/10/05 00:59:04 UTC

[iotdb] branch master updated: [IOTDB-4547] Fix error in routing batch insert plannode (#7481)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 68d8f52352 [IOTDB-4547] Fix error in routing batch insert plannode (#7481)
68d8f52352 is described below

commit 68d8f52352b0852019c61f136acff26dffb0e076
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Wed Oct 5 08:58:58 2022 +0800

    [IOTDB-4547] Fix error in routing batch insert plannode (#7481)
---
 .../iotdb/session/IoTDBSessionComplexIT.java       | 220 +++++++++++++++++----
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  29 ++-
 2 files changed, 207 insertions(+), 42 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java b/integration-test/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
index ef49cef0a7..d63840c28e 100644
--- a/integration-test/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.session;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
 import org.apache.iotdb.db.exception.TriggerManagementException;
 import org.apache.iotdb.db.metadata.idtable.trigger_example.Counter;
@@ -50,7 +51,10 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -89,18 +93,19 @@ public class IoTDBSessionComplexIT {
 
   private void createTimeseries(ISession session)
       throws StatementExecutionException, IoTDBConnectionException {
-    session.createTimeseries(
-        "root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
-    session.createTimeseries(
-        "root.sg1.d1.s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
-    session.createTimeseries(
-        "root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
-    session.createTimeseries(
-        "root.sg1.d2.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
-    session.createTimeseries(
-        "root.sg1.d2.s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
-    session.createTimeseries(
-        "root.sg1.d2.s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+    createTimeseries(session, Arrays.asList("root.sg1.d1", "root.sg1.d2"));
+  }
+
+  private void createTimeseries(ISession session, List<String> deviceIds)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (String device : deviceIds) {
+      session.createTimeseries(
+          device + ".s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+      session.createTimeseries(
+          device + ".s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+      session.createTimeseries(
+          device + ".s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+    }
   }
 
   private void insertByStr(ISession session)
@@ -263,6 +268,127 @@ public class IoTDBSessionComplexIT {
     }
   }
 
+  private void insertRecords(ISession session, List<String> deviceIdList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    long timePartitionForRouting =
+        IoTDBDescriptor.getInstance().getConfig().getTimePartitionIntervalForRouting();
+
+    List<String> measurements = new ArrayList<>();
+    measurements.add("s1");
+    measurements.add("s2");
+    measurements.add("s3");
+    List<String> deviceIds = new ArrayList<>();
+    List<List<String>> measurementsList = new ArrayList<>();
+    List<List<Object>> valuesList = new ArrayList<>();
+    List<Long> timestamps = new ArrayList<>();
+    List<List<TSDataType>> typesList = new ArrayList<>();
+
+    for (long time = 0; time < 10 * timePartitionForRouting; time += timePartitionForRouting / 10) {
+      List<Object> values = new ArrayList<>();
+      List<TSDataType> types = new ArrayList<>();
+      values.add(1L);
+      values.add(2L);
+      values.add(3L);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+
+      deviceIds.addAll(deviceIdList);
+
+      measurementsList.add(measurements);
+      measurementsList.add(measurements);
+
+      valuesList.add(values);
+      valuesList.add(values);
+      typesList.add(types);
+      typesList.add(types);
+      timestamps.add(time);
+      timestamps.add(time);
+
+      if (time != 0 && time % (5 * timePartitionForRouting) == 0) {
+        session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+        deviceIds.clear();
+        measurementsList.clear();
+        valuesList.clear();
+        typesList.clear();
+        timestamps.clear();
+      }
+    }
+
+    session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
+  }
+
+  private void insertMultiTablets(ISession session, List<String> deviceIdList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    long timePartitionForRouting =
+        IoTDBDescriptor.getInstance().getConfig().getTimePartitionIntervalForRouting();
+    List<MeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
+
+    Map<String, Tablet> tabletMap = new HashMap<>();
+    for (String device : deviceIdList) {
+      tabletMap.put(device, new Tablet(device, schemaList, 100));
+    }
+
+    for (long time = 0; time < 10 * timePartitionForRouting; time += timePartitionForRouting / 10) {
+      for (Tablet tablet : tabletMap.values()) {
+        long value = 0;
+        tablet.addTimestamp(tablet.rowSize, time);
+        for (int s = 0; s < 3; s++) {
+          tablet.addValue(schemaList.get(s).getMeasurementId(), tablet.rowSize, value);
+          value++;
+        }
+        tablet.rowSize++;
+      }
+    }
+
+    session.insertTablets(tabletMap);
+  }
+
+  private void insertRecordsOfOneDevice(ISession session, String deviceId)
+      throws IoTDBConnectionException, StatementExecutionException {
+    long timePartitionForRouting =
+        IoTDBDescriptor.getInstance().getConfig().getTimePartitionIntervalForRouting();
+
+    List<String> measurements = new ArrayList<>();
+    measurements.add("s1");
+    measurements.add("s2");
+    measurements.add("s3");
+    List<List<String>> measurementsList = new ArrayList<>();
+    List<List<Object>> valuesList = new ArrayList<>();
+    List<Long> timestamps = new ArrayList<>();
+    List<List<TSDataType>> typesList = new ArrayList<>();
+
+    for (long time = 0; time < 10 * timePartitionForRouting; time += timePartitionForRouting / 10) {
+      List<Object> values = new ArrayList<>();
+      List<TSDataType> types = new ArrayList<>();
+      values.add(1L);
+      values.add(2L);
+      values.add(3L);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+      types.add(TSDataType.INT64);
+
+      measurementsList.add(measurements);
+      valuesList.add(values);
+      typesList.add(types);
+      timestamps.add(time);
+
+      if (time != 0 && time % (5 * timePartitionForRouting) == 0) {
+        session.insertRecordsOfOneDevice(
+            deviceId, timestamps, measurementsList, typesList, valuesList);
+        measurementsList.clear();
+        valuesList.clear();
+        typesList.clear();
+        timestamps.clear();
+      }
+    }
+
+    session.insertRecordsOfOneDevice(deviceId, timestamps, measurementsList, typesList, valuesList);
+  }
+
   @Test
   @Category({LocalStandaloneIT.class, ClusterIT.class})
   public void testBatchInsertSeqAndUnseq()
@@ -277,40 +403,41 @@ public class IoTDBSessionComplexIT {
       session.executeNonQueryStatement("MERGE");
       session.executeNonQueryStatement("FULL MERGE");
 
-      queryForBatch();
+      List<String> deviceIds = new ArrayList<>();
+      queryForBatch(Collections.singletonList("root.sg1.d1"), 400);
+
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
   }
 
-  private void queryForBatch() throws SQLException {
-    List<String> standards =
-        Arrays.asList(
-            "Time",
-            "root.sg1.d1.s1",
-            "root.sg1.d1.s2",
-            "root.sg1.d1.s3",
-            "root.sg1.d2.s1",
-            "root.sg1.d2.s2",
-            "root.sg1.d2.s3");
+  private void queryForBatch(List<String> deviceIds, int pointNumPerDevice) throws SQLException {
+    List<String> standards = new ArrayList<>();
+    standards.add("Time");
+    for (String device : deviceIds) {
+      standards.add(device + ".s1");
+      standards.add(device + ".s2");
+      standards.add(device + ".s3");
+    }
 
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
-      ResultSet resultSet = statement.executeQuery("SELECT * FROM root.**");
-      final ResultSetMetaData metaData = resultSet.getMetaData();
-      final int colCount = metaData.getColumnCount();
-      for (int i = 0; i < colCount; i++) {
-        assertTrue(standards.contains(metaData.getColumnLabel(i + 1)));
-      }
-
-      int count = 0;
-      while (resultSet.next()) {
-        for (int i = 1; i <= colCount; i++) {
-          count++;
+      for (String device : deviceIds) {
+        ResultSet resultSet = statement.executeQuery("SELECT * FROM " + device);
+        final ResultSetMetaData metaData = resultSet.getMetaData();
+        final int colCount = metaData.getColumnCount();
+        for (int i = 0; i < colCount; i++) {
+          assertTrue(standards.contains(metaData.getColumnLabel(i + 1)));
         }
+        int count = 0;
+        while (resultSet.next()) {
+          for (int i = 1; i <= colCount; i++) {
+            count++;
+          }
+        }
+        assertEquals(pointNumPerDevice, count);
       }
-      assertEquals(700, count);
     }
   }
 
@@ -404,4 +531,27 @@ public class IoTDBSessionComplexIT {
       assertEquals("NodeUrl Incorrect format", e.getMessage());
     }
   }
+
+  @Test
+  @Category({ClusterIT.class})
+  public void insertWithMultipleTimeSlotsTest() {
+
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      createTimeseries(session, Arrays.asList("root.sg1.d1", "root.sg1.d2"));
+      insertRecords(session, Arrays.asList("root.sg1.d1", "root.sg1.d2"));
+      queryForBatch(Arrays.asList("root.sg1.d1", "root.sg1.d2"), 400);
+
+      createTimeseries(session, Arrays.asList("root.sg2.d1", "root.sg2.d2"));
+      insertMultiTablets(session, Arrays.asList("root.sg2.d1", "root.sg2.d2"));
+      queryForBatch(Arrays.asList("root.sg2.d1", "root.sg2.d2"), 400);
+
+      createTimeseries(session, Collections.singletonList("root.sg3.d1"));
+      insertRecordsOfOneDevice(session, "root.sg3.d1");
+      queryForBatch(Collections.singletonList("root.sg3.d1"), 400);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 5f51228abb..585cc5ff1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1267,11 +1267,19 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
     context.setQueryType(QueryType.WRITE);
 
-    List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
+    Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>();
     for (InsertRowStatement insertRowStatement : insertRowsStatement.getInsertRowStatementList()) {
+      Set<TTimePartitionSlot> timePartitionSlotSet =
+          dataPartitionQueryParamMap.computeIfAbsent(
+              insertRowStatement.getDevicePath().getFullPath(), k -> new HashSet());
+      timePartitionSlotSet.addAll(insertRowStatement.getTimePartitionSlots());
+    }
+
+    List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
+    for (Map.Entry<String, Set<TTimePartitionSlot>> entry : dataPartitionQueryParamMap.entrySet()) {
       DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
-      dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
-      dataPartitionQueryParam.setTimePartitionSlotList(insertRowStatement.getTimePartitionSlots());
+      dataPartitionQueryParam.setDevicePath(entry.getKey());
+      dataPartitionQueryParam.setTimePartitionSlotList(new ArrayList<>(entry.getValue()));
       dataPartitionQueryParams.add(dataPartitionQueryParam);
     }
 
@@ -1290,13 +1298,20 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
     context.setQueryType(QueryType.WRITE);
 
-    List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
+    Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>();
     for (InsertTabletStatement insertTabletStatement :
         insertMultiTabletsStatement.getInsertTabletStatementList()) {
+      Set<TTimePartitionSlot> timePartitionSlotSet =
+          dataPartitionQueryParamMap.computeIfAbsent(
+              insertTabletStatement.getDevicePath().getFullPath(), k -> new HashSet());
+      timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots());
+    }
+
+    List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
+    for (Map.Entry<String, Set<TTimePartitionSlot>> entry : dataPartitionQueryParamMap.entrySet()) {
       DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
-      dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
-      dataPartitionQueryParam.setTimePartitionSlotList(
-          insertTabletStatement.getTimePartitionSlots());
+      dataPartitionQueryParam.setDevicePath(entry.getKey());
+      dataPartitionQueryParam.setTimePartitionSlotList(new ArrayList<>(entry.getValue()));
       dataPartitionQueryParams.add(dataPartitionQueryParam);
     }