You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/02/28 03:53:22 UTC
[iotdb] 01/02: try to fix
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch geely_car_0206
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 503fe57b5f6996c34bf5e4842445d4ad75546a10
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Feb 28 09:54:49 2023 +0800
try to fix
---
.../execution/exchange/MPPDataExchangeManager.java | 18 +++--
.../mpp/execution/exchange/sink/SinkChannel.java | 41 ++++++++++-
.../execution/exchange/source/SourceHandle.java | 7 +-
.../planner/distribution/DistributionPlanner.java | 81 ++++++++++------------
.../planner/distribution/ExchangeNodeAdder.java | 15 ++--
.../planner/distribution/NodeGroupContext.java | 7 +-
.../plan/node/sink/MultiChildrenSinkNode.java | 4 ++
7 files changed, 113 insertions(+), 60 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 39cf36a5c0..926e622b1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -60,6 +60,7 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -505,7 +506,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
String localPlanNodeId,
// TODO: replace with callbacks to decouple MPPDataExchangeManager from
// FragmentInstanceContext
- FragmentInstanceContext instanceContext) {
+ FragmentInstanceContext instanceContext,
+ int index) {
LOGGER.debug(
"Create sink handle to plan node {} of {} for {}",
@@ -523,7 +525,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
executorService,
tsBlockSerdeFactory.get(),
new SinkListenerImpl(instanceContext, instanceContext::failed),
- mppDataExchangeServiceClientManager);
+ mppDataExchangeServiceClientManager,
+ index);
}
@Override
@@ -541,6 +544,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
"ShuffleSinkHandle for " + localFragmentInstanceId + " exists.");
}
+ AtomicInteger index = new AtomicInteger(0);
+
List<ISinkChannel> downStreamChannelList =
downStreamChannelLocationList.stream()
.map(
@@ -549,7 +554,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
localFragmentInstanceId,
localPlanNodeId,
downStreamChannelLocation,
- instanceContext))
+ instanceContext,
+ index.getAndIncrement()))
.collect(Collectors.toList());
ShuffleSinkHandle shuffleSinkHandle =
@@ -568,7 +574,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
DownStreamChannelLocation downStreamChannelLocation,
- FragmentInstanceContext instanceContext) {
+ FragmentInstanceContext instanceContext,
+ int index) {
if (isSameNode(downStreamChannelLocation.getRemoteEndpoint())) {
return createLocalSinkChannel(
localFragmentInstanceId,
@@ -583,7 +590,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
downStreamChannelLocation.getRemoteFragmentInstanceId(),
downStreamChannelLocation.getRemotePlanNodeId(),
localPlanNodeId,
- instanceContext);
+ instanceContext,
+ index);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index 9ad82768c6..36836e2f81 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -108,6 +108,41 @@ public class SinkChannel implements ISinkChannel {
private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+ public SinkChannel( // overload taking an extra index; index is only used to suffix threadName
+ TEndPoint remoteEndpoint,
+ TFragmentInstanceId remoteFragmentInstanceId,
+ String remotePlanNodeId,
+ String localPlanNodeId,
+ TFragmentInstanceId localFragmentInstanceId,
+ LocalMemoryManager localMemoryManager,
+ ExecutorService executorService,
+ TsBlockSerde serde,
+ SinkListener sinkListener,
+ IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+ mppDataExchangeServiceClientManager,
+ int index) { // presumably this channel's position in its ShuffleSinkHandle - confirm
+ this.remoteEndpoint = Validate.notNull(remoteEndpoint);
+ this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
+ this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
+ this.localPlanNodeId = Validate.notNull(localPlanNodeId);
+ this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+ this.fullFragmentInstanceId =
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
+ this.localMemoryManager = Validate.notNull(localMemoryManager);
+ this.executorService = Validate.notNull(executorService);
+ this.serde = Validate.notNull(serde);
+ this.sinkListener = Validate.notNull(sinkListener);
+ this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager; // NOTE(review): not null-checked, unlike the other collaborators
+ this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
+ this.threadName =
+ createFullId(
+ localFragmentInstanceId.queryId,
+ localFragmentInstanceId.fragmentId,
+ localFragmentInstanceId.instanceId + "-" + index); // suffix distinguishes channels of one instance
+ this.bufferRetainedSizeInBytes = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ this.currentTsBlockSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ }
+
public SinkChannel(
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
@@ -419,7 +454,11 @@ public class SinkChannel implements ISinkChannel {
public void run() {
try (SetThreadName sinkChannelName = new SetThreadName(threadName)) {
LOGGER.debug(
- "[NotifyNewTsBlock] [{}, {})", startSequenceId, startSequenceId + blockSizes.size());
+ "[NotifyNewTsBlock] [{}, {}) to {}.{}",
+ startSequenceId,
+ startSequenceId + blockSizes.size(),
+ remoteFragmentInstanceId,
+ remotePlanNodeId);
int attempt = 0;
TNewDataBlockEvent newDataBlockEvent =
new TNewDataBlockEvent(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index 10821ca1ac..db462721d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -470,7 +470,12 @@ public class SourceHandle implements ISourceHandle {
@Override
public void run() {
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
- LOGGER.debug("[StartPullTsBlocksFromRemote] [{}, {}) ", startSequenceId, endSequenceId);
+ LOGGER.debug(
+ "[StartPullTsBlocksFromRemote] {}-{} [{}, {}) ",
+ remoteFragmentInstanceId,
+ indexOfUpstreamSinkHandle,
+ startSequenceId,
+ endSequenceId);
TGetDataBlockRequest req =
new TGetDataBlockRequest(
remoteFragmentInstanceId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 73fe5c5259..4733371321 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -44,8 +44,9 @@ import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -54,6 +55,9 @@ import java.util.Map;
import java.util.Set;
public class DistributionPlanner {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DistributionPlanner.class);
+
private Analysis analysis;
private MPPQueryContext context;
private LogicalQueryPlan logicalPlan;
@@ -89,7 +93,7 @@ public class DistributionPlanner {
&& (((QueryStatement) analysis.getStatement()).isAlignByDevice()),
root);
PlanNode newRoot = adder.visit(root, nodeGroupContext);
- adjustUpStream(nodeGroupContext);
+ adjustUpStream(root, nodeGroupContext);
return newRoot;
}
@@ -99,8 +103,8 @@ public class DistributionPlanner {
* org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode} for the children of
* ExchangeNodes with Same DataRegion.
*/
- private void adjustUpStream(NodeGroupContext context) {
- if (context.exchangeNodes.isEmpty()) {
+ private void adjustUpStream(PlanNode root, NodeGroupContext context) {
+ if (!context.hasExchangeNode) {
return;
}
@@ -108,44 +112,37 @@ public class DistributionPlanner {
analysis.getStatement() instanceof QueryStatement
&& needShuffleSinkNode((QueryStatement) analysis.getStatement(), context);
- // step1: group children of ExchangeNodes
- Map<TRegionReplicaSet, List<PlanNode>> nodeGroups = new HashMap<>();
- context.exchangeNodes.forEach(
- exchangeNode ->
- nodeGroups
- .computeIfAbsent(
- context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).region,
- exchangeNodes -> new ArrayList<>())
- .add(exchangeNode.getChild()));
-
- // step2: add IdentitySinkNode/ShuffleSinkNode as parent for nodes of each group
- nodeGroups
- .values()
- .forEach(
- planNodeList -> {
- MultiChildrenSinkNode parent =
- needShuffleSinkNode
- ? new ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
- : new IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId());
- parent.addChildren(planNodeList);
- // we put the parent in list to get it quickly by dataRegion of one ExchangeNode
- planNodeList.add(parent);
- });
-
- // step3: add child for each ExchangeNode,
- // the child is IdentitySinkNode/ShuffleSinkNode we generated in the last step
+ adjustUpStreamHelper(root, new HashMap<>(), needShuffleSinkNode, context);
+ }
- // count the visited time of each SinkNode
- Map<TRegionReplicaSet, Integer> visitedCount = new HashMap<>();
- context.exchangeNodes.forEach(
- exchangeNode -> {
- TRegionReplicaSet regionOfChild =
- context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).region;
- visitedCount.compute(regionOfChild, (region, count) -> (count == null) ? 0 : count + 1);
- List<PlanNode> planNodeList = nodeGroups.get(regionOfChild);
- exchangeNode.setChild(planNodeList.get(planNodeList.size() - 1));
- exchangeNode.setIndexOfUpstreamSinkHandle(visitedCount.get(regionOfChild));
- });
+ /** Post-order walk: appends each ExchangeNode's child and channel to its region's shared sink. */
+ private void adjustUpStreamHelper(
+ PlanNode root,
+ Map<TRegionReplicaSet, MultiChildrenSinkNode> memo,
+ boolean needShuffleSinkNode,
+ NodeGroupContext context) {
+ // NOTE(review): adjustUpStream is called with the pre-visit root, but ExchangeNodeAdder adds
+ // ExchangeNodes to the tree it returns (newRoot) - confirm which tree should be walked here.
+ for (PlanNode child : root.getChildren()) {
+ adjustUpStreamHelper(child, memo, needShuffleSinkNode, context);
+ if (child instanceof ExchangeNode) {
+ ExchangeNode exchangeNode = (ExchangeNode) child;
+ TRegionReplicaSet regionOfChild =
+ context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).region;
+ MultiChildrenSinkNode newChild =
+ memo.computeIfAbsent(
+ regionOfChild,
+ tRegionReplicaSet ->
+ needShuffleSinkNode
+ ? new ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
+ : new IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId()));
+ newChild.addChild(exchangeNode.getChild());
+ newChild.addDownStreamChannelLocation(
+ new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
+ exchangeNode.setChild(newChild);
+ exchangeNode.setIndexOfUpstreamSinkHandle(newChild.getCurrentLastIndex());
+ }
+ }
 }
/** Return true if we need to use ShuffleSinkNode instead of IdentitySinkNode. */
@@ -265,8 +262,6 @@ public class DistributionPlanner {
exchangeNode.getChild() instanceof MultiChildrenSinkNode,
"child of ExchangeNode must be MultiChildrenSinkNode");
MultiChildrenSinkNode sinkNode = (MultiChildrenSinkNode) (exchangeNode.getChild());
- sinkNode.addDownStreamChannelLocation(
- new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
// We cut off the subtree to make the ExchangeNode as the leaf node of current PlanFragment
exchangeNode.cleanChildren();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 14a5ec1c00..56dfda1cd7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -124,7 +124,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
exchangeNode.setChild(child);
exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
- context.exchangeNodes.add(exchangeNode);
+ context.hasExchangeNode = true;
newNode.addChild(exchangeNode);
} else {
newNode.addChild(child);
@@ -288,13 +288,18 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
// For align by device,
// if dataRegions of children are the same, we set child's dataRegion to this node,
// else we set the selected mostlyUsedDataRegion to this node
+ boolean inSame = nodeDistributionIsSame(visitedChildren, context);
dataRegion =
- nodeDistributionIsSame(visitedChildren, context)
+ inSame
? context.getNodeDistribution(visitedChildren.get(0).getPlanNodeId()).region
: context.getMostlyUsedDataRegion();
context.putNodeDistribution(
newNode.getPlanNodeId(),
- new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, dataRegion));
+ new NodeDistribution(
+ inSame
+ ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
+ : NodeDistributionType.SAME_WITH_SOME_CHILD,
+ dataRegion));
} else {
// TODO For align by time, we keep old logic for now
dataRegion = calculateDataRegionByChildren(visitedChildren, context);
@@ -324,7 +329,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
exchangeNode.setChild(child);
exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
- context.exchangeNodes.add(exchangeNode);
+ context.hasExchangeNode = true;
newNode.addChild(exchangeNode);
} else {
newNode.addChild(child);
@@ -345,7 +350,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
exchangeNode.setChild(child);
exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
- context.exchangeNodes.add(exchangeNode);
+ context.hasExchangeNode = true;
newNode.addChild(exchangeNode);
}
return newNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java
index 389e30f0eb..6dda89e41f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java
@@ -24,13 +24,10 @@ import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -39,14 +36,14 @@ public class NodeGroupContext {
private final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
private final boolean isAlignByDevice;
private final TRegionReplicaSet mostlyUsedDataRegion;
- protected final List<ExchangeNode> exchangeNodes;
+ protected boolean hasExchangeNode;
public NodeGroupContext(MPPQueryContext queryContext, boolean isAlignByDevice, PlanNode root) {
this.queryContext = queryContext;
this.nodeDistributionMap = new HashMap<>();
this.isAlignByDevice = isAlignByDevice;
this.mostlyUsedDataRegion = isAlignByDevice ? getMostlyUsedDataRegion(root) : null;
- this.exchangeNodes = new ArrayList<>();
+ this.hasExchangeNode = false;
}
private TRegionReplicaSet getMostlyUsedDataRegion(PlanNode root) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java
index 686dae93e9..1e1ad0f428 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java
@@ -114,4 +114,8 @@ public abstract class MultiChildrenSinkNode extends SinkNode {
public void addDownStreamChannelLocation(DownStreamChannelLocation downStreamChannelLocation) {
downStreamChannelLocationList.add(downStreamChannelLocation);
}
+
+ public int getCurrentLastIndex() { // index of the last entry in children, i.e. -1 when there are none
+ return children.size() - 1;
+ }
}