You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/02/08 14:18:52 UTC

[iotdb] branch rel/1.0 updated: [To rel/1.0] Correct DataPartition Fetch request parameter construction (#9019)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.0 by this push:
     new 0b65f4d5a8 [To rel/1.0] Correct DataPartition Fetch request parameter construction (#9019)
0b65f4d5a8 is described below

commit 0b65f4d5a803d4b83681e83345b2de05ea500eeb
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Wed Feb 8 22:18:46 2023 +0800

    [To rel/1.0] Correct DataPartition Fetch request parameter construction (#9019)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  6 +-
 .../mpp/plan/analyze/ClusterPartitionFetcher.java  | 76 ++++++++++++++++++----
 .../distribution/DistributionPlanContext.java      | 14 ++++
 .../plan/planner/distribution/SourceRewriter.java  | 40 +++++++++---
 .../iotdb/db/mpp/plan/analyze/AnalyzeTest.java     |  6 +-
 6 files changed, 115 insertions(+), 29 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 6356449867..2ba70d2f05 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -905,7 +905,7 @@ public class IoTDBConfig {
    * series partition
    */
   private String seriesPartitionExecutorClass =
-      "org.apache.iotdb.commons.partition.executor.hash.APHashExecutor";
+      "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
 
   /** The number of series partitions in a database */
   private int seriesPartitionSlotNum = 10000;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 42c7aedcc7..4f5b7d0598 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1706,7 +1706,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     for (InsertRowStatement insertRowStatement : insertRowsStatement.getInsertRowStatementList()) {
       Set<TTimePartitionSlot> timePartitionSlotSet =
           dataPartitionQueryParamMap.computeIfAbsent(
-              insertRowStatement.getDevicePath().getFullPath(), k -> new HashSet());
+              insertRowStatement.getDevicePath().getFullPath(), k -> new HashSet<>());
       timePartitionSlotSet.addAll(insertRowStatement.getTimePartitionSlots());
     }
 
@@ -1731,7 +1731,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
         insertMultiTabletsStatement.getInsertTabletStatementList()) {
       Set<TTimePartitionSlot> timePartitionSlotSet =
           dataPartitionQueryParamMap.computeIfAbsent(
-              insertTabletStatement.getDevicePath().getFullPath(), k -> new HashSet());
+              insertTabletStatement.getDevicePath().getFullPath(), k -> new HashSet<>());
       timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots());
     }
 
@@ -2410,7 +2410,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
 
     context.setQueryType(QueryType.WRITE);
     List<List<String>> measurementsList = createTemplateStatement.getMeasurements();
