You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/14 05:14:51 UTC
[iotdb] branch master updated: Add DistributionPlanner for WRITE operation (#5515)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 60fc7fc0da Add DistributionPlanner for WRITE operation (#5515)
60fc7fc0da is described below
commit 60fc7fc0da3bbcc1b56c543bfbf9a79d42b5b7f3
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Thu Apr 14 13:14:46 2022 +0800
Add DistributionPlanner for WRITE operation (#5515)
---
.../iotdb/commons/partition/DataPartition.java | 20 ++++-
.../iotdb/commons/partition/RegionReplicaSet.java | 4 +-
.../apache/iotdb/db/mpp/common/PlanFragmentId.java | 4 +-
.../db/mpp/sql/planner/DistributionPlanner.java | 20 ++++-
.../db/mpp/sql/planner/plan/FragmentInstance.java | 24 +++---
.../plan/SimpleFragmentParallelPlanner.java | 9 +--
.../planner/plan/WriteFragmentParallelPlanner.java | 70 +++++++++++++++++
.../planner/plan/node/SimplePlanNodeRewriter.java | 4 +
...plePlanNodeRewriter.java => WritePlanNode.java} | 24 ++----
.../plan/node/source/SeriesAggregateScanNode.java | 3 +-
.../planner/plan/node/source/SeriesScanNode.java | 3 +-
.../plan/node/write/InsertMultiTabletsNode.java | 9 ++-
.../sql/planner/plan/node/write/InsertNode.java | 12 ++-
.../sql/planner/plan/node/write/InsertRowNode.java | 3 +-
.../planner/plan/node/write/InsertRowsNode.java | 3 +-
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 3 +-
.../planner/plan/node/write/InsertTabletNode.java | 5 +-
.../db/mpp/sql/plan/DistributionPlannerTest.java | 87 ++++++++++++++++++++--
.../db/mpp/sql/plan/FragmentInstanceSerdeTest.java | 26 ++++---
.../iotdb/db/service/InternalServiceImplTest.java | 9 ++-
20 files changed, 259 insertions(+), 83 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 3f677ce2c9..054c01b471 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -68,7 +68,14 @@ public class DataPartition {
// A list of data region replica sets will store data in a same time partition.
// We will insert data to the last set in the list.
// TODO return the latest dataRegionReplicaSet for each time partition
- return Collections.emptyList();
+ String storageGroup = getStorageGroupByDevice(deviceName);
+ SeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName);
+ // IMPORTANT TODO: (xingtanzjr) need to handle the situation for write operation that there are
+ // more than 1 Regions for one timeSlot
+ return dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).entrySet().stream()
+ .filter(entry -> timePartitionSlotList.contains(entry.getKey()))
+ .flatMap(entry -> entry.getValue().stream())
+ .collect(Collectors.toList());
}
public RegionReplicaSet getDataRegionReplicaSetForWriting(
@@ -76,7 +83,16 @@ public class DataPartition {
// A list of data region replica sets will store data in a same time partition.
// We will insert data to the last set in the list.
// TODO return the latest dataRegionReplicaSet for each time partition
- return null;
+ String storageGroup = getStorageGroupByDevice(deviceName);
+ SeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName);
+ List<RegionReplicaSet> regions =
+ dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).entrySet().stream()
+ .filter(entry -> entry.getKey().equals(timePartitionSlot))
+ .flatMap(entry -> entry.getValue().stream())
+ .collect(Collectors.toList());
+ // IMPORTANT TODO: (xingtanzjr) need to handle the situation for write operation that there are
+ // more than 1 Regions for one timeSlot
+ return regions.get(0);
}
private SeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
index d98695824d..955a021e50 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
@@ -55,9 +55,7 @@ public class RegionReplicaSet {
@Override
public String toString() {
- return String.format(
- "RegionReplicaSet[%s-%d]: %s",
- consensusGroupId.getType(), consensusGroupId.getId(), dataNodeList);
+ return String.format("RegionReplicaSet[%s]: %s", consensusGroupId, dataNodeList);
}
public void serializeImpl(ByteBuffer buffer) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
index 1d3cc2296d..01cde7ef0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
@@ -89,9 +89,7 @@ public class PlanFragmentId {
return false;
}
PlanFragmentId that = (PlanFragmentId) o;
- return id == that.id
- && nextFragmentInstanceId == that.nextFragmentInstanceId
- && Objects.equals(queryId, that.queryId);
+ return id == that.id && Objects.equals(queryId, that.queryId);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 4ef647dc7e..e0d42e1bae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.sql.planner.plan.IFragmentParallelPlaner;
@@ -29,10 +30,12 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.sql.planner.plan.SimpleFragmentParallelPlanner;
import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.WriteFragmentParallelPlanner;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.SimplePlanNodeRewriter;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
@@ -84,7 +87,10 @@ public class DistributionPlanner {
PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
SubPlan subPlan = splitFragment(rootWithExchange);
List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
- SetSinkForRootInstance(subPlan, fragmentInstances);
+ // Only execute this step for READ operation
+ if (context.getQueryType() == QueryType.READ) {
+ SetSinkForRootInstance(subPlan, fragmentInstances);
+ }
return new DistributedQueryPlan(
logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), fragmentInstances);
}
@@ -93,7 +99,9 @@ public class DistributionPlanner {
// And for parallel-able fragment, clone it into several instances with different params.
public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
IFragmentParallelPlaner parallelPlaner =
- new SimpleFragmentParallelPlanner(subPlan, analysis, context);
+ context.getQueryType() == QueryType.READ
+ ? new SimpleFragmentParallelPlanner(subPlan, analysis, context)
+ : new WriteFragmentParallelPlanner(subPlan, analysis, context);
return parallelPlaner.parallelPlan();
}
@@ -239,6 +247,10 @@ public class DistributionPlanner {
private class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
@Override
public PlanNode visitPlan(PlanNode node, NodeGroupContext context) {
+ // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently
+ if (node instanceof WritePlanNode) {
+ return node;
+ }
// Visit all the children of current node
List<PlanNode> children =
node.getChildren().stream()
@@ -433,6 +445,10 @@ public class DistributionPlanner {
}
private void splitToSubPlan(PlanNode root, SubPlan subPlan) {
+ // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently
+ if (root instanceof WritePlanNode) {
+ return;
+ }
if (root instanceof ExchangeNode) {
// We add a FragmentSinkNode for newly created PlanFragment
ExchangeNode exchangeNode = (ExchangeNode) root;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 3e55a5d32c..a3065c8c03 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
@@ -50,15 +49,23 @@ public class FragmentInstance implements IConsensusRequest {
// We can add some more params for a specific FragmentInstance
// So that we can make different FragmentInstance owns different data range.
- public FragmentInstance(PlanFragment fragment, int index, Filter timeFilter, QueryType type) {
+ public FragmentInstance(
+ PlanFragment fragment, FragmentInstanceId id, Filter timeFilter, QueryType type) {
this.fragment = fragment;
this.timeFilter = timeFilter;
- this.id = generateId(fragment.getId(), index);
+ this.id = id;
this.type = type;
}
- public static FragmentInstanceId generateId(PlanFragmentId id, int index) {
- return new FragmentInstanceId(id, String.valueOf(index));
+ public RegionReplicaSet getDataRegionId() {
+ return regionReplicaSet;
+ }
+
+ public void setDataRegionAndHost(RegionReplicaSet regionReplicaSet) {
+ this.regionReplicaSet = regionReplicaSet;
+ // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
+ // instance
+ this.hostEndpoint = regionReplicaSet.getDataNodeList().get(0).getEndPoint();
}
public RegionReplicaSet getRegionReplicaSet() {
@@ -73,10 +80,6 @@ public class FragmentInstance implements IConsensusRequest {
return hostEndpoint;
}
- public void setHostEndpoint(Endpoint hostEndpoint) {
- this.hostEndpoint = hostEndpoint;
- }
-
public PlanFragment getFragment() {
return fragment;
}
@@ -133,8 +136,7 @@ public class FragmentInstance implements IConsensusRequest {
Filter timeFilter = hasTimeFilter ? FilterFactory.deserialize(buffer) : null;
QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
FragmentInstance fragmentInstance =
- new FragmentInstance(
- planFragment, Integer.parseInt(id.getInstanceId()), timeFilter, queryType);
+ new FragmentInstance(planFragment, id, timeFilter, queryType);
fragmentInstance.regionReplicaSet = RegionReplicaSet.deserializeImpl(buffer);
fragmentInstance.hostEndpoint = Endpoint.deserializeImpl(buffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index d3357ec275..631b217695 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -79,7 +79,6 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
private void produceFragmentInstance(PlanFragment fragment) {
// If one PlanFragment will produce several FragmentInstance, the instanceIdx will be increased
// one by one
- int instanceIdx = 0;
PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
Filter timeFilter =
analysis.getQueryFilter() == null
@@ -88,7 +87,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
FragmentInstance fragmentInstance =
new FragmentInstance(
new PlanFragment(fragment.getId(), rootCopy),
- instanceIdx,
+ fragment.getId().genFragmentInstanceId(),
timeFilter,
queryContext.getQueryType());
@@ -100,11 +99,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
// We need to store all the replica host in case of the scenario that the instance need to be
// redirected
// to another host when scheduling
- fragmentInstance.setRegionReplicaSet(regionReplicaSet);
-
- // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
- // instance
- fragmentInstance.setHostEndpoint(regionReplicaSet.getDataNodeList().get(0).getEndPoint());
+ fragmentInstance.setDataRegionAndHost(regionReplicaSet);
instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
fragmentInstanceList.add(fragmentInstance);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
new file mode 100644
index 0000000000..be34ae257e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.planner.plan;
+
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
+
+ private SubPlan subPlan;
+ private Analysis analysis;
+ private MPPQueryContext queryContext;
+
+ public WriteFragmentParallelPlanner(
+ SubPlan subPlan, Analysis analysis, MPPQueryContext queryContext) {
+ this.subPlan = subPlan;
+ this.analysis = analysis;
+ this.queryContext = queryContext;
+ }
+
+ @Override
+ public List<FragmentInstance> parallelPlan() {
+ PlanFragment fragment = subPlan.getPlanFragment();
+ Filter timeFilter =
+ analysis.getQueryFilter() != null
+ ? ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter()
+ : null;
+ PlanNode node = fragment.getRoot();
+ if (!(node instanceof WritePlanNode)) {
+ throw new IllegalArgumentException("PlanNode should be IWritePlanNode in WRITE operation");
+ }
+ List<WritePlanNode> splits = ((WritePlanNode) node).splitByPartition(analysis);
+ List<FragmentInstance> ret = new ArrayList<>();
+ for (WritePlanNode split : splits) {
+ FragmentInstance instance =
+ new FragmentInstance(
+ new PlanFragment(fragment.getId(), split),
+ fragment.getId().genFragmentInstanceId(),
+ timeFilter,
+ queryContext.getQueryType());
+ instance.setDataRegionAndHost(split.getRegionReplicaSet());
+ ret.add(instance);
+ }
+ return ret;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
index e0ca8f6cfd..30a42b21d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
@@ -26,6 +26,10 @@ import static com.google.common.collect.ImmutableList.toImmutableList;
public class SimplePlanNodeRewriter<C> extends PlanVisitor<PlanNode, C> {
@Override
public PlanNode visitPlan(PlanNode node, C context) {
+ // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently
+ if (node instanceof WritePlanNode) {
+ return node;
+ }
return defaultRewrite(node, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/WritePlanNode.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/WritePlanNode.java
index e0ca8f6cfd..c37b8e1d7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/WritePlanNode.java
@@ -19,26 +19,18 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+
import java.util.List;
-import static com.google.common.collect.ImmutableList.toImmutableList;
+public abstract class WritePlanNode extends PlanNode {
-public class SimplePlanNodeRewriter<C> extends PlanVisitor<PlanNode, C> {
- @Override
- public PlanNode visitPlan(PlanNode node, C context) {
- return defaultRewrite(node, context);
+ protected WritePlanNode(PlanNodeId id) {
+ super(id);
}
- public PlanNode defaultRewrite(PlanNode node, C context) {
- List<PlanNode> children =
- node.getChildren().stream()
- .map(child -> rewrite(child, context))
- .collect(toImmutableList());
-
- return node.cloneWithChildren(children);
- }
+ public abstract RegionReplicaSet getRegionReplicaSet();
- public PlanNode rewrite(PlanNode node, C userContext) {
- return node.accept(this, userContext);
- }
+ public abstract List<WritePlanNode> splitByPartition(Analysis analysis);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index d54501cb51..dcde49fd9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -195,8 +195,7 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
Filter timeFilter = FilterFactory.deserialize(byteBuffer);
// TODO serialize groupByTimeParameter
- RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
- RegionReplicaSet.deserializeImpl(byteBuffer);
+ RegionReplicaSet regionReplicaSet = RegionReplicaSet.deserializeImpl(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
SeriesAggregateScanNode seriesAggregateScanNode =
new SeriesAggregateScanNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index 11726b62d9..bf0df051bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -209,8 +209,7 @@ public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
if (isNull == 1) valueFilter = FilterFactory.deserialize(byteBuffer);
int limit = ReadWriteIOUtils.readInt(byteBuffer);
int offset = ReadWriteIOUtils.readInt(byteBuffer);
- RegionReplicaSet dataRegionReplicaSet = new RegionReplicaSet();
- RegionReplicaSet.deserializeImpl(byteBuffer);
+ RegionReplicaSet dataRegionReplicaSet = RegionReplicaSet.deserializeImpl(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
SeriesScanNode seriesScanNode = new SeriesScanNode(planNodeId, partialPath);
seriesScanNode.allSensors = allSensors;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
index 47fbed6dff..946fa9f7f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import java.nio.ByteBuffer;
@@ -108,13 +109,13 @@ public class InsertMultiTabletsNode extends InsertNode {
}
@Override
- public List<InsertNode> splitByPartition(Analysis analysis) {
+ public List<WritePlanNode> splitByPartition(Analysis analysis) {
Map<RegionReplicaSet, InsertMultiTabletsNode> splitMap = new HashMap<>();
for (int i = 0; i < insertTabletNodeList.size(); i++) {
InsertTabletNode insertTabletNode = insertTabletNodeList.get(i);
- List<InsertNode> tmpResult = insertTabletNode.splitByPartition(analysis);
- for (InsertNode subNode : tmpResult) {
- RegionReplicaSet dataRegionReplicaSet = subNode.getDataRegionReplicaSet();
+ List<WritePlanNode> tmpResult = insertTabletNode.splitByPartition(analysis);
+ for (WritePlanNode subNode : tmpResult) {
+ RegionReplicaSet dataRegionReplicaSet = ((InsertNode) subNode).getDataRegionReplicaSet();
if (splitMap.containsKey(dataRegionReplicaSet)) {
InsertMultiTabletsNode tmpNode = splitMap.get(dataRegionReplicaSet);
tmpNode.addInsertTabletNode((InsertTabletNode) subNode, i);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index 30b091d83b..6856f2c0dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@ -21,17 +21,15 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.nio.ByteBuffer;
-import java.util.List;
-public abstract class InsertNode extends PlanNode {
+public abstract class InsertNode extends WritePlanNode {
/**
* if use id table, this filed is id form of device path <br>
@@ -130,9 +128,9 @@ public abstract class InsertNode extends PlanNode {
this.deviceID = deviceID;
}
- // TODO(INSERT) split this insert node into multiple InsertNode according to the data partition
- // info
- public abstract List<InsertNode> splitByPartition(Analysis analysis);
+ public RegionReplicaSet getRegionReplicaSet() {
+ return dataRegionReplicaSet;
+ }
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index 6e937bef25..1d37b01626 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.wal.buffer.WALEntryValue;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -57,7 +58,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
}
@Override
- public List<InsertNode> splitByPartition(Analysis analysis) {
+ public List<WritePlanNode> splitByPartition(Analysis analysis) {
TimePartitionSlot timePartitionSlot = StorageEngine.getTimePartitionSlot(time);
this.dataRegionReplicaSet =
analysis
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index b3012c9986..cd80ae45cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import java.nio.ByteBuffer;
@@ -114,7 +115,7 @@ public class InsertRowsNode extends InsertNode {
public void serialize(ByteBuffer byteBuffer) {}
@Override
- public List<InsertNode> splitByPartition(Analysis analysis) {
+ public List<WritePlanNode> splitByPartition(Analysis analysis) {
Map<RegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
for (int i = 0; i < insertRowNodeList.size(); i++) {
InsertRowNode insertRowNode = insertRowNodeList.get(i);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 3c22c0fd57..34d4e81c63 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import java.nio.ByteBuffer;
@@ -110,7 +111,7 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
public void serialize(ByteBuffer byteBuffer) {}
@Override
- public List<InsertNode> splitByPartition(Analysis analysis) {
+ public List<WritePlanNode> splitByPartition(Analysis analysis) {
Map<RegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
for (int i = 0; i < insertRowNodeList.size(); i++) {
InsertRowNode insertRowNode = insertRowNodeList.get(i);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index d27d0dafd0..2d516f1e47 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.wal.buffer.WALEntryValue;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -149,9 +150,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
public void serializeToWAL(IWALByteBufferView buffer, int start, int end) {}
@Override
- public List<InsertNode> splitByPartition(Analysis analysis) {
+ public List<WritePlanNode> splitByPartition(Analysis analysis) {
// only single device in single storage group
- List<InsertNode> result = new ArrayList<>();
+ List<WritePlanNode> result = new ArrayList<>();
if (times.length == 0) {
return Collections.emptyList();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 51a2313f08..41813bd337 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -46,8 +46,11 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSch
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import com.google.common.collect.Sets;
import org.junit.Test;
@@ -254,6 +257,74 @@ public class DistributionPlannerTest {
assertEquals(3, plan.getInstances().size());
}
+ @Test
+ public void TestInsertRowNodeParallelPlan() throws IllegalPathException { // plans one InsertRowNode as a WRITE query and expects exactly one fragment instance
+ QueryId queryId = new QueryId("test_write");
+ InsertRowNode insertRowNode =
+ new InsertRowNode(
+ queryId.genPlanNodeId(),
+ new PartialPath("root.sg.d1"), // path the row is written to
+ false, // NOTE(review): presumably the isAligned flag — confirm against the InsertRowNode constructor
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.INT32),
+ },
+ new TSDataType[] {TSDataType.INT32},
+ 1L, // NOTE(review): presumably the row timestamp — confirm
+ new Object[] {10}); // one value, alongside the single INT32 measurement s1
+
+ Analysis analysis = constructAnalysis(); // shared partition fixture built by the helper in this test class
+
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint()); // context is marked as a WRITE query
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ plan.getInstances().forEach(System.out::println); // prints each instance to stdout; the output is not asserted on
+ assertEquals(1, plan.getInstances().size()); // a single-row insert must yield exactly one fragment instance
+ }
+
+ @Test
+ public void TestInsertRowsNodeParallelPlan() throws IllegalPathException { // plans an InsertRowsNode wrapping two rows and expects a single fragment instance
+ QueryId queryId = new QueryId("test_write");
+ InsertRowNode insertRowNode1 =
+ new InsertRowNode(
+ queryId.genPlanNodeId(),
+ new PartialPath("root.sg.d1"),
+ false, // NOTE(review): presumably the isAligned flag — confirm against the InsertRowNode constructor
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.INT32),
+ },
+ new TSDataType[] {TSDataType.INT32},
+ 1L, // NOTE(review): presumably the row timestamp — confirm
+ new Object[] {10});
+
+ InsertRowNode insertRowNode2 = // apart from its plan node id, differs from insertRowNode1 only in the 1L vs 100000L argument
+ new InsertRowNode(
+ queryId.genPlanNodeId(),
+ new PartialPath("root.sg.d1"),
+ false,
+ new MeasurementSchema[] {
+ new MeasurementSchema("s1", TSDataType.INT32),
+ },
+ new TSDataType[] {TSDataType.INT32},
+ 100000L, // NOTE(review): presumably a later timestamp than insertRowNode1 — confirm
+ new Object[] {10});
+
+ InsertRowsNode node = new InsertRowsNode(queryId.genPlanNodeId());
+ node.setInsertRowNodeList(Arrays.asList(insertRowNode1, insertRowNode2)); // the two rows built above, in order
+ node.setInsertRowNodeIndexList(Arrays.asList(0, 1)); // one index per row; NOTE(review): presumably each row's position in the original request — confirm
+
+ Analysis analysis = constructAnalysis(); // shared partition fixture built by the helper in this test class
+
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint()); // context is marked as a WRITE query
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, node));
+ DistributedQueryPlan plan = planner.planFragments();
+ plan.getInstances().forEach(System.out::println); // prints each instance to stdout; the output is not asserted on
+ assertEquals(1, plan.getInstances().size()); // both rows are expected in one instance; NOTE(review): implies 1L and 100000L map to the same data region — confirm against the time partition interval
+ }
+
private Analysis constructAnalysis() {
Analysis analysis = new Analysis();
@@ -267,21 +338,25 @@ public class DistributionPlannerTest {
Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>> sgPartitionMap =
new HashMap<>();
- List<RegionReplicaSet> d1DataRegions = new ArrayList<>();
- d1DataRegions.add(
+ List<RegionReplicaSet> d1DataRegion1 = new ArrayList<>();
+ d1DataRegion1.add(
new RegionReplicaSet(
new DataRegionId(1),
Arrays.asList(
new DataNodeLocation(11, new Endpoint("192.0.1.1", 9000)),
new DataNodeLocation(12, new Endpoint("192.0.1.2", 9000)))));
- d1DataRegions.add(
+
+ List<RegionReplicaSet> d1DataRegion2 = new ArrayList<>();
+ d1DataRegion2.add(
new RegionReplicaSet(
new DataRegionId(2),
Arrays.asList(
new DataNodeLocation(21, new Endpoint("192.0.2.1", 9000)),
new DataNodeLocation(22, new Endpoint("192.0.2.2", 9000)))));
+
Map<TimePartitionSlot, List<RegionReplicaSet>> d1DataRegionMap = new HashMap<>();
- d1DataRegionMap.put(new TimePartitionSlot(), d1DataRegions);
+ d1DataRegionMap.put(new TimePartitionSlot(0), d1DataRegion1);
+ d1DataRegionMap.put(new TimePartitionSlot(1), d1DataRegion2);
List<RegionReplicaSet> d2DataRegions = new ArrayList<>();
d2DataRegions.add(
@@ -291,7 +366,7 @@ public class DistributionPlannerTest {
new DataNodeLocation(31, new Endpoint("192.0.3.1", 9000)),
new DataNodeLocation(32, new Endpoint("192.0.3.2", 9000)))));
Map<TimePartitionSlot, List<RegionReplicaSet>> d2DataRegionMap = new HashMap<>();
- d2DataRegionMap.put(new TimePartitionSlot(), d2DataRegions);
+ d2DataRegionMap.put(new TimePartitionSlot(0), d2DataRegions);
List<RegionReplicaSet> d3DataRegions = new ArrayList<>();
d3DataRegions.add(
@@ -307,7 +382,7 @@ public class DistributionPlannerTest {
new DataNodeLocation(41, new Endpoint("192.0.4.1", 9000)),
new DataNodeLocation(42, new Endpoint("192.0.4.2", 9000)))));
Map<TimePartitionSlot, List<RegionReplicaSet>> d3DataRegionMap = new HashMap<>();
- d3DataRegionMap.put(new TimePartitionSlot(), d3DataRegions);
+ d3DataRegionMap.put(new TimePartitionSlot(0), d3DataRegions);
sgPartitionMap.put(new SeriesPartitionSlot(device1.length()), d1DataRegionMap);
sgPartitionMap.put(new SeriesPartitionSlot(device2.length()), d2DataRegionMap);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index 6871ff6560..4bf6a477e2 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.plan;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
@@ -43,6 +44,7 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
import org.apache.iotdb.tsfile.read.filter.operator.Gt;
+import com.google.common.collect.ImmutableList;
import org.junit.Test;
import java.nio.ByteBuffer;
@@ -54,16 +56,18 @@ public class FragmentInstanceSerdeTest {
@Test
public void TestSerializeAndDeserializeForTree1() throws IllegalPathException {
+ PlanFragmentId planFragmentId = new PlanFragmentId("test", -1);
FragmentInstance fragmentInstance =
new FragmentInstance(
- new PlanFragment(new PlanFragmentId("test", -1), constructPlanNodeTree()),
- -1,
+ new PlanFragment(planFragmentId, constructPlanNodeTree()),
+ planFragmentId.genFragmentInstanceId(),
new GroupByFilter(1, 2, 3, 4),
QueryType.READ);
RegionReplicaSet regionReplicaSet =
- new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
- fragmentInstance.setRegionReplicaSet(regionReplicaSet);
- fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
+ new RegionReplicaSet(
+ new DataRegionId(1),
+ ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.1", 6666))));
+ fragmentInstance.setDataRegionAndHost(regionReplicaSet);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
fragmentInstance.serializeRequest(byteBuffer);
@@ -74,16 +78,18 @@ public class FragmentInstanceSerdeTest {
@Test
public void TestSerializeAndDeserializeWithNullFilter() throws IllegalPathException {
+ PlanFragmentId planFragmentId = new PlanFragmentId("test2", 1);
FragmentInstance fragmentInstance =
new FragmentInstance(
- new PlanFragment(new PlanFragmentId("test2", 1), constructPlanNodeTree()),
- -1,
+ new PlanFragment(planFragmentId, constructPlanNodeTree()),
+ planFragmentId.genFragmentInstanceId(),
null,
QueryType.READ);
RegionReplicaSet regionReplicaSet =
- new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
- fragmentInstance.setRegionReplicaSet(regionReplicaSet);
- fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.2", 6667));
+ new RegionReplicaSet(
+ new DataRegionId(1),
+ ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.2", 6667))));
+ fragmentInstance.setDataRegionAndHost(regionReplicaSet);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
fragmentInstance.serializeRequest(byteBuffer);
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 7ed08d65b6..d5ba4d1f07 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -117,9 +117,12 @@ public class InternalServiceImplTest {
RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
PlanFragment planFragment = new PlanFragment(new PlanFragmentId("2", 3), createTimeSeriesNode);
FragmentInstance fragmentInstance =
- new FragmentInstance(planFragment, 4, new GroupByFilter(1, 2, 3, 4), QueryType.WRITE);
- fragmentInstance.setRegionReplicaSet(regionReplicaSet);
- fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
+ new FragmentInstance(
+ planFragment,
+ planFragment.getId().genFragmentInstanceId(),
+ new GroupByFilter(1, 2, 3, 4),
+ QueryType.WRITE);
+ fragmentInstance.setDataRegionAndHost(regionReplicaSet);
// serialize fragmentInstance
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);