You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/24 06:40:02 UTC

[iotdb] branch xingtanzjr/delete_timeseries_plan updated: add more attrs for DeleteDataNode

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/delete_timeseries_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/delete_timeseries_plan by this push:
     new 267cb59603 add more attrs for DeleteDataNode
267cb59603 is described below

commit 267cb596034727866f98b986b6b6ac825f84064a
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue May 24 14:39:53 2022 +0800

    add more attrs for DeleteDataNode
---
 .../iotdb/commons/partition/DataPartition.java     | 19 +++++++++
 .../commons/partition/RegionReplicaSetInfo.java    | 47 ++++++++++++++++++++++
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |  6 ++-
 .../iotdb/db/mpp/plan/planner/LogicalPlanner.java  |  6 +--
 .../plan/planner/distribution/SourceRewriter.java  | 39 ++++++++++++++++--
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |  5 +++
 .../planner/plan/node/write/DeleteDataNode.java    | 39 ++++++++++++++----
 .../plan/node/write/DeleteDataNodeSerdeTest.java   | 19 ++++++++-
 8 files changed, 160 insertions(+), 20 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 3fff3e130c..ac0bfa012e 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -310,4 +310,23 @@ public class DataPartition extends Partition {
       storageGroupNum--;
     }
   }
+
+  /** Returns one info per distinct replica set; each owning storage group is recorded once. */
+  public List<RegionReplicaSetInfo> getDataDistributionInfo() {
+    Map<TRegionReplicaSet, RegionReplicaSetInfo> distributionMap = new HashMap<>();
+    dataPartitionMap.forEach(
+        (storageGroup, partition) -> {
+          List<TRegionReplicaSet> ret =
+              partition.values().stream()
+                  .flatMap(slots -> slots.values().stream().flatMap(sets -> sets.stream()))
+                  .distinct() // a replica set recurs across slots; add the storage group once
+                  .collect(Collectors.toList());
+          for (TRegionReplicaSet regionReplicaSet : ret) {
+            distributionMap
+                .computeIfAbsent(regionReplicaSet, RegionReplicaSetInfo::new)
+                .addStorageGroup(storageGroup);
+          }
+        });
+    return new ArrayList<>(distributionMap.values());
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java
new file mode 100644
index 0000000000..298547d866
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Holds a region replica set together with the storage groups recorded as owned by it. */
+public class RegionReplicaSetInfo {
+  private final TRegionReplicaSet regionReplicaSet;
+  private final List<String> ownedStorageGroups = new ArrayList<>();
+
+  public RegionReplicaSetInfo(TRegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  public void addStorageGroup(String storageGroup) {
+    ownedStorageGroups.add(storageGroup); // appended as given; repeated names are not filtered
+  }
+
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return regionReplicaSet;
+  }
+
+  public List<String> getOwnedStorageGroups() {
+    return ownedStorageGroups; // the internal list itself, not a copy
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 5a07b64d79..68b0a39237 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -611,8 +611,10 @@ public class LogicalPlanBuilder {
     return this;
   }
 
-  public LogicalPlanBuilder planDeleteData(List<PartialPath> paths) {
-    DeleteDataNode node = new DeleteDataNode(context.getQueryId().genPlanNodeId(), paths);
+  public LogicalPlanBuilder planDeleteData(List<PartialPath> paths, List<String> storageGroups) {
+    DeleteDataNode node =
+        new DeleteDataNode(
+            context.getQueryId().genPlanNodeId(), context.getQueryId(), paths, storageGroups);
     node.addChild(this.root);
     this.root = node;
     return this;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index 7fd514ed22..539ef91b2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -545,11 +545,7 @@ public class LogicalPlanner {
       List<PartialPath> paths = deleteTimeSeriesStatement.getPaths();
       List<String> storageGroups =
           new ArrayList<>(analysis.getSchemaPartitionInfo().getSchemaPartitionMap().keySet());
-      return planBuilder
-          .planInvalidateSchemaCache(paths, storageGroups)
-          .planDeleteData(paths)
-          .planDeleteTimeseries(paths)
-          .getRoot();
+      return planBuilder.planDeleteData(paths, storageGroups).planDeleteTimeseries(paths).getRoot();
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 606ccf82cf..752383eff2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -20,7 +20,10 @@
 package org.apache.iotdb.db.mpp.plan.planner.distribution;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.partition.RegionReplicaSetInfo;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -30,6 +33,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchM
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
@@ -69,6 +73,37 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     return null;
   }
 
+  public PlanNode visitDeleteTimeseries(
+      DeleteTimeSeriesNode node, DistributionPlanContext context) {
+    return null; // stub: no rewritten plan is produced for DeleteTimeSeriesNode
+  }
+
+  private List<DeleteDataNode> splitDeleteDataNode(
+      DeleteDataNode node, DistributionPlanContext context) {
+    List<DeleteDataNode> ret = new ArrayList<>(); // never filled or returned in this version
+    List<PartialPath> rawPaths = node.getPathList(); // read but not used below
+    List<RegionReplicaSetInfo> relatedRegions =
+        analysis.getDataPartitionInfo().getDataDistributionInfo();
+    for (RegionReplicaSetInfo regionReplicaSetInfo : relatedRegions) {} // no-op loop
+    // NOTE(review): looks unfinished; the method returns null whatever the inputs are.
+    return null;
+  }
+
+  private List<PartialPath> getRelatedPaths(List<PartialPath> paths, List<String> storageGroups) {
+    List<PartialPath> ret = new ArrayList<>(); // one result list shared by all storage groups
+    PathPatternTree patternTree = new PathPatternTree(paths);
+    for (String storageGroup : storageGroups) {
+      try {
+        ret.addAll(
+            patternTree.findOverlappedPattern(new PartialPath(storageGroup)).splitToPathList());
+      } catch (IllegalPathException e) {
+        // An IllegalPathException is not expected here; if it occurs it is rethrown unchecked.
+        throw new RuntimeException(e);
+      }
+    }
+    return ret;
+  }
+
   @Override
   public PlanNode visitSchemaQueryMerge(
       SchemaQueryMergeNode node, DistributionPlanContext context) {
@@ -498,10 +533,6 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     return sources;
   }
 
-  public PlanNode visit(DeleteDataNode node, DistributionPlanContext context) {
-    return null;
-  }
-
   public PlanNode visit(PlanNode node, DistributionPlanContext context) {
     return node.accept(this, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index d60df01d36..4c7f1a5587 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSe
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
@@ -255,4 +256,8 @@ public abstract class PlanVisitor<R, C> {
   public R visitLastQueryMerge(LastQueryMergeNode node, C context) {
     return visitPlan(node, context);
   }
+
+  public R visitDeleteTimeseries(DeleteTimeSeriesNode node, C context) {
+    return visitPlan(node, context); // default: delegate to the generic visitPlan
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
index 11c803c605..3e16c1021e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
+import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -34,11 +35,16 @@ import java.util.List;
 
 public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
 
+  private final QueryId queryId;
   private final List<PartialPath> pathList;
+  private final List<String> storageGroups;
 
-  public DeleteDataNode(PlanNodeId id, List<PartialPath> pathList) {
+  public DeleteDataNode(
+      PlanNodeId id, QueryId queryId, List<PartialPath> pathList, List<String> storageGroups) {
     super(id);
     this.pathList = pathList;
+    this.queryId = queryId;
+    this.storageGroups = storageGroups;
   }
 
   public List<PartialPath> getPathList() {
@@ -55,7 +61,7 @@ public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
 
   @Override
   public PlanNode clone() {
-    return new DeleteDataNode(getPlanNodeId(), pathList);
+    return new DeleteDataNode(getPlanNodeId(), queryId, pathList, storageGroups);
   }
 
   @Override
@@ -71,24 +77,43 @@ public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.DELETE_DATA.serialize(byteBuffer);
+    queryId.serialize(byteBuffer);
     ReadWriteIOUtils.write(pathList.size(), byteBuffer);
     for (PartialPath path : pathList) {
       path.serialize(byteBuffer);
     }
+    ReadWriteIOUtils.write(storageGroups.size(), byteBuffer);
+    for (String storageGroup : storageGroups) {
+      ReadWriteIOUtils.write(storageGroup, byteBuffer);
+    }
   }
 
-  public static DeleteDataNode deserialize(ByteBuffer buffer) {
-    int size = ReadWriteIOUtils.readInt(buffer);
+  public static DeleteDataNode deserialize(ByteBuffer byteBuffer) {
+    QueryId queryId = QueryId.deserialize(byteBuffer);
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
     List<PartialPath> pathList = new ArrayList<>(size);
     for (int i = 0; i < size; i++) {
-      pathList.add((PartialPath) PathDeserializeUtil.deserialize(buffer));
+      pathList.add((PartialPath) PathDeserializeUtil.deserialize(byteBuffer));
+    }
+    size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<String> storageGroups = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      storageGroups.add(ReadWriteIOUtils.readString(byteBuffer));
     }
-    PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
-    return new DeleteDataNode(planNodeId, pathList);
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new DeleteDataNode(planNodeId, queryId, pathList, storageGroups);
   }
 
   @Override
   public TRegionReplicaSet getRegionReplicaSet() {
     return null;
   }
+
+  public QueryId getQueryId() {
+    return queryId; // the value passed to the constructor
+  }
+
+  public List<String> getStorageGroups() {
+    return storageGroups; // the list passed to the constructor, not a copy
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/DeleteDataNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/DeleteDataNodeSerdeTest.java
index 5dbde28d3d..86f8ce1dff 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/DeleteDataNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/DeleteDataNodeSerdeTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.plan.node.write;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
@@ -37,11 +38,16 @@ public class DeleteDataNodeSerdeTest {
 
   @Test
   public void testSerializeAndDeserialize() throws IllegalPathException {
-    PlanNodeId planNodeId = new PlanNodeId("DeleteDataNode");
+    PlanNodeId planNodeId = new PlanNodeId("InvalidateSchemaCacheNode");
+    QueryId queryId = new QueryId("query");
     List<PartialPath> pathList = new ArrayList<>();
     pathList.add(new PartialPath("root.sg.d1.s1"));
     pathList.add(new PartialPath("root.sg.d2.*"));
-    DeleteDataNode deleteDataNode = new DeleteDataNode(planNodeId, pathList);
+    List<String> storageGroups = new ArrayList<>();
+    storageGroups.add("root.sg1");
+    storageGroups.add("root.sg2");
+    DeleteDataNode deleteDataNode =
+        new DeleteDataNode(planNodeId, queryId, pathList, storageGroups);
 
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
     deleteDataNode.serialize(byteBuffer);
@@ -52,10 +58,19 @@ public class DeleteDataNodeSerdeTest {
     Assert.assertEquals(planNodeId, deserializedNode.getPlanNodeId());
 
     deleteDataNode = (DeleteDataNode) deserializedNode;
+
+    Assert.assertEquals(queryId, deleteDataNode.getQueryId());
+
     List<PartialPath> deserializedPathList = deleteDataNode.getPathList();
     Assert.assertEquals(pathList.size(), deserializedPathList.size());
     for (int i = 0; i < pathList.size(); i++) {
       Assert.assertEquals(pathList.get(i), deserializedPathList.get(i));
     }
+
+    List<String> deserializedStorageGroups = deleteDataNode.getStorageGroups();
+    Assert.assertEquals(storageGroups.size(), deserializedStorageGroups.size());
+    for (int i = 0; i < storageGroups.size(); i++) {
+      Assert.assertEquals(storageGroups.get(i), deserializedStorageGroups.get(i));
+    }
   }
 }