-    for (List measurements : measurementsList) {
+    for (List<String> measurements : measurementsList) {
       Set<String> measurementsSet = new HashSet<>(measurements);
       if (measurementsSet.size() < measurements.size()) {
         throw new SemanticException(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 029c878114..472dc5b695 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -58,9 +58,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class ClusterPartitionFetcher implements IPartitionFetcher {
 
@@ -180,7 +182,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
       try (ConfigNodeClient client =
           configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         TDataPartitionTableResp dataPartitionTableResp =
-            client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+            client.getDataPartitionTable(constructDataPartitionReqForQuery(sgNameToQueryParamsMap));
         if (dataPartitionTableResp.getStatus().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           dataPartition = parseDataPartitionResp(dataPartitionTableResp);
@@ -207,7 +209,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     try (ConfigNodeClient client =
         configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       TDataPartitionTableResp dataPartitionTableResp =
-          client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+          client.getDataPartitionTable(constructDataPartitionReqForQuery(sgNameToQueryParamsMap));
       if (dataPartitionTableResp.getStatus().getCode()
           == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return parseDataPartitionResp(dataPartitionTableResp);
@@ -260,9 +262,8 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     if (null == dataPartition) {
       try (ConfigNodeClient client =
           configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
-        TDataPartitionTableResp dataPartitionTableResp =
-            client.getOrCreateDataPartitionTable(
-                constructDataPartitionReq(splitDataPartitionQueryParams));
+        TDataPartitionReq req = constructDataPartitionReq(splitDataPartitionQueryParams);
+        TDataPartitionTableResp dataPartitionTableResp = client.getOrCreateDataPartitionTable(req);
 
         if (dataPartitionTableResp.getStatus().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -346,6 +347,22 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     }
   }
 
+  private static class ComplexTimeSlotList {
+    Set<TTimePartitionSlot> timeSlotList;
+    boolean needLeftAll;
+    boolean needRightAll;
+
+    private ComplexTimeSlotList(boolean needLeftAll, boolean needRightAll) {
+      timeSlotList = new HashSet<>();
+      this.needLeftAll = needLeftAll;
+      this.needRightAll = needRightAll;
+    }
+
+    private void putTimeSlot(List<TTimePartitionSlot> slotList) {
+      timeSlotList.addAll(slotList);
+    }
+  }
+
   private TDataPartitionReq constructDataPartitionReq(
       Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
     Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
@@ -353,15 +370,50 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
         sgNameToQueryParamsMap.entrySet()) {
       // for each sg
       Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new HashMap<>();
+
+      Map<TSeriesPartitionSlot, ComplexTimeSlotList> seriesSlotTimePartitionMap = new HashMap<>();
+
+      for (DataPartitionQueryParam queryParam : entry.getValue()) {
+        seriesSlotTimePartitionMap
+            .computeIfAbsent(
+                partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()),
+                k ->
+                    new ComplexTimeSlotList(
+                        queryParam.isNeedLeftAll(), queryParam.isNeedRightAll()))
+            .putTimeSlot(queryParam.getTimePartitionSlotList());
+      }
+      seriesSlotTimePartitionMap.forEach(
+          (k, v) ->
+              deviceToTimePartitionMap.put(
+                  k,
+                  new TTimeSlotList(
+                      new ArrayList<>(v.timeSlotList), v.needLeftAll, v.needRightAll)));
+      partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap);
+    }
+    return new TDataPartitionReq(partitionSlotsMap);
+  }
+
+  /** For query, DataPartitionQueryParam is shared by each device */
+  private TDataPartitionReq constructDataPartitionReqForQuery(
+      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+    Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
+    TTimeSlotList sharedTTimeSlotList = null;
+    for (Map.Entry<String, List<DataPartitionQueryParam>> entry :
+        sgNameToQueryParamsMap.entrySet()) {
+      // for each sg
+      Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new HashMap<>();
+
       for (DataPartitionQueryParam queryParam : entry.getValue()) {
-        TTimeSlotList timePartitionSlotList =
-            new TTimeSlotList(
-                queryParam.getTimePartitionSlotList(),
-                queryParam.isNeedLeftAll(),
-                queryParam.isNeedRightAll());
-        deviceToTimePartitionMap.put(
+        if (sharedTTimeSlotList == null) {
+          sharedTTimeSlotList =
+              new TTimeSlotList(
+                  queryParam.getTimePartitionSlotList(),
+                  queryParam.isNeedLeftAll(),
+                  queryParam.isNeedRightAll());
+        }
+        deviceToTimePartitionMap.putIfAbsent(
             partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()),
-            timePartitionSlotList);
+            sharedTTimeSlotList);
       }
       partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
index 32de442e65..c35c4a72ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.mpp.plan.planner.distribution;
 
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+
+import java.util.Map;
 
 public class DistributionPlanContext {
   protected boolean isRoot;
@@ -32,6 +35,9 @@ public class DistributionPlanContext {
   // DataRegions
   protected boolean queryMultiRegion;
 
+  // used by group by level
+  private Map<String, Expression> columnNameToExpression;
+
   protected DistributionPlanContext(MPPQueryContext queryContext) {
     this.isRoot = true;
     this.queryContext = queryContext;
@@ -62,4 +68,12 @@ public class DistributionPlanContext {
   public void setQueryMultiRegion(boolean queryMultiRegion) {
     this.queryMultiRegion = queryMultiRegion;
   }
+
+  public Map<String, Expression> getColumnNameToExpression() {
+    return columnNameToExpression;
+  }
+
+  public void setColumnNameToExpression(Map<String, Expression> columnNameToExpression) {
+    this.columnNameToExpression = columnNameToExpression;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 799b311551..aa11eb80c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -749,6 +749,18 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
             : groupSourcesForGroupByLevel(root, sourceGroup, context);
 
     // Then, we calculate the attributes for GroupByLevelNode in each level
+    Map<String, Expression> columnNameToExpression = new HashMap<>();
+    for (CrossSeriesAggregationDescriptor originalDescriptor :
+        newRoot.getGroupByLevelDescriptors()) {
+      for (Expression exp : originalDescriptor.getInputExpressions()) {
+        columnNameToExpression.put(exp.getExpressionString(), exp);
+      }
+      columnNameToExpression.put(
+          originalDescriptor.getOutputExpression().getExpressionString(),
+          originalDescriptor.getOutputExpression());
+    }
+
+    context.setColumnNameToExpression(columnNameToExpression);
     calculateGroupByLevelNodeAttributes(newRoot, 0, context);
     return newRoot;
   }
@@ -884,21 +896,29 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
       // Check every OutputColumn of GroupByLevelNode and set the Expression of corresponding
       // AggregationDescriptor
       List<CrossSeriesAggregationDescriptor> descriptorList = new ArrayList<>();
+      Map<String, Expression> columnNameToExpression = context.getColumnNameToExpression();
+      Set<Expression> childrenExpressionSet = new HashSet<>();
+      for (String childColumn : childrenOutputColumns) {
+        Expression childExpression =
+            columnNameToExpression.get(
+                childColumn.substring(childColumn.indexOf("(") + 1, childColumn.lastIndexOf(")")));
+        childrenExpressionSet.add(childExpression);
+      }
+
       for (CrossSeriesAggregationDescriptor originalDescriptor :
           handle.getGroupByLevelDescriptors()) {
         Set<Expression> descriptorExpressions = new HashSet<>();
-        for (String childColumn : childrenOutputColumns) {
-          // If this condition matched, the childColumn should come from GroupByLevelNode
-          if (isAggColumnMatchExpression(childColumn, originalDescriptor.getOutputExpression())) {
-            descriptorExpressions.add(originalDescriptor.getOutputExpression());
-            continue;
-          }
-          for (Expression exp : originalDescriptor.getInputExpressions()) {
-            if (isAggColumnMatchExpression(childColumn, exp)) {
-              descriptorExpressions.add(exp);
-            }
+
+        if (childrenExpressionSet.contains(originalDescriptor.getOutputExpression())) {
+          descriptorExpressions.add(originalDescriptor.getOutputExpression());
+        }
+
+        for (Expression exp : originalDescriptor.getInputExpressions()) {
+          if (childrenExpressionSet.contains(exp)) {
+            descriptorExpressions.add(exp);
           }
         }
+
         if (descriptorExpressions.isEmpty()) {
           continue;
         }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
index d15cdd3ce4..3fbf8f44c1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
@@ -581,13 +581,13 @@ public class AnalyzeTest {
   public void testDataPartitionAnalyze() {
     Analysis analysis = analyzeSQL("insert into root.sg.d1(timestamp,s) values(1,10),(86401,11)");
     Assert.assertEquals(
+        1,
         analysis
             .getDataPartitionInfo()
             .getDataPartitionMap()
             .get("root.sg")
-            .get(new TSeriesPartitionSlot(8923))
-            .size(),
-        1);
+            .get(new TSeriesPartitionSlot(1107))
+            .size());
   }
 
   @Test