You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/23 16:08:18 UTC

[iotdb] 03/03: refactor node for plan

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/delete_timeseries_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 97f5d8dce8a635fb4e18f162eebc973fe4d048d1
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue May 24 00:08:00 2022 +0800

    refactor node for plan
---
 .../planner/distribution/ExchangeNodeAdder.java    | 36 ++++++++--------------
 .../plan/planner/distribution/SourceRewriter.java  |  5 +++
 .../db/mpp/plan/planner/plan/PlanFragment.java     |  6 ++--
 ...itePlanNode.java => IPartitionRelatedNode.java} | 14 ++-------
 .../mpp/plan/planner/plan/node/WritePlanNode.java  |  5 +--
 .../node/metedata/write/DeleteTimeSeriesNode.java  | 10 ++----
 .../metedata/write/InvalidateSchemaCacheNode.java  | 15 +--------
 .../plan/planner/plan/node/source/SourceNode.java  |  5 ++-
 .../planner/plan/node/write/DeleteDataNode.java    | 10 ++----
 9 files changed, 31 insertions(+), 75 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 23fcc4c96b..781c84b28e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggreg
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -113,52 +114,41 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
   }
 
   @Override
-  public PlanNode visitSchemaQueryScan(SchemaQueryScanNode node, NodeGroupContext context) {
-    NodeDistribution nodeDistribution = new NodeDistribution(NodeDistributionType.NO_CHILD);
-    nodeDistribution.region = node.getRegionReplicaSet();
-    context.putNodeDistribution(node.getPlanNodeId(), nodeDistribution);
-    return node;
+  public PlanNode visitSchemaFetchMerge(SchemaFetchMergeNode node, NodeGroupContext context) {
+    return internalVisitSchemaMerge(node, context);
   }
 
   @Override
-  public PlanNode visitSchemaFetchMerge(SchemaFetchMergeNode node, NodeGroupContext context) {
-    return internalVisitSchemaMerge(node, context);
+  public PlanNode visitSchemaQueryScan(SchemaQueryScanNode node, NodeGroupContext context) {
+    return processNoChildSourceNode(node, context);
   }
 
   @Override
   public PlanNode visitSchemaFetchScan(SchemaFetchScanNode node, NodeGroupContext context) {
-    NodeDistribution nodeDistribution = new NodeDistribution(NodeDistributionType.NO_CHILD);
-    nodeDistribution.region = node.getRegionReplicaSet();
-    context.putNodeDistribution(node.getPlanNodeId(), nodeDistribution);
-    return node;
+    return processNoChildSourceNode(node, context);
   }
 
   @Override
   public PlanNode visitSeriesScan(SeriesScanNode node, NodeGroupContext context) {
-    context.putNodeDistribution(
-        node.getPlanNodeId(),
-        new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
-    return node.clone();
+    return processNoChildSourceNode(node, context);
   }
 
   @Override
   public PlanNode visitAlignedSeriesScan(AlignedSeriesScanNode node, NodeGroupContext context) {
-    context.putNodeDistribution(
-        node.getPlanNodeId(),
-        new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
-    return node.clone();
+    return processNoChildSourceNode(node, context);
   }
 
   public PlanNode visitSeriesAggregationScan(
       SeriesAggregationScanNode node, NodeGroupContext context) {
-    context.putNodeDistribution(
-        node.getPlanNodeId(),
-        new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
-    return node.clone();
+    return processNoChildSourceNode(node, context);
   }
 
   public PlanNode visitAlignedSeriesAggregationScan(
       AlignedSeriesAggregationScanNode node, NodeGroupContext context) {
+    return processNoChildSourceNode(node, context);
+  }
+
+  private PlanNode processNoChildSourceNode(SourceNode node, NodeGroupContext context) {
     context.putNodeDistribution(
         node.getPlanNodeId(),
         new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 973d40b3f8..606ccf82cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesSourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
@@ -497,6 +498,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     return sources;
   }
 
+  public PlanNode visit(DeleteDataNode node, DistributionPlanContext context) {
+    return null;
+  }
+
   public PlanNode visit(PlanNode node, DistributionPlanContext context) {
     return node.accept(this, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
index 626893c152..a500f65d18 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
@@ -21,10 +21,10 @@ package org.apache.iotdb.db.mpp.plan.planner.plan;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
@@ -77,8 +77,8 @@ public class PlanFragment {
   }
 
   private TRegionReplicaSet getNodeRegion(PlanNode root) {
-    if (root instanceof SourceNode) {
-      return ((SourceNode) root).getRegionReplicaSet();
+    if (root instanceof IPartitionRelatedNode) {
+      return ((IPartitionRelatedNode) root).getRegionReplicaSet();
     }
     for (PlanNode child : root.getChildren()) {
       TRegionReplicaSet result = getNodeRegion(child);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/WritePlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/IPartitionRelatedNode.java
similarity index 73%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/WritePlanNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/IPartitionRelatedNode.java
index 0d6ee0c793..ca82777058 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/WritePlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/IPartitionRelatedNode.java
@@ -20,17 +20,7 @@
 package org.apache.iotdb.db.mpp.plan.planner.plan.node;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 
-import java.util.List;
-
-public abstract class WritePlanNode extends PlanNode {
-
-  protected WritePlanNode(PlanNodeId id) {
-    super(id);
-  }
-
-  public abstract TRegionReplicaSet getRegionReplicaSet();
-
-  public abstract List<WritePlanNode> splitByPartition(Analysis analysis);
+public interface IPartitionRelatedNode {
+  TRegionReplicaSet getRegionReplicaSet();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/WritePlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/WritePlanNode.java
index 0d6ee0c793..ef877add59 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/WritePlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/WritePlanNode.java
@@ -19,18 +19,15 @@
 
 package org.apache.iotdb.db.mpp.plan.planner.plan.node;
 
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 
 import java.util.List;
 
-public abstract class WritePlanNode extends PlanNode {
+public abstract class WritePlanNode extends PlanNode implements IPartitionRelatedNode {
 
   protected WritePlanNode(PlanNodeId id) {
     super(id);
   }
 
-  public abstract TRegionReplicaSet getRegionReplicaSet();
-
   public abstract List<WritePlanNode> splitByPartition(Analysis analysis);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/DeleteTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/DeleteTimeSeriesNode.java
index 98df329b1a..05356144cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/DeleteTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/DeleteTimeSeriesNode.java
@@ -22,18 +22,17 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
-import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-public class DeleteTimeSeriesNode extends WritePlanNode {
+public class DeleteTimeSeriesNode extends PlanNode implements IPartitionRelatedNode {
 
   private final List<PartialPath> pathList;
 
@@ -92,9 +91,4 @@ public class DeleteTimeSeriesNode extends WritePlanNode {
   public TRegionReplicaSet getRegionReplicaSet() {
     return null;
   }
-
-  @Override
-  public List<WritePlanNode> splitByPartition(Analysis analysis) {
-    return null;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InvalidateSchemaCacheNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InvalidateSchemaCacheNode.java
index 95649d8248..8af09b8a3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InvalidateSchemaCacheNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InvalidateSchemaCacheNode.java
@@ -19,22 +19,19 @@
 
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write;
 
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
 import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-public class InvalidateSchemaCacheNode extends WritePlanNode {
+public class InvalidateSchemaCacheNode extends PlanNode {
 
   private final QueryId queryId;
 
@@ -114,14 +111,4 @@ public class InvalidateSchemaCacheNode extends WritePlanNode {
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     return new InvalidateSchemaCacheNode(planNodeId, queryId, pathList, storageGroups);
   }
-
-  @Override
-  public TRegionReplicaSet getRegionReplicaSet() {
-    return null;
-  }
-
-  @Override
-  public List<WritePlanNode> splitByPartition(Analysis analysis) {
-    return null;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SourceNode.java
index e3209a03e4..63203cf07f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SourceNode.java
@@ -19,10 +19,11 @@
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 
-public abstract class SourceNode extends PlanNode implements AutoCloseable {
+public abstract class SourceNode extends PlanNode implements AutoCloseable, IPartitionRelatedNode {
 
   public SourceNode(PlanNodeId id) {
     super(id);
@@ -30,7 +31,5 @@ public abstract class SourceNode extends PlanNode implements AutoCloseable {
 
   public abstract void open() throws Exception;
 
-  public abstract TRegionReplicaSet getRegionReplicaSet();
-
   public abstract void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
index 21ebfec88d..11c803c605 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
@@ -22,18 +22,17 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
-import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-public class DeleteDataNode extends WritePlanNode {
+public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
 
   private final List<PartialPath> pathList;
 
@@ -92,9 +91,4 @@ public class DeleteDataNode extends WritePlanNode {
   public TRegionReplicaSet getRegionReplicaSet() {
     return null;
   }
-
-  @Override
-  public List<WritePlanNode> splitByPartition(Analysis analysis) {
-    return null;
-  }
 }