You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/03 14:21:03 UTC
[iotdb] branch FIDig updated: Try open channel (#9513)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch FIDig
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/FIDig by this push:
new 852ed379d7 Try open channel (#9513)
852ed379d7 is described below
commit 852ed379d7900611c4b9cd54d6746ba6f21ed6ae
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Mon Apr 3 22:20:56 2023 +0800
Try open channel (#9513)
---
.../exchange/IMPPDataExchangeManager.java | 1 +
.../execution/exchange/MPPDataExchangeManager.java | 50 +++++++++++++-
.../mpp/execution/exchange/sink/ISinkHandle.java | 2 +
.../execution/exchange/sink/ShuffleSinkHandle.java | 19 ++---
.../exchange/source/LocalSourceHandle.java | 80 +++++++++++++++++++++-
.../db/mpp/plan/execution/QueryExecution.java | 1 +
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 5 +-
.../planner/distribution/DistributionPlanner.java | 4 +-
.../planner/plan/node/process/ExchangeNode.java | 18 ++---
.../execution/exchange/LocalSinkChannelTest.java | 16 ++++-
.../execution/exchange/LocalSourceHandleTest.java | 22 +++++-
.../exchange/MPPDataExchangeManagerTest.java | 2 +
thrift/src/main/thrift/datanode.thrift | 8 +++
13 files changed, 201 insertions(+), 27 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
index bbbce26e7c..f8e064bcc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
@@ -74,6 +74,7 @@ public interface IMPPDataExchangeManager {
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
String remotePlanNodeId,
+ TEndPoint remoteEndPoint,
TFragmentInstanceId remoteFragmentInstanceId,
int index,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 9a58904c79..5b3cebbfc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TCloseLocalSinkChannelEvent;
import org.apache.iotdb.mpp.rpc.thrift.TCloseSinkChannelEvent;
import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
@@ -178,6 +179,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
return;
}
sinkHandle.getChannel(e.getIndex()).close();
+ closeISinkChannelOfShuffleSinkHandle(sinkHandle, e.getIndex());
} catch (Throwable t) {
LOGGER.warn(
"Close channel of ShuffleSinkHandle {}, index {} failed.",
@@ -188,6 +190,46 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
}
}
+ @Override
+ public void onCloseLocalSinkChannelEvent(TCloseLocalSinkChannelEvent e) throws TException {
+ try (SetThreadName fragmentInstanceName =
+ new SetThreadName(
+ createFullId(
+ e.sourceFragmentInstanceId.queryId,
+ e.sourceFragmentInstanceId.fragmentId,
+ e.sourceFragmentInstanceId.instanceId))) {
+ LOGGER.debug(
+ "Closed LocalSourceHandle of ShuffleSinkHandle {}, channel index: {}.",
+ e.getSourceFragmentInstanceId(),
+ e.getIndex());
+ ISinkHandle sinkHandle = shuffleSinkHandles.get(e.getSourceFragmentInstanceId());
+ if (sinkHandle == null) {
+ LOGGER.debug(
+ "received CloseLocalSinkChannelEvent but target FragmentInstance[{}] is not found.",
+ e.getSourceFragmentInstanceId());
+ return;
+ }
+ closeISinkChannelOfShuffleSinkHandle(sinkHandle, e.getIndex());
+
+ } catch (Throwable t) {
+ LOGGER.warn(
+ "Close channel of ShuffleSinkHandle {}, index {} failed.",
+ e.getSourceFragmentInstanceId(),
+ e.getIndex(),
+ t);
+ throw t;
+ }
+ }
+
+ private void closeISinkChannelOfShuffleSinkHandle(ISinkHandle sinkHandle, int index) {
+ sinkHandle.addToClosedChannel(index);
+ // if all the channels of the ShuffleSinkHandle are closed, we close the ShuffleSinkHandle
+ // directly so that we can finish the FI.
+ if (sinkHandle.isClosed()) {
+ sinkHandle.close();
+ }
+ }
+
@Override
public void onNewDataBlockEvent(TNewDataBlockEvent e) throws TException {
long startTime = System.nanoTime();
@@ -634,6 +676,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
String remotePlanNodeId,
+ TEndPoint remoteEndPoint,
TFragmentInstanceId remoteFragmentInstanceId,
int index,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
@@ -667,7 +710,12 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
localFragmentInstanceId,
localPlanNodeId,
queue,
- new SourceHandleListenerImpl(onFailureCallback));
+ new SourceHandleListenerImpl(onFailureCallback),
+ mppDataExchangeServiceClientManager,
+ executorService,
+ index,
+ remoteEndPoint,
+ remoteFragmentInstanceId);
sourceHandles
.computeIfAbsent(localFragmentInstanceId, key -> new ConcurrentHashMap<>())
.put(localPlanNodeId, localSourceHandle);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
index 5fb511e402..266a77341a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
@@ -36,4 +36,6 @@ public interface ISinkHandle extends ISink {
/** Return true if the specified channel is closed. */
boolean isChannelClosed(int index);
+
+ void addToClosedChannel(int index);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
index 7060541acc..fd49636406 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
@@ -41,7 +41,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
private static final Logger LOGGER = LoggerFactory.getLogger(ShuffleSinkHandle.class);
- /** Each ISinkHandle in the list matches one downStream ISourceHandle */
+ /** Each ISinkChannel in the list matches one downStream ISourceHandle */
private final List<ISinkChannel> downStreamChannelList;
private final boolean[] hasSetNoMoreTsBlocks;
@@ -88,8 +88,6 @@ public class ShuffleSinkHandle implements ISinkHandle {
this.shuffleStrategy = getShuffleStrategy(shuffleStrategyEnum);
this.hasSetNoMoreTsBlocks = new boolean[channelNum];
this.channelOpened = new boolean[channelNum];
- // open first channel
- tryOpenChannel(0);
}
@Override
@@ -103,12 +101,13 @@ public class ShuffleSinkHandle implements ISinkHandle {
@Override
public synchronized ListenableFuture<?> isFull() {
+ int currentIndex = downStreamChannelIndex.getCurrentIndex();
+ // try open channel
+ tryOpenChannel(currentIndex);
// It is safe to use currentChannel.isFull() to judge whether we can send a TsBlock only when
// downStreamChannelIndex will not be changed between we call isFull() and send() of
// ShuffleSinkHandle
- ISinkChannel currentChannel =
- downStreamChannelList.get(downStreamChannelIndex.getCurrentIndex());
- return currentChannel.isFull();
+ return downStreamChannelList.get(currentIndex).isFull();
}
@Override
@@ -146,10 +145,15 @@ public class ShuffleSinkHandle implements ISinkHandle {
}
@Override
- public boolean isClosed() {
+ public synchronized boolean isClosed() {
return closedChannel.size() == downStreamChannelList.size();
}
+ @Override
+ public synchronized void addToClosedChannel(int index) {
+ closedChannel.add(index);
+ }
+
@Override
public synchronized boolean isAborted() {
return aborted;
@@ -234,7 +238,6 @@ public class ShuffleSinkHandle implements ISinkHandle {
private void switchChannelIfNecessary() {
shuffleStrategy.shuffle();
- tryOpenChannel(downStreamChannelIndex.getCurrentIndex());
}
public void tryOpenChannel(int channelIndex) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
index 7dc6ad2983..749b8b6274 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
@@ -19,11 +19,15 @@
package org.apache.iotdb.db.mpp.execution.exchange.source;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
import org.apache.iotdb.db.mpp.execution.exchange.SharedTsBlockQueue;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.mpp.rpc.thrift.TCloseLocalSinkChannelEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -35,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
@@ -45,6 +50,21 @@ public class LocalSourceHandle implements ISourceHandle {
private static final Logger LOGGER = LoggerFactory.getLogger(LocalSourceHandle.class);
+ public static final int MAX_ATTEMPT_TIMES = 3;
+
+ private static final long DEFAULT_RETRY_INTERVAL_IN_MS = 1000;
+
+ private ExecutorService executorService;
+
+ private int indexOfUpstreamISinkChannel = 0;
+
+ private TEndPoint remoteEndpoint;
+
+ private TFragmentInstanceId remoteFragmentInstanceId;
+
+ private IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+ mppDataExchangeServiceClientManager;
+
private TFragmentInstanceId localFragmentInstanceId;
private String localPlanNodeId;
private final SourceHandleListener sourceHandleListener;
@@ -74,13 +94,24 @@ public class LocalSourceHandle implements ISourceHandle {
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
SharedTsBlockQueue queue,
- SourceHandleListener sourceHandleListener) {
+ SourceHandleListener sourceHandleListener,
+ IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+ mppDataExchangeServiceClientManager,
+ ExecutorService executorService,
+ int indexOfUpstreamISinkChannel,
+ TEndPoint remoteEndpoint,
+ TFragmentInstanceId remoteFragmentInstanceId) {
this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
this.localPlanNodeId = Validate.notNull(localPlanNodeId);
this.queue = Validate.notNull(queue);
this.queue.setSourceHandle(this);
this.sourceHandleListener = Validate.notNull(sourceHandleListener);
this.threadName = createFullIdFrom(localFragmentInstanceId, localPlanNodeId);
+ this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager;
+ this.executorService = executorService;
+ this.indexOfUpstreamISinkChannel = indexOfUpstreamISinkChannel;
+ this.remoteEndpoint = remoteEndpoint;
+ this.remoteFragmentInstanceId = remoteFragmentInstanceId;
}
@Override
@@ -231,6 +262,10 @@ public class LocalSourceHandle implements ISourceHandle {
}
queue.close();
closed = true;
+ if (executorService != null) {
+ // only send close event when this LocalSourceHandle is created for a Fragment
+ executorService.submit(new SendCloseLocalSinkChannelEventTask());
+ }
sourceHandleListener.onFinished(this);
}
}
@@ -255,4 +290,47 @@ public class LocalSourceHandle implements ISourceHandle {
// do nothing, the maxBytesCanReserve of SharedTsBlockQueue should be set by corresponding
// LocalSinkChannel
}
+
+ class SendCloseLocalSinkChannelEventTask implements Runnable {
+
+ @Override
+ public void run() {
+ try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+ LOGGER.debug(
+ "[SendCloseLocalSinkChanelEvent] to [ShuffleSinkHandle: {}, index: {}]).",
+ remoteFragmentInstanceId,
+ indexOfUpstreamISinkChannel);
+ int attempt = 0;
+ TCloseLocalSinkChannelEvent closeLocalSinkChannelEvent =
+ new TCloseLocalSinkChannelEvent(remoteFragmentInstanceId, indexOfUpstreamISinkChannel);
+ while (attempt < MAX_ATTEMPT_TIMES) {
+ attempt += 1;
+ long startTime = System.nanoTime();
+ try (SyncDataNodeMPPDataExchangeServiceClient client =
+ mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
+ client.onCloseLocalSinkChannelEvent(closeLocalSinkChannelEvent);
+ break;
+ } catch (Throwable e) {
+ LOGGER.warn(
+ "[SendCloseLocalSinkChanelEvent] to [ShuffleSinkHandle: {}, index: {}] failed.).",
+ remoteFragmentInstanceId,
+ indexOfUpstreamISinkChannel);
+ if (attempt == MAX_ATTEMPT_TIMES) {
+ synchronized (LocalSourceHandle.this) {
+ sourceHandleListener.onFailure(LocalSourceHandle.this, e);
+ }
+ }
+ try {
+ Thread.sleep(DEFAULT_RETRY_INTERVAL_IN_MS);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ synchronized (LocalSourceHandle.this) {
+ sourceHandleListener.onFailure(LocalSourceHandle.this, e);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 4c2d0c23bf..12c4959ba5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -575,6 +575,7 @@ public class QueryExecution implements IQueryExecution {
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
context.getResultNodeContext().getVirtualResultNodeId().getId(),
context.getResultNodeContext().getUpStreamPlanNodeId().getId(),
+ null,
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
0, // Upstream of result ExchangeNode will only have one child.
stateMachine::transitionToFailed)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1083fafcfc..bd10051d5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1883,13 +1883,14 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
localInstanceId.toThrift(),
node.getPlanNodeId().getId(),
node.getUpstreamPlanNodeId().getId(),
+ upstreamEndPoint,
remoteInstanceId.toThrift(),
- node.getIndexOfUpstreamSinkHandle(),
+ node.getIndexOfUpstreamISinkChannel(),
context.getInstanceContext()::failed)
: MPP_DATA_EXCHANGE_MANAGER.createSourceHandle(
localInstanceId.toThrift(),
node.getPlanNodeId().getId(),
- node.getIndexOfUpstreamSinkHandle(),
+ node.getIndexOfUpstreamISinkChannel(),
upstreamEndPoint,
remoteInstanceId.toThrift(),
context.getInstanceContext()::failed);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 2f1e85aedf..45802e0cfc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -126,7 +126,7 @@ public class DistributionPlanner {
newChild.addDownStreamChannelLocation(
new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
exchangeNode.setChild(newChild);
- exchangeNode.setIndexOfUpstreamSinkHandle(newChild.getCurrentLastIndex());
+ exchangeNode.setIndexOfUpstreamISinkChannel(newChild.getCurrentLastIndex());
}
}
}
@@ -153,7 +153,7 @@ public class DistributionPlanner {
newChild.addDownStreamChannelLocation(
new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
exchangeNode.setChild(newChild);
- exchangeNode.setIndexOfUpstreamSinkHandle(newChild.getCurrentLastIndex());
+ exchangeNode.setIndexOfUpstreamISinkChannel(newChild.getCurrentLastIndex());
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
index 84622d1d5c..3b3f21d5b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
@@ -45,7 +45,7 @@ public class ExchangeNode extends SingleChildProcessNode {
private List<String> outputColumnNames = new ArrayList<>();
/** Exchange needs to know which child of IdentitySinkNode/ShuffleSinkNode it matches */
- private int indexOfUpstreamSinkHandle = 0;
+ private int indexOfUpstreamISinkChannel = 0;
public ExchangeNode(PlanNodeId id) {
super(id);
@@ -65,7 +65,7 @@ public class ExchangeNode extends SingleChildProcessNode {
public PlanNode clone() {
ExchangeNode node = new ExchangeNode(getPlanNodeId());
node.setOutputColumnNames(outputColumnNames);
- node.setIndexOfUpstreamSinkHandle(indexOfUpstreamSinkHandle);
+ node.setIndexOfUpstreamISinkChannel(indexOfUpstreamISinkChannel);
return node;
}
@@ -101,7 +101,7 @@ public class ExchangeNode extends SingleChildProcessNode {
ExchangeNode exchangeNode = new ExchangeNode(planNodeId);
exchangeNode.setUpstream(endPoint, fragmentInstanceId, upstreamPlanNodeId);
exchangeNode.setOutputColumnNames(outputColumnNames);
- exchangeNode.setIndexOfUpstreamSinkHandle(index);
+ exchangeNode.setIndexOfUpstreamISinkChannel(index);
return exchangeNode;
}
@@ -116,7 +116,7 @@ public class ExchangeNode extends SingleChildProcessNode {
for (String outputColumnName : outputColumnNames) {
ReadWriteIOUtils.write(outputColumnName, byteBuffer);
}
- ReadWriteIOUtils.write(indexOfUpstreamSinkHandle, byteBuffer);
+ ReadWriteIOUtils.write(indexOfUpstreamISinkChannel, byteBuffer);
}
@Override
@@ -130,7 +130,7 @@ public class ExchangeNode extends SingleChildProcessNode {
for (String outputColumnName : outputColumnNames) {
ReadWriteIOUtils.write(outputColumnName, stream);
}
- ReadWriteIOUtils.write(indexOfUpstreamSinkHandle, stream);
+ ReadWriteIOUtils.write(indexOfUpstreamISinkChannel, stream);
}
@Override
@@ -148,12 +148,12 @@ public class ExchangeNode extends SingleChildProcessNode {
getUpstreamEndpoint().getIp(), getUpstreamInstanceId(), getUpstreamPlanNodeId());
}
- public int getIndexOfUpstreamSinkHandle() {
- return indexOfUpstreamSinkHandle;
+ public int getIndexOfUpstreamISinkChannel() {
+ return indexOfUpstreamISinkChannel;
}
- public void setIndexOfUpstreamSinkHandle(int indexOfUpstreamSinkHandle) {
- this.indexOfUpstreamSinkHandle = indexOfUpstreamSinkHandle;
+ public void setIndexOfUpstreamISinkChannel(int indexOfUpstreamISinkChannel) {
+ this.indexOfUpstreamISinkChannel = indexOfUpstreamISinkChannel;
}
public TEndPoint getUpstreamEndpoint() {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
index dac0ec75d8..25f1ea962f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
@@ -32,6 +32,8 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import java.util.concurrent.Executors;
+
public class LocalSinkChannelTest {
@Test
public void testSend() {
@@ -64,7 +66,12 @@ public class LocalSinkChannelTest {
localFragmentInstanceId,
remotePlanNodeId,
queue,
- Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class));
+ Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class),
+ null,
+ Executors.newSingleThreadExecutor(),
+ 0,
+ null,
+ remoteFragmentInstanceId);
Assert.assertFalse(localSinkChannel.isFull().isDone());
localSourceHandle.isBlocked();
@@ -151,7 +158,12 @@ public class LocalSinkChannelTest {
localFragmentInstanceId,
remotePlanNodeId,
queue,
- Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class));
+ Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class),
+ null,
+ Executors.newSingleThreadExecutor(),
+ 0,
+ null,
+ remoteFragmentInstanceId);
Assert.assertFalse(localSinkChannel.isFull().isDone());
localSourceHandle.isBlocked();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
index aa15f19967..aafcd7fad5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
@@ -30,6 +30,8 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import java.util.concurrent.Executors;
+
public class LocalSourceHandleTest {
@Test
public void testReceive() {
@@ -51,7 +53,15 @@ public class LocalSourceHandleTest {
LocalSourceHandle localSourceHandle =
new LocalSourceHandle(
- localFragmentInstanceId, localPlanNodeId, queue, mockSourceHandleListener);
+ localFragmentInstanceId,
+ localPlanNodeId,
+ queue,
+ mockSourceHandleListener,
+ null,
+ Executors.newSingleThreadExecutor(),
+ 0,
+ null,
+ remoteFragmentInstanceId);
Assert.assertFalse(localSourceHandle.isBlocked().isDone());
Assert.assertFalse(localSourceHandle.isAborted());
Assert.assertFalse(localSourceHandle.isFinished());
@@ -95,7 +105,15 @@ public class LocalSourceHandleTest {
LocalSourceHandle localSourceHandle =
new LocalSourceHandle(
- localFragmentInstanceId, localPlanNodeId, queue, mockSourceHandleListener);
+ localFragmentInstanceId,
+ localPlanNodeId,
+ queue,
+ mockSourceHandleListener,
+ null,
+ Executors.newSingleThreadExecutor(),
+ 0,
+ null,
+ remoteFragmentInstanceId);
ListenableFuture<?> future = localSourceHandle.isBlocked();
Assert.assertFalse(future.isDone());
Assert.assertFalse(localSourceHandle.isAborted());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
index f66baa559d..d8ec74ddf8 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
@@ -89,6 +89,7 @@ public class MPPDataExchangeManagerTest {
remoteFragmentInstanceId,
remotePlanNodeId,
localPlanNodeId,
+ null,
localFragmentInstanceId,
0,
t -> {});
@@ -128,6 +129,7 @@ public class MPPDataExchangeManagerTest {
remoteFragmentInstanceId,
remotePlanNodeId,
localPlanNodeId,
+ null,
localFragmentInstanceId,
0,
t -> {});
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 671a24a39a..cf8a889dd7 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -85,6 +85,12 @@ struct TCloseSinkChannelEvent {
2: required i32 index
}
+struct TCloseLocalSinkChannelEvent {
+ 1: required TFragmentInstanceId sourceFragmentInstanceId
+ // index of upstream SinkChannel
+ 2: required i32 index
+}
+
struct TNewDataBlockEvent {
1: required TFragmentInstanceId targetFragmentInstanceId
2: required string targetPlanNodeId
@@ -774,6 +780,8 @@ service MPPDataExchangeService {
void onCloseSinkChannelEvent(TCloseSinkChannelEvent e);
+ void onCloseLocalSinkChannelEvent(TCloseLocalSinkChannelEvent e);
+
void onNewDataBlockEvent(TNewDataBlockEvent e);
void onEndOfDataBlockEvent(TEndOfDataBlockEvent e);