You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/02/08 14:18:52 UTC
[iotdb] branch rel/1.0 updated: [To rel/1.0] Correct DataPartition Fetch request parameter construction (#9019)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new 0b65f4d5a8 [To rel/1.0] Correct DataPartition Fetch request parameter construction (#9019)
0b65f4d5a8 is described below
commit 0b65f4d5a803d4b83681e83345b2de05ea500eeb
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Wed Feb 8 22:18:46 2023 +0800
[To rel/1.0] Correct DataPartition Fetch request parameter construction (#9019)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 6 +-
.../mpp/plan/analyze/ClusterPartitionFetcher.java | 76 ++++++++++++++++++----
.../distribution/DistributionPlanContext.java | 14 ++++
.../plan/planner/distribution/SourceRewriter.java | 40 +++++++++---
.../iotdb/db/mpp/plan/analyze/AnalyzeTest.java | 6 +-
6 files changed, 115 insertions(+), 29 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 6356449867..2ba70d2f05 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -905,7 +905,7 @@ public class IoTDBConfig {
* series partition
*/
private String seriesPartitionExecutorClass =
- "org.apache.iotdb.commons.partition.executor.hash.APHashExecutor";
+ "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
/** The number of series partitions in a database */
private int seriesPartitionSlotNum = 10000;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 42c7aedcc7..4f5b7d0598 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1706,7 +1706,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
for (InsertRowStatement insertRowStatement : insertRowsStatement.getInsertRowStatementList()) {
Set<TTimePartitionSlot> timePartitionSlotSet =
dataPartitionQueryParamMap.computeIfAbsent(
- insertRowStatement.getDevicePath().getFullPath(), k -> new HashSet());
+ insertRowStatement.getDevicePath().getFullPath(), k -> new HashSet<>());
timePartitionSlotSet.addAll(insertRowStatement.getTimePartitionSlots());
}
@@ -1731,7 +1731,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
insertMultiTabletsStatement.getInsertTabletStatementList()) {
Set<TTimePartitionSlot> timePartitionSlotSet =
dataPartitionQueryParamMap.computeIfAbsent(
- insertTabletStatement.getDevicePath().getFullPath(), k -> new HashSet());
+ insertTabletStatement.getDevicePath().getFullPath(), k -> new HashSet<>());
timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots());
}
@@ -2410,7 +2410,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
context.setQueryType(QueryType.WRITE);
List<List<String>> measurementsList = createTemplateStatement.getMeasurements();
- for (List measurements : measurementsList) {
+ for (List<String> measurements : measurementsList) {
Set<String> measurementsSet = new HashSet<>(measurements);
if (measurementsSet.size() < measurements.size()) {
throw new SemanticException(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 029c878114..472dc5b695 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -58,9 +58,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class ClusterPartitionFetcher implements IPartitionFetcher {
@@ -180,7 +182,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
TDataPartitionTableResp dataPartitionTableResp =
- client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+ client.getDataPartitionTable(constructDataPartitionReqForQuery(sgNameToQueryParamsMap));
if (dataPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
dataPartition = parseDataPartitionResp(dataPartitionTableResp);
@@ -207,7 +209,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
TDataPartitionTableResp dataPartitionTableResp =
- client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+ client.getDataPartitionTable(constructDataPartitionReqForQuery(sgNameToQueryParamsMap));
if (dataPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return parseDataPartitionResp(dataPartitionTableResp);
@@ -260,9 +262,8 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
if (null == dataPartition) {
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
- TDataPartitionTableResp dataPartitionTableResp =
- client.getOrCreateDataPartitionTable(
- constructDataPartitionReq(splitDataPartitionQueryParams));
+ TDataPartitionReq req = constructDataPartitionReq(splitDataPartitionQueryParams);
+ TDataPartitionTableResp dataPartitionTableResp = client.getOrCreateDataPartitionTable(req);
if (dataPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -346,6 +347,22 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
}
}
+ private static class ComplexTimeSlotList {
+ Set<TTimePartitionSlot> timeSlotList;
+ boolean needLeftAll;
+ boolean needRightAll;
+
+ private ComplexTimeSlotList(boolean needLeftAll, boolean needRightAll) {
+ timeSlotList = new HashSet<>();
+ this.needLeftAll = needLeftAll;
+ this.needRightAll = needRightAll;
+ }
+
+ private void putTimeSlot(List<TTimePartitionSlot> slotList) {
+ timeSlotList.addAll(slotList);
+ }
+ }
+
private TDataPartitionReq constructDataPartitionReq(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
@@ -353,15 +370,50 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
sgNameToQueryParamsMap.entrySet()) {
// for each sg
Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new HashMap<>();
+
+ Map<TSeriesPartitionSlot, ComplexTimeSlotList> seriesSlotTimePartitionMap = new HashMap<>();
+
+ for (DataPartitionQueryParam queryParam : entry.getValue()) {
+ seriesSlotTimePartitionMap
+ .computeIfAbsent(
+ partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()),
+ k ->
+ new ComplexTimeSlotList(
+ queryParam.isNeedLeftAll(), queryParam.isNeedRightAll()))
+ .putTimeSlot(queryParam.getTimePartitionSlotList());
+ }
+ seriesSlotTimePartitionMap.forEach(
+ (k, v) ->
+ deviceToTimePartitionMap.put(
+ k,
+ new TTimeSlotList(
+ new ArrayList<>(v.timeSlotList), v.needLeftAll, v.needRightAll)));
+ partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap);
+ }
+ return new TDataPartitionReq(partitionSlotsMap);
+ }
+
+ /** For query, DataPartitionQueryParam is shared by each device */
+ private TDataPartitionReq constructDataPartitionReqForQuery(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
+ TTimeSlotList sharedTTimeSlotList = null;
+ for (Map.Entry<String, List<DataPartitionQueryParam>> entry :
+ sgNameToQueryParamsMap.entrySet()) {
+ // for each sg
+ Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new HashMap<>();
+
for (DataPartitionQueryParam queryParam : entry.getValue()) {
- TTimeSlotList timePartitionSlotList =
- new TTimeSlotList(
- queryParam.getTimePartitionSlotList(),
- queryParam.isNeedLeftAll(),
- queryParam.isNeedRightAll());
- deviceToTimePartitionMap.put(
+ if (sharedTTimeSlotList == null) {
+ sharedTTimeSlotList =
+ new TTimeSlotList(
+ queryParam.getTimePartitionSlotList(),
+ queryParam.isNeedLeftAll(),
+ queryParam.isNeedRightAll());
+ }
+ deviceToTimePartitionMap.putIfAbsent(
partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()),
- timePartitionSlotList);
+ sharedTTimeSlotList);
}
partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
index 32de442e65..c35c4a72ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
@@ -20,6 +20,9 @@
package org.apache.iotdb.db.mpp.plan.planner.distribution;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+
+import java.util.Map;
public class DistributionPlanContext {
protected boolean isRoot;
@@ -32,6 +35,9 @@ public class DistributionPlanContext {
// DataRegions
protected boolean queryMultiRegion;
+ // used by group by level
+ private Map<String, Expression> columnNameToExpression;
+
protected DistributionPlanContext(MPPQueryContext queryContext) {
this.isRoot = true;
this.queryContext = queryContext;
@@ -62,4 +68,12 @@ public class DistributionPlanContext {
public void setQueryMultiRegion(boolean queryMultiRegion) {
this.queryMultiRegion = queryMultiRegion;
}
+
+ public Map<String, Expression> getColumnNameToExpression() {
+ return columnNameToExpression;
+ }
+
+ public void setColumnNameToExpression(Map<String, Expression> columnNameToExpression) {
+ this.columnNameToExpression = columnNameToExpression;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 799b311551..aa11eb80c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -749,6 +749,18 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
: groupSourcesForGroupByLevel(root, sourceGroup, context);
// Then, we calculate the attributes for GroupByLevelNode in each level
+ Map<String, Expression> columnNameToExpression = new HashMap<>();
+ for (CrossSeriesAggregationDescriptor originalDescriptor :
+ newRoot.getGroupByLevelDescriptors()) {
+ for (Expression exp : originalDescriptor.getInputExpressions()) {
+ columnNameToExpression.put(exp.getExpressionString(), exp);
+ }
+ columnNameToExpression.put(
+ originalDescriptor.getOutputExpression().getExpressionString(),
+ originalDescriptor.getOutputExpression());
+ }
+
+ context.setColumnNameToExpression(columnNameToExpression);
calculateGroupByLevelNodeAttributes(newRoot, 0, context);
return newRoot;
}
@@ -884,21 +896,29 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
// Check every OutputColumn of GroupByLevelNode and set the Expression of corresponding
// AggregationDescriptor
List<CrossSeriesAggregationDescriptor> descriptorList = new ArrayList<>();
+ Map<String, Expression> columnNameToExpression = context.getColumnNameToExpression();
+ Set<Expression> childrenExpressionSet = new HashSet<>();
+ for (String childColumn : childrenOutputColumns) {
+ Expression childExpression =
+ columnNameToExpression.get(
+ childColumn.substring(childColumn.indexOf("(") + 1, childColumn.lastIndexOf(")")));
+ childrenExpressionSet.add(childExpression);
+ }
+
for (CrossSeriesAggregationDescriptor originalDescriptor :
handle.getGroupByLevelDescriptors()) {
Set<Expression> descriptorExpressions = new HashSet<>();
- for (String childColumn : childrenOutputColumns) {
- // If this condition matched, the childColumn should come from GroupByLevelNode
- if (isAggColumnMatchExpression(childColumn, originalDescriptor.getOutputExpression())) {
- descriptorExpressions.add(originalDescriptor.getOutputExpression());
- continue;
- }
- for (Expression exp : originalDescriptor.getInputExpressions()) {
- if (isAggColumnMatchExpression(childColumn, exp)) {
- descriptorExpressions.add(exp);
- }
+
+ if (childrenExpressionSet.contains(originalDescriptor.getOutputExpression())) {
+ descriptorExpressions.add(originalDescriptor.getOutputExpression());
+ }
+
+ for (Expression exp : originalDescriptor.getInputExpressions()) {
+ if (childrenExpressionSet.contains(exp)) {
+ descriptorExpressions.add(exp);
}
}
+
if (descriptorExpressions.isEmpty()) {
continue;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
index d15cdd3ce4..3fbf8f44c1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
@@ -581,13 +581,13 @@ public class AnalyzeTest {
public void testDataPartitionAnalyze() {
Analysis analysis = analyzeSQL("insert into root.sg.d1(timestamp,s) values(1,10),(86401,11)");
Assert.assertEquals(
+ 1,
analysis
.getDataPartitionInfo()
.getDataPartitionMap()
.get("root.sg")
- .get(new TSeriesPartitionSlot(8923))
- .size(),
- 1);
+ .get(new TSeriesPartitionSlot(1107))
+ .size());
}
@Test