You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/13 12:01:56 UTC
[iotdb] 01/01: tmp save
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/write_instance_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 850198002ecf1da7999bc8468632d1fc16eac62f
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 13 20:01:43 2022 +0800
tmp save
---
.../apache/iotdb/db/mpp/common/PlanFragmentId.java | 4 +-
.../db/mpp/sql/planner/DistributionPlanner.java | 19 +++++--
.../db/mpp/sql/planner/plan/FragmentInstance.java | 27 +++-------
.../plan/SimpleFragmentParallelPlanner.java | 9 +---
.../planner/plan/WriteFragmentParallelPlanner.java | 61 ++++++++++++++++++++++
...lePlanNodeRewriter.java => IWritePlanNode.java} | 24 +++------
.../planner/plan/node/SimplePlanNodeRewriter.java | 4 ++
.../plan/node/write/InsertMultiTabletNode.java | 3 +-
.../plan/node/write/InsertMultiTabletsNode.java | 9 ++--
.../sql/planner/plan/node/write/InsertNode.java | 13 ++---
.../sql/planner/plan/node/write/InsertRowNode.java | 3 +-
.../planner/plan/node/write/InsertRowsNode.java | 3 +-
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 3 +-
.../planner/plan/node/write/InsertTabletNode.java | 5 +-
.../db/mpp/sql/plan/DistributionPlannerTest.java | 39 ++++++++++++--
.../db/mpp/sql/plan/FragmentInstanceSerdeTest.java | 22 ++++----
16 files changed, 168 insertions(+), 80 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
index e4bdd7d306..4d8d098e56 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
@@ -89,9 +89,7 @@ public class PlanFragmentId {
return false;
}
PlanFragmentId that = (PlanFragmentId) o;
- return id == that.id
- && nextFragmentInstanceId == that.nextFragmentInstanceId
- && Objects.equals(queryId, that.queryId);
+ return id == that.id && Objects.equals(queryId, that.queryId);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 96341a2ca1..f87979a2a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.planner.plan.*;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.*;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
@@ -71,7 +72,10 @@ public class DistributionPlanner {
PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
SubPlan subPlan = splitFragment(rootWithExchange);
List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
- SetSinkForRootInstance(subPlan, fragmentInstances);
+ // Only execute this step for READ operation
+ if (context.getQueryType() == QueryType.READ) {
+ SetSinkForRootInstance(subPlan, fragmentInstances);
+ }
return new DistributedQueryPlan(
logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), fragmentInstances);
}
@@ -79,8 +83,9 @@ public class DistributionPlanner {
// Convert fragment to detailed instance
// And for parallel-able fragment, clone it into several instances with different params.
public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
- IFragmentParallelPlaner parallelPlaner =
- new SimpleFragmentParallelPlanner(subPlan, analysis, context);
+ IFragmentParallelPlaner parallelPlaner = context.getQueryType() == QueryType.READ ?
+ new SimpleFragmentParallelPlanner(subPlan, analysis, context) :
+ new WriteFragmentParallelPlanner(subPlan, analysis, context);
return parallelPlaner.parallelPlan();
}
@@ -196,6 +201,10 @@ public class DistributionPlanner {
private class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
@Override
public PlanNode visitPlan(PlanNode node, NodeGroupContext context) {
+ // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently
+ if (node instanceof IWritePlanNode) {
+ return node;
+ }
// Visit all the children of current node
List<PlanNode> children =
node.getChildren().stream()
@@ -341,6 +350,10 @@ public class DistributionPlanner {
}
private void splitToSubPlan(PlanNode root, SubPlan subPlan) {
+ // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently
+ if (root instanceof IWritePlanNode) {
+ return;
+ }
if (root instanceof ExchangeNode) {
// We add a FragmentSinkNode for newly created PlanFragment
ExchangeNode exchangeNode = (ExchangeNode) root;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 03fd9f3e0e..c263732b00 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
@@ -49,33 +48,27 @@ public class FragmentInstance implements IConsensusRequest {
// We can add some more params for a specific FragmentInstance
// So that we can make different FragmentInstance owns different data range.
- public FragmentInstance(PlanFragment fragment, int index, Filter timeFilter, QueryType type) {
+ public FragmentInstance(PlanFragment fragment, FragmentInstanceId id, Filter timeFilter, QueryType type) {
this.fragment = fragment;
this.timeFilter = timeFilter;
- this.id = generateId(fragment.getId(), index);
+ this.id = id;
this.type = type;
}
- public static FragmentInstanceId generateId(PlanFragmentId id, int index) {
- return new FragmentInstanceId(id, String.valueOf(index));
- }
-
public RegionReplicaSet getDataRegionId() {
return dataRegion;
}
- public void setDataRegionId(RegionReplicaSet dataRegion) {
+ public void setDataRegionAndHost(RegionReplicaSet dataRegion) {
this.dataRegion = dataRegion;
+ // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current instance
+ this.hostEndpoint = dataRegion.getDataNodeList().get(0).getEndPoint();
}
public Endpoint getHostEndpoint() {
return hostEndpoint;
}
- public void setHostEndpoint(Endpoint hostEndpoint) {
- this.hostEndpoint = hostEndpoint;
- }
-
public PlanFragment getFragment() {
return fragment;
}
@@ -127,8 +120,7 @@ public class FragmentInstance implements IConsensusRequest {
Filter timeFilter = hasTimeFilter ? FilterFactory.deserialize(buffer) : null;
QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
FragmentInstance fragmentInstance =
- new FragmentInstance(
- planFragment, Integer.parseInt(id.getInstanceId()), timeFilter, queryType);
+ new FragmentInstance(planFragment, id, timeFilter, queryType);
RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
try {
regionReplicaSet.deserializeImpl(buffer);
@@ -161,12 +153,7 @@ public class FragmentInstance implements IConsensusRequest {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FragmentInstance instance = (FragmentInstance) o;
- return Objects.equals(id, instance.id)
- && type == instance.type
- && Objects.equals(fragment, instance.fragment)
- && Objects.equals(dataRegion, instance.dataRegion)
- && Objects.equals(hostEndpoint, instance.hostEndpoint)
- && Objects.equals(timeFilter, instance.timeFilter);
+ return Objects.equals(id, instance.id) && type == instance.type && Objects.equals(fragment, instance.fragment) && Objects.equals(dataRegion, instance.dataRegion) && Objects.equals(hostEndpoint, instance.hostEndpoint) && Objects.equals(timeFilter, instance.timeFilter);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index c2014c55d2..280d9891af 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -79,7 +79,6 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
private void produceFragmentInstance(PlanFragment fragment) {
// If one PlanFragment will produce several FragmentInstance, the instanceIdx will be increased
// one by one
- int instanceIdx = 0;
PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
Filter timeFilter =
analysis.getQueryFilter() == null
@@ -88,7 +87,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
FragmentInstance fragmentInstance =
new FragmentInstance(
new PlanFragment(fragment.getId(), rootCopy),
- instanceIdx,
+ fragment.getId().genFragmentInstanceId(),
timeFilter,
queryContext.getQueryType());
@@ -100,11 +99,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
// We need to store all the replica host in case of the scenario that the instance need to be
// redirected
// to another host when scheduling
- fragmentInstance.setDataRegionId(dataRegion);
-
- // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
- // instance
- fragmentInstance.setHostEndpoint(dataRegion.getDataNodeList().get(0).getEndPoint());
+ fragmentInstance.setDataRegionAndHost(dataRegion);
instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
fragmentInstanceList.add(fragmentInstance);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
new file mode 100644
index 0000000000..eebf705cd3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.planner.plan;
+
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner{
+
+ private SubPlan subPlan;
+ private Analysis analysis;
+ private MPPQueryContext queryContext;
+
+ public WriteFragmentParallelPlanner(SubPlan subPlan, Analysis analysis, MPPQueryContext queryContext) {
+ this.subPlan = subPlan;
+ this.analysis = analysis;
+ this.queryContext = queryContext;
+ }
+
+ @Override
+ public List<FragmentInstance> parallelPlan() {
+ PlanFragment fragment = subPlan.getPlanFragment();
+ Filter timeFilter = analysis.getQueryFilter() != null ? ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter() : null;
+ PlanNode node = fragment.getRoot();
+ if (!(node instanceof IWritePlanNode)) {
+ throw new IllegalArgumentException("PlanNode should be IWritePlanNode in WRITE operation");
+ }
+ List<IWritePlanNode> splits = ((IWritePlanNode) node).splitByPartition(analysis);
+ List<FragmentInstance> ret = new ArrayList<>();
+ for (IWritePlanNode split : splits) {
+ FragmentInstance instance = new FragmentInstance(new PlanFragment(fragment.getId(), split), fragment.getId().genFragmentInstanceId(), timeFilter, queryContext.getQueryType());
+ instance.setDataRegionAndHost(((IWritePlanNode) node).getRegionReplicaSet());
+ ret.add(instance);
+ }
+ return ret;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/IWritePlanNode.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/IWritePlanNode.java
index e0ca8f6cfd..7170952721 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/IWritePlanNode.java
@@ -19,26 +19,18 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+
import java.util.List;
-import static com.google.common.collect.ImmutableList.toImmutableList;
+public abstract class IWritePlanNode extends PlanNode {
-public class SimplePlanNodeRewriter<C> extends PlanVisitor<PlanNode, C> {
- @Override
- public PlanNode visitPlan(PlanNode node, C context) {
- return defaultRewrite(node, context);
+ protected IWritePlanNode(PlanNodeId id) {
+ super(id);
}
- public PlanNode defaultRewrite(PlanNode node, C context) {
- List<PlanNode> children =
- node.getChildren().stream()
- .map(child -> rewrite(child, context))
- .collect(toImmutableList());
-
- return node.cloneWithChildren(children);
- }
+ public abstract RegionReplicaSet getRegionReplicaSet();
- public PlanNode rewrite(PlanNode node, C userContext) {
- return node.accept(this, userContext);
- }
+ public abstract List<IWritePlanNode> splitByPartition(Analysis analysis);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
index e0ca8f6cfd..b96002acb3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
@@ -26,6 +26,10 @@ import static com.google.common.collect.ImmutableList.toImmutableList;
public class SimplePlanNodeRewriter<C> extends PlanVisitor<PlanNode, C> {
@Override
public PlanNode visitPlan(PlanNode node, C context) {
+ // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently
+ if (node instanceof IWritePlanNode) {
+ return node;
+ }
return defaultRewrite(node, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java
index ede58ff6b8..ef1c570457 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -33,7 +34,7 @@ public class InsertMultiTabletNode extends InsertNode {
}
@Override
- public List<InsertNode> splitByPartition(Analysis analysis) {
+ public List<IWritePlanNode> splitByPartition(Analysis analysis) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
index 2a11bc6f9a..49b1065a05 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -107,13 +108,13 @@ public class InsertMultiTabletsNode extends InsertNode {
}
@Override
- public List<InsertNode> splitByPartition(Analysis analysis) {
+ public List<IWritePlanNode> splitByPartition(Analysis analysis) {
Map<RegionReplicaSet, InsertMultiTabletsNode> splitMap = new HashMap<>();
for (int i = 0; i < insertTabletNodeList.size(); i++) {
InsertTabletNode insertTabletNode = insertTabletNodeList.get(i);
- List<InsertNode> tmpResult = insertTabletNode.splitByPartition(analysis);
- for (InsertNode subNode : tmpResult) {
- RegionReplicaSet dataRegionReplicaSet = subNode.getDataRegionReplicaSet();
+ List<IWritePlanNode> tmpResult = insertTabletNode.splitByPartition(analysis);
+ for (IWritePlanNode subNode : tmpResult) {
+ RegionReplicaSet dataRegionReplicaSet = ((InsertNode)subNode).getDataRegionReplicaSet();
if (splitMap.containsKey(dataRegionReplicaSet)) {
InsertMultiTabletsNode tmpNode = splitMap.get(dataRegionReplicaSet);
tmpNode.addInsertTabletNode((InsertTabletNode) subNode, i);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index 30b091d83b..d7eef6b50b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@ -21,17 +21,14 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
import java.nio.ByteBuffer;
-import java.util.List;
-public abstract class InsertNode extends PlanNode {
+public abstract class InsertNode extends IWritePlanNode {
/**
* if use id table, this filed is id form of device path <br>
@@ -130,9 +127,9 @@ public abstract class InsertNode extends PlanNode {
this.deviceID = deviceID;
}
- // TODO(INSERT) split this insert node into multiple InsertNode according to the data partition
- // info
- public abstract List<InsertNode> splitByPartition(Analysis analysis);
+ public RegionReplicaSet getRegionReplicaSet() {
+ return dataRegionReplicaSet;
+ }
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index 6e937bef25..e84b461c87 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.partition.TimePartitionSlot;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
@@ -57,7 +58,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
}
@Override
- public List<InsertNode> splitByPartition(Analysis analysis) {
+ public List<IWritePlanNode> splitByPartition(Analysis analysis) {
TimePartitionSlot timePartitionSlot = StorageEngine.getTimePartitionSlot(time);
this.dataRegionReplicaSet =
analysis
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index bcd4eedc07..3cba509506 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -109,7 +110,7 @@ public class InsertRowsNode extends InsertNode {
public void serialize(ByteBuffer byteBuffer) {}
@Override
- public List<InsertNode> splitByPartition(Analysis analysis) {
+ public List<IWritePlanNode> splitByPartition(Analysis analysis) {
Map<RegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
for (int i = 0; i < insertRowNodeList.size(); i++) {
InsertRowNode insertRowNode = insertRowNodeList.get(i);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index da5b729318..8dc7fd7671 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -105,7 +106,7 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
public void serialize(ByteBuffer byteBuffer) {}
@Override
- public List<InsertNode> splitByPartition(Analysis analysis) {
+ public List<IWritePlanNode> splitByPartition(Analysis analysis) {
Map<RegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
for (int i = 0; i < insertRowNodeList.size(); i++) {
InsertRowNode insertRowNode = insertRowNodeList.get(i);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index d27d0dafd0..a0a262fb5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.partition.TimePartitionSlot;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
@@ -149,9 +150,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
public void serializeToWAL(IWALByteBufferView buffer, int start, int end) {}
@Override
- public List<InsertNode> splitByPartition(Analysis analysis) {
+ public List<IWritePlanNode> splitByPartition(Analysis analysis) {
// only single device in single storage group
- List<InsertNode> result = new ArrayList<>();
+ List<IWritePlanNode> result = new ArrayList<>();
if (times.length == 0) {
return Collections.emptyList();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 03db5bb4ff..ed192c357b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
import org.apache.iotdb.commons.partition.TimePartitionSlot;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
@@ -40,10 +41,13 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import com.google.common.collect.Sets;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.Test;
import java.util.ArrayList;
@@ -201,6 +205,35 @@ public class DistributionPlannerTest {
assertEquals(3, plan.getInstances().size());
}
+ @Test
+ public void TestWriteParallelPlan() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_write");
+ InsertRowNode insertRowNode = new InsertRowNode(
+ queryId.genPlanNodeId(),
+ new PartialPath("root.sg.d1"),
+ false,
+ new MeasurementSchema[]{
+ new MeasurementSchema("s1", TSDataType.INT32),
+ },
+ new TSDataType[]{
+ TSDataType.INT32
+ },
+ 1L,
+ new Object[]{
+ 10
+ });
+
+ Analysis analysis = constructAnalysis();
+
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ plan.getInstances().forEach(System.out::println);
+ assertEquals(1, plan.getInstances().size());
+ }
+
private Analysis constructAnalysis() {
Analysis analysis = new Analysis();
@@ -228,7 +261,7 @@ public class DistributionPlannerTest {
new DataNodeLocation(21, new Endpoint("192.0.2.1", 9000)),
new DataNodeLocation(22, new Endpoint("192.0.2.2", 9000)))));
Map<TimePartitionSlot, List<RegionReplicaSet>> d1DataRegionMap = new HashMap<>();
- d1DataRegionMap.put(new TimePartitionSlot(), d1DataRegions);
+ d1DataRegionMap.put(new TimePartitionSlot(0), d1DataRegions);
List<RegionReplicaSet> d2DataRegions = new ArrayList<>();
d2DataRegions.add(
@@ -238,7 +271,7 @@ public class DistributionPlannerTest {
new DataNodeLocation(31, new Endpoint("192.0.3.1", 9000)),
new DataNodeLocation(32, new Endpoint("192.0.3.2", 9000)))));
Map<TimePartitionSlot, List<RegionReplicaSet>> d2DataRegionMap = new HashMap<>();
- d2DataRegionMap.put(new TimePartitionSlot(), d2DataRegions);
+ d2DataRegionMap.put(new TimePartitionSlot(0), d2DataRegions);
List<RegionReplicaSet> d3DataRegions = new ArrayList<>();
d3DataRegions.add(
@@ -254,7 +287,7 @@ public class DistributionPlannerTest {
new DataNodeLocation(41, new Endpoint("192.0.4.1", 9000)),
new DataNodeLocation(42, new Endpoint("192.0.4.2", 9000)))));
Map<TimePartitionSlot, List<RegionReplicaSet>> d3DataRegionMap = new HashMap<>();
- d3DataRegionMap.put(new TimePartitionSlot(), d3DataRegions);
+ d3DataRegionMap.put(new TimePartitionSlot(0), d3DataRegions);
sgPartitionMap.put(new SeriesPartitionSlot(device1.length()), d1DataRegionMap);
sgPartitionMap.put(new SeriesPartitionSlot(device2.length()), d2DataRegionMap);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index dea1e9b031..f57f60dda6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.mpp.sql.plan;
+import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
@@ -54,16 +56,16 @@ public class FragmentInstanceSerdeTest {
@Test
public void TestSerializeAndDeserializeForTree1() throws IllegalPathException {
+ PlanFragmentId planFragmentId = new PlanFragmentId("test", -1);
FragmentInstance fragmentInstance =
new FragmentInstance(
- new PlanFragment(new PlanFragmentId("test", -1), constructPlanNodeTree()),
- -1,
+ new PlanFragment(planFragmentId, constructPlanNodeTree()),
+ planFragmentId.genFragmentInstanceId(),
new GroupByFilter(1, 2, 3, 4),
QueryType.READ);
RegionReplicaSet regionReplicaSet =
- new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
- fragmentInstance.setDataRegionId(regionReplicaSet);
- fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
+ new RegionReplicaSet(new DataRegionId(1), ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.1", 6666))));
+ fragmentInstance.setDataRegionAndHost(regionReplicaSet);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
fragmentInstance.serializeRequest(byteBuffer);
@@ -74,16 +76,16 @@ public class FragmentInstanceSerdeTest {
@Test
public void TestSerializeAndDeserializeWithNullFilter() throws IllegalPathException {
+ PlanFragmentId planFragmentId = new PlanFragmentId("test2", 1);
FragmentInstance fragmentInstance =
new FragmentInstance(
- new PlanFragment(new PlanFragmentId("test2", 1), constructPlanNodeTree()),
- -1,
+ new PlanFragment(planFragmentId, constructPlanNodeTree()),
+ planFragmentId.genFragmentInstanceId(),
null,
QueryType.READ);
RegionReplicaSet regionReplicaSet =
- new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
- fragmentInstance.setDataRegionId(regionReplicaSet);
- fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.2", 6667));
+ new RegionReplicaSet(new DataRegionId(1), ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.2", 6667))));
+ fragmentInstance.setDataRegionAndHost(regionReplicaSet);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
fragmentInstance.serializeRequest(byteBuffer);