You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/08/23 01:54:21 UTC
[iotdb] branch master updated: [IOTDB-3938] Accelerate hashcode computation of partialPath for new Standalone data insertion (#7084)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8bd115fc11 [IOTDB-3938] Accelerate hashcode computation of partialPath for new Standalone data insertion (#7084)
8bd115fc11 is described below
commit 8bd115fc110a015f6cf3296cd9fe6db37f692692
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Tue Aug 23 09:54:15 2022 +0800
[IOTDB-3938] Accelerate hashcode computation of partialPath for new Standalone data insertion (#7084)
---
.../db/integration/IoTDBFlushQueryMergeIT.java | 2 +-
.../org/apache/iotdb/commons/path/PartialPath.java | 6 +-
.../dataregion/HashVirtualPartitioner.java | 2 +-
.../db/mpp/plan/plan/QueryLogicalPlanUtil.java | 184 +++++++++++----------
4 files changed, 106 insertions(+), 88 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeIT.java
index 64aa2820e4..b80bdb855c 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeIT.java
@@ -185,7 +185,7 @@ public class IoTDBFlushQueryMergeIT {
"FLUSH root.noexist.nodatagroup1,root.notExistGroup1,root.notExistGroup2");
} catch (SQLException sqe) {
String expectedMsg =
- "Storage group is not set for current seriesPath: [root.notExistGroup1,root.notExistGroup2]";
+ "Storage group is not set for current seriesPath: [root.notExistGroup2,root.notExistGroup1]";
sqe.printStackTrace();
Assert.assertTrue(sqe.getMessage().contains(expectedMsg));
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index 9f5d5d2a11..8baed98f22 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -398,7 +398,11 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
@Override
public int hashCode() {
- return this.getFullPath().hashCode();
+ int h = 0;
+ for (String node : nodes) {
+ h += 31 * h + node.hashCode();
+ }
+ return h;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/HashVirtualPartitioner.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/HashVirtualPartitioner.java
index f84ae7c5b7..66dd554bae 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/HashVirtualPartitioner.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/HashVirtualPartitioner.java
@@ -44,7 +44,7 @@ public class HashVirtualPartitioner implements VirtualPartitioner {
}
private int toStorageGroupId(PartialPath deviceId) {
- return Math.abs(deviceId.hashCode() % STORAGE_GROUP_NUM);
+ return Math.abs(deviceId.getFullPath().hashCode() % STORAGE_GROUP_NUM);
}
private static class HashVirtualPartitionerHolder {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
index 7f9c71940b..7f26a71ca0 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
@@ -361,33 +361,52 @@ public class QueryLogicalPlanUtil {
QueryId queryId = new QueryId("test");
List<PlanNode> sourceNodeList = new ArrayList<>();
Filter timeFilter = TimeFilter.gt(100);
+ try {
+ sourceNodeList.add(
+ new AlignedSeriesAggregationScanNode(
+ queryId.genPlanNodeId(),
+ new AlignedPath(
+ "root.sg.d2.a",
+ Arrays.asList("s2", "s1"),
+ ((AlignedPath) schemaMap.get("root.sg.d2.a")).getSchemaList()),
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.SUM.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s2")))),
+ new AggregationDescriptor(
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1"))))),
+ Ordering.ASC,
+ null));
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ }
sourceNodeList.add(
- new AlignedSeriesAggregationScanNode(
+ new SeriesAggregationScanNode(
queryId.genPlanNodeId(),
- (AlignedPath) schemaMap.get("root.sg.d2.a"),
- Arrays.asList(
- new AggregationDescriptor(
- AggregationType.FIRST_VALUE.name().toLowerCase(),
- AggregationStep.SINGLE,
- Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1")))),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
+ Collections.singletonList(
new AggregationDescriptor(
AggregationType.SUM.name().toLowerCase(),
AggregationStep.SINGLE,
Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s2"))))),
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2"))))),
Ordering.ASC,
null));
sourceNodeList.add(
new SeriesAggregationScanNode(
queryId.genPlanNodeId(),
- (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
Collections.singletonList(
new AggregationDescriptor(
- AggregationType.FIRST_VALUE.name().toLowerCase(),
+ AggregationType.SUM.name().toLowerCase(),
AggregationStep.SINGLE,
Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))),
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))),
Ordering.ASC,
null));
sourceNodeList.add(
@@ -405,25 +424,13 @@ public class QueryLogicalPlanUtil {
sourceNodeList.add(
new SeriesAggregationScanNode(
queryId.genPlanNodeId(),
- (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
- Collections.singletonList(
- new AggregationDescriptor(
- AggregationType.SUM.name().toLowerCase(),
- AggregationStep.SINGLE,
- Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))),
- Ordering.ASC,
- null));
- sourceNodeList.add(
- new SeriesAggregationScanNode(
- queryId.genPlanNodeId(),
- (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s1"),
Collections.singletonList(
new AggregationDescriptor(
- AggregationType.SUM.name().toLowerCase(),
+ AggregationType.FIRST_VALUE.name().toLowerCase(),
AggregationStep.SINGLE,
Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2"))))),
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))),
Ordering.ASC,
null));
sourceNodeList.add(
@@ -489,26 +496,57 @@ public class QueryLogicalPlanUtil {
QueryId queryId = new QueryId("test");
List<PlanNode> sourceNodeList = new ArrayList<>();
Filter timeFilter = TimeFilter.gt(100);
+ try {
+ sourceNodeList.add(
+ new AlignedSeriesAggregationScanNode(
+ queryId.genPlanNodeId(),
+ new AlignedPath(
+ "root.sg.d2.a",
+ Arrays.asList("s2", "s1"),
+ ((AlignedPath) schemaMap.get("root.sg.d2.a")).getSchemaList()),
+ Arrays.asList(
+ new AggregationDescriptor(
+ AggregationType.MAX_VALUE.name().toLowerCase(),
+ AggregationStep.PARTIAL,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s2")))),
+ new AggregationDescriptor(
+ AggregationType.LAST_VALUE.name().toLowerCase(),
+ AggregationStep.PARTIAL,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1")))),
+ new AggregationDescriptor(
+ AggregationType.COUNT.name().toLowerCase(),
+ AggregationStep.PARTIAL,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1"))))),
+ Ordering.DESC,
+ null));
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ }
sourceNodeList.add(
- new AlignedSeriesAggregationScanNode(
+ new SeriesAggregationScanNode(
queryId.genPlanNodeId(),
- (AlignedPath) schemaMap.get("root.sg.d2.a"),
- Arrays.asList(
- new AggregationDescriptor(
- AggregationType.LAST_VALUE.name().toLowerCase(),
- AggregationStep.PARTIAL,
- Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1")))),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
+ Collections.singletonList(
new AggregationDescriptor(
- AggregationType.COUNT.name().toLowerCase(),
+ AggregationType.MAX_VALUE.name().toLowerCase(),
AggregationStep.PARTIAL,
Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1")))),
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2"))))),
+ Ordering.DESC,
+ null));
+ sourceNodeList.add(
+ new SeriesAggregationScanNode(
+ queryId.genPlanNodeId(),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
+ Collections.singletonList(
new AggregationDescriptor(
AggregationType.MAX_VALUE.name().toLowerCase(),
AggregationStep.PARTIAL,
Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s2"))))),
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))),
Ordering.DESC,
null));
sourceNodeList.add(
@@ -545,30 +583,6 @@ public class QueryLogicalPlanUtil {
new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))),
Ordering.DESC,
null));
- sourceNodeList.add(
- new SeriesAggregationScanNode(
- queryId.genPlanNodeId(),
- (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
- Collections.singletonList(
- new AggregationDescriptor(
- AggregationType.MAX_VALUE.name().toLowerCase(),
- AggregationStep.PARTIAL,
- Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))),
- Ordering.DESC,
- null));
- sourceNodeList.add(
- new SeriesAggregationScanNode(
- queryId.genPlanNodeId(),
- (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
- Collections.singletonList(
- new AggregationDescriptor(
- AggregationType.MAX_VALUE.name().toLowerCase(),
- AggregationStep.PARTIAL,
- Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2"))))),
- Ordering.DESC,
- null));
sourceNodeList.forEach(
node -> {
if (node instanceof SeriesAggregationScanNode) {
@@ -641,6 +655,18 @@ public class QueryLogicalPlanUtil {
QueryId queryId = new QueryId("test");
Filter timeFilter = TimeFilter.gt(100);
List<PlanNode> sourceNodeList1 = new ArrayList<>();
+ sourceNodeList1.add(
+ new SeriesAggregationScanNode(
+ queryId.genPlanNodeId(),
+ (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
+ Collections.singletonList(
+ new AggregationDescriptor(
+ AggregationType.MAX_VALUE.name().toLowerCase(),
+ AggregationStep.SINGLE,
+ Collections.singletonList(
+ new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2"))))),
+ Ordering.DESC,
+ null));
sourceNodeList1.add(
new SeriesAggregationScanNode(
queryId.genPlanNodeId(),
@@ -658,24 +684,24 @@ public class QueryLogicalPlanUtil {
new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))))),
Ordering.DESC,
null));
- sourceNodeList1.add(
+ sourceNodeList1.forEach(node -> ((SeriesAggregationScanNode) node).setTimeFilter(timeFilter));
+
+ TimeJoinNode timeJoinNode1 =
+ new TimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList1);
+
+ List<PlanNode> sourceNodeList2 = new ArrayList<>();
+ sourceNodeList2.add(
new SeriesAggregationScanNode(
queryId.genPlanNodeId(),
- (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
+ (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
Collections.singletonList(
new AggregationDescriptor(
AggregationType.MAX_VALUE.name().toLowerCase(),
AggregationStep.SINGLE,
Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2"))))),
+ new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))),
Ordering.DESC,
null));
- sourceNodeList1.forEach(node -> ((SeriesAggregationScanNode) node).setTimeFilter(timeFilter));
-
- TimeJoinNode timeJoinNode1 =
- new TimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList1);
-
- List<PlanNode> sourceNodeList2 = new ArrayList<>();
sourceNodeList2.add(
new SeriesAggregationScanNode(
queryId.genPlanNodeId(),
@@ -693,26 +719,14 @@ public class QueryLogicalPlanUtil {
new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))))),
Ordering.DESC,
null));
- sourceNodeList2.add(
- new SeriesAggregationScanNode(
- queryId.genPlanNodeId(),
- (MeasurementPath) schemaMap.get("root.sg.d2.s2"),
- Collections.singletonList(
- new AggregationDescriptor(
- AggregationType.MAX_VALUE.name().toLowerCase(),
- AggregationStep.SINGLE,
- Collections.singletonList(
- new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2"))))),
- Ordering.DESC,
- null));
sourceNodeList2.forEach(node -> ((SeriesAggregationScanNode) node).setTimeFilter(timeFilter));
TimeJoinNode timeJoinNode2 =
new TimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList2);
Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
- deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 3, 2));
- deviceToMeasurementIndexesMap.put("root.sg.d2", Arrays.asList(1, 3, 2));
+ deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(2, 1, 3));
+ deviceToMeasurementIndexesMap.put("root.sg.d2", Arrays.asList(2, 1, 3));
DeviceViewNode deviceViewNode =
new DeviceViewNode(
queryId.genPlanNodeId(),