You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/07 01:00:38 UTC
[iotdb] branch master updated: [IOTDB-5761] Open channel of ShuffleSinkHandle after the drivertask begins
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c6fad19e9d [IOTDB-5761] Open channel of ShuffleSinkHandle after the drivertask begins
c6fad19e9d is described below
commit c6fad19e9df6f22385da4232b8a6a3e35a8a2a20
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Fri Apr 7 09:00:30 2023 +0800
[IOTDB-5761] Open channel of ShuffleSinkHandle after the drivertask begins
---
.../db/mpp/execution/exchange/sink/ShuffleSinkHandle.java | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
index 7060541acc..6f9b617e2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
@@ -41,7 +41,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
private static final Logger LOGGER = LoggerFactory.getLogger(ShuffleSinkHandle.class);
- /** Each ISinkHandle in the list matches one downStream ISourceHandle */
+ /** Each ISinkChannel in the list matches one downStream ISourceHandle */
private final List<ISinkChannel> downStreamChannelList;
private final boolean[] hasSetNoMoreTsBlocks;
@@ -88,8 +88,6 @@ public class ShuffleSinkHandle implements ISinkHandle {
this.shuffleStrategy = getShuffleStrategy(shuffleStrategyEnum);
this.hasSetNoMoreTsBlocks = new boolean[channelNum];
this.channelOpened = new boolean[channelNum];
- // open first channel
- tryOpenChannel(0);
}
@Override
@@ -103,12 +101,13 @@ public class ShuffleSinkHandle implements ISinkHandle {
@Override
public synchronized ListenableFuture<?> isFull() {
+ int currentIndex = downStreamChannelIndex.getCurrentIndex();
+ // try open channel
+ tryOpenChannel(currentIndex);
// It is safe to use the current channel's isFull() to judge whether we can send a TsBlock only
// when downStreamChannelIndex does not change between the calls to isFull() and send() of
// ShuffleSinkHandle
- ISinkChannel currentChannel =
- downStreamChannelList.get(downStreamChannelIndex.getCurrentIndex());
- return currentChannel.isFull();
+ return downStreamChannelList.get(currentIndex).isFull();
}
@Override
@@ -234,7 +233,6 @@ public class ShuffleSinkHandle implements ISinkHandle {
private void switchChannelIfNecessary() {
shuffleStrategy.shuffle();
- tryOpenChannel(downStreamChannelIndex.getCurrentIndex());
}
public void tryOpenChannel(int channelIndex) {