You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/24 06:40:02 UTC
[iotdb] branch xingtanzjr/delete_timeseries_plan updated: add more attrs for DeleteDataNode
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/delete_timeseries_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/delete_timeseries_plan by this push:
new 267cb59603 add more attrs for DeleteDataNode
267cb59603 is described below
commit 267cb596034727866f98b986b6b6ac825f84064a
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue May 24 14:39:53 2022 +0800
add more attrs for DeleteDataNode
---
.../iotdb/commons/partition/DataPartition.java | 19 +++++++++
.../commons/partition/RegionReplicaSetInfo.java | 47 ++++++++++++++++++++++
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 6 ++-
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 6 +--
.../plan/planner/distribution/SourceRewriter.java | 39 ++++++++++++++++--
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 5 +++
.../planner/plan/node/write/DeleteDataNode.java | 39 ++++++++++++++----
.../plan/node/write/DeleteDataNodeSerdeTest.java | 19 ++++++++-
8 files changed, 160 insertions(+), 20 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 3fff3e130c..ac0bfa012e 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -310,4 +310,23 @@ public class DataPartition extends Partition {
storageGroupNum--;
}
}
+
+ public List<RegionReplicaSetInfo> getDataDistributionInfo() {
+ Map<TRegionReplicaSet, RegionReplicaSetInfo> distributionMap = new HashMap<>();
+
+ dataPartitionMap.forEach(
+ (storageGroup, partition) -> {
+ List<TRegionReplicaSet> ret =
+ partition.entrySet().stream()
+ .flatMap(
+ s -> s.getValue().entrySet().stream().flatMap(e -> e.getValue().stream()))
+ .collect(Collectors.toList());
+ for (TRegionReplicaSet regionReplicaSet : ret) {
+ distributionMap
+ .computeIfAbsent(regionReplicaSet, RegionReplicaSetInfo::new)
+ .addStorageGroup(storageGroup);
+ }
+ });
+ return new ArrayList<>(distributionMap.values());
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java
new file mode 100644
index 0000000000..298547d866
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class RegionReplicaSetInfo {
+ private TRegionReplicaSet regionReplicaSet;
+ private List<String> ownedStorageGroups;
+
+ public RegionReplicaSetInfo(TRegionReplicaSet regionReplicaSet) {
+ this.regionReplicaSet = regionReplicaSet;
+ this.ownedStorageGroups = new ArrayList<>();
+ }
+
+ public void addStorageGroup(String storageGroup) {
+ ownedStorageGroups.add(storageGroup);
+ }
+
+ public TRegionReplicaSet getRegionReplicaSet() {
+ return regionReplicaSet;
+ }
+
+ public List<String> getOwnedStorageGroups() {
+ return ownedStorageGroups;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 5a07b64d79..68b0a39237 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -611,8 +611,10 @@ public class LogicalPlanBuilder {
return this;
}
- public LogicalPlanBuilder planDeleteData(List<PartialPath> paths) {
- DeleteDataNode node = new DeleteDataNode(context.getQueryId().genPlanNodeId(), paths);
+ public LogicalPlanBuilder planDeleteData(List<PartialPath> paths, List<String> storageGroups) {
+ DeleteDataNode node =
+ new DeleteDataNode(
+ context.getQueryId().genPlanNodeId(), context.getQueryId(), paths, storageGroups);
node.addChild(this.root);
this.root = node;
return this;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index 7fd514ed22..539ef91b2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -545,11 +545,7 @@ public class LogicalPlanner {
List<PartialPath> paths = deleteTimeSeriesStatement.getPaths();
List<String> storageGroups =
new ArrayList<>(analysis.getSchemaPartitionInfo().getSchemaPartitionMap().keySet());
- return planBuilder
- .planInvalidateSchemaCache(paths, storageGroups)
- .planDeleteData(paths)
- .planDeleteTimeseries(paths)
- .getRoot();
+ return planBuilder.planDeleteData(paths, storageGroups).planDeleteTimeseries(paths).getRoot();
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 606ccf82cf..752383eff2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -20,7 +20,10 @@
package org.apache.iotdb.db.mpp.plan.planner.distribution;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.partition.RegionReplicaSetInfo;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -30,6 +33,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchM
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
@@ -69,6 +73,37 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
return null;
}
+ public PlanNode visitDeleteTimeseries(
+ DeleteTimeSeriesNode node, DistributionPlanContext context) {
+ return null;
+ }
+
+ private List<DeleteDataNode> splitDeleteDataNode(
+ DeleteDataNode node, DistributionPlanContext context) {
+ List<DeleteDataNode> ret = new ArrayList<>();
+ List<PartialPath> rawPaths = node.getPathList();
+ List<RegionReplicaSetInfo> relatedRegions =
+ analysis.getDataPartitionInfo().getDataDistributionInfo();
+ for (RegionReplicaSetInfo regionReplicaSetInfo : relatedRegions) {}
+
+ return null;
+ }
+
+ private List<PartialPath> getRelatedPaths(List<PartialPath> paths, List<String> storageGroups) {
+ List<PartialPath> ret = new ArrayList<>();
+ PathPatternTree patternTree = new PathPatternTree(paths);
+ for (String storageGroup : storageGroups) {
+ try {
+ ret.addAll(
+ patternTree.findOverlappedPattern(new PartialPath(storageGroup)).splitToPathList());
+ } catch (IllegalPathException e) {
+ // The IllegalPathException is definitely not threw here
+ throw new RuntimeException(e);
+ }
+ }
+ return ret;
+ }
+
@Override
public PlanNode visitSchemaQueryMerge(
SchemaQueryMergeNode node, DistributionPlanContext context) {
@@ -498,10 +533,6 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
return sources;
}
- public PlanNode visit(DeleteDataNode node, DistributionPlanContext context) {
- return null;
- }
-
public PlanNode visit(PlanNode node, DistributionPlanContext context) {
return node.accept(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index d60df01d36..4c7f1a5587 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSe
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
@@ -255,4 +256,8 @@ public abstract class PlanVisitor<R, C> {
public R visitLastQueryMerge(LastQueryMergeNode node, C context) {
return visitPlan(node, context);
}
+
+ public R visitDeleteTimeseries(DeleteTimeSeriesNode node, C context) {
+ return visitPlan(node, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
index 11c803c605..3e16c1021e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
+import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -34,11 +35,16 @@ import java.util.List;
public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
+ private final QueryId queryId;
private final List<PartialPath> pathList;
+ private final List<String> storageGroups;
- public DeleteDataNode(PlanNodeId id, List<PartialPath> pathList) {
+ public DeleteDataNode(
+ PlanNodeId id, QueryId queryId, List<PartialPath> pathList, List<String> storageGroups) {
super(id);
this.pathList = pathList;
+ this.queryId = queryId;
+ this.storageGroups = storageGroups;
}
public List<PartialPath> getPathList() {
@@ -55,7 +61,7 @@ public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
@Override
public PlanNode clone() {
- return new DeleteDataNode(getPlanNodeId(), pathList);
+ return new DeleteDataNode(getPlanNodeId(), queryId, pathList, storageGroups);
}
@Override
@@ -71,24 +77,43 @@ public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.DELETE_DATA.serialize(byteBuffer);
+ queryId.serialize(byteBuffer);
ReadWriteIOUtils.write(pathList.size(), byteBuffer);
for (PartialPath path : pathList) {
path.serialize(byteBuffer);
}
+ ReadWriteIOUtils.write(storageGroups.size(), byteBuffer);
+ for (String storageGroup : storageGroups) {
+ ReadWriteIOUtils.write(storageGroup, byteBuffer);
+ }
}
- public static DeleteDataNode deserialize(ByteBuffer buffer) {
- int size = ReadWriteIOUtils.readInt(buffer);
+ public static DeleteDataNode deserialize(ByteBuffer byteBuffer) {
+ QueryId queryId = QueryId.deserialize(byteBuffer);
+ int size = ReadWriteIOUtils.readInt(byteBuffer);
List<PartialPath> pathList = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
- pathList.add((PartialPath) PathDeserializeUtil.deserialize(buffer));
+ pathList.add((PartialPath) PathDeserializeUtil.deserialize(byteBuffer));
+ }
+ size = ReadWriteIOUtils.readInt(byteBuffer);
+ List<String> storageGroups = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ storageGroups.add(ReadWriteIOUtils.readString(byteBuffer));
}
- PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
- return new DeleteDataNode(planNodeId, pathList);
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new DeleteDataNode(planNodeId, queryId, pathList, storageGroups);
}
@Override
public TRegionReplicaSet getRegionReplicaSet() {
return null;
}
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ public List<String> getStorageGroups() {
+ return storageGroups;
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/DeleteDataNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/DeleteDataNodeSerdeTest.java
index 5dbde28d3d..86f8ce1dff 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/DeleteDataNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/DeleteDataNodeSerdeTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.plan.node.write;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
@@ -37,11 +38,16 @@ public class DeleteDataNodeSerdeTest {
@Test
public void testSerializeAndDeserialize() throws IllegalPathException {
- PlanNodeId planNodeId = new PlanNodeId("DeleteDataNode");
+ PlanNodeId planNodeId = new PlanNodeId("InvalidateSchemaCacheNode");
+ QueryId queryId = new QueryId("query");
List<PartialPath> pathList = new ArrayList<>();
pathList.add(new PartialPath("root.sg.d1.s1"));
pathList.add(new PartialPath("root.sg.d2.*"));
- DeleteDataNode deleteDataNode = new DeleteDataNode(planNodeId, pathList);
+ List<String> storageGroups = new ArrayList<>();
+ storageGroups.add("root.sg1");
+ storageGroups.add("root.sg2");
+ DeleteDataNode deleteDataNode =
+ new DeleteDataNode(planNodeId, queryId, pathList, storageGroups);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
deleteDataNode.serialize(byteBuffer);
@@ -52,10 +58,19 @@ public class DeleteDataNodeSerdeTest {
Assert.assertEquals(planNodeId, deserializedNode.getPlanNodeId());
deleteDataNode = (DeleteDataNode) deserializedNode;
+
+ Assert.assertEquals(queryId, deleteDataNode.getQueryId());
+
List<PartialPath> deserializedPathList = deleteDataNode.getPathList();
Assert.assertEquals(pathList.size(), deserializedPathList.size());
for (int i = 0; i < pathList.size(); i++) {
Assert.assertEquals(pathList.get(i), deserializedPathList.get(i));
}
+
+ List<String> deserializedStorageGroups = deleteDataNode.getStorageGroups();
+ Assert.assertEquals(storageGroups.size(), deserializedStorageGroups.size());
+ for (int i = 0; i < storageGroups.size(); i++) {
+ Assert.assertEquals(storageGroups.get(i), deserializedStorageGroups.get(i));
+ }
}
}