You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/03 14:21:03 UTC

[iotdb] branch FIDig updated: Try open channel (#9513)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch FIDig
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/FIDig by this push:
     new 852ed379d7 Try open channel (#9513)
852ed379d7 is described below

commit 852ed379d7900611c4b9cd54d6746ba6f21ed6ae
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Mon Apr 3 22:20:56 2023 +0800

    Try open channel (#9513)
---
 .../exchange/IMPPDataExchangeManager.java          |  1 +
 .../execution/exchange/MPPDataExchangeManager.java | 50 +++++++++++++-
 .../mpp/execution/exchange/sink/ISinkHandle.java   |  2 +
 .../execution/exchange/sink/ShuffleSinkHandle.java | 19 ++---
 .../exchange/source/LocalSourceHandle.java         | 80 +++++++++++++++++++++-
 .../db/mpp/plan/execution/QueryExecution.java      |  1 +
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  5 +-
 .../planner/distribution/DistributionPlanner.java  |  4 +-
 .../planner/plan/node/process/ExchangeNode.java    | 18 ++---
 .../execution/exchange/LocalSinkChannelTest.java   | 16 ++++-
 .../execution/exchange/LocalSourceHandleTest.java  | 22 +++++-
 .../exchange/MPPDataExchangeManagerTest.java       |  2 +
 thrift/src/main/thrift/datanode.thrift             |  8 +++
 13 files changed, 201 insertions(+), 27 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
index bbbce26e7c..f8e064bcc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
@@ -74,6 +74,7 @@ public interface IMPPDataExchangeManager {
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
       String remotePlanNodeId,
+      TEndPoint remoteEndPoint,
       TFragmentInstanceId remoteFragmentInstanceId,
       int index,
       IMPPDataExchangeManagerCallback<Throwable> onFailureCallback);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 9a58904c79..5b3cebbfc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
 import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TCloseLocalSinkChannelEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TCloseSinkChannelEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
@@ -178,6 +179,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
           return;
         }
         sinkHandle.getChannel(e.getIndex()).close();
+        closeISinkChannelOfShuffleSinkHandle(sinkHandle, e.getIndex());
       } catch (Throwable t) {
         LOGGER.warn(
             "Close channel of ShuffleSinkHandle {}, index {} failed.",
@@ -188,6 +190,46 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       }
     }
 
+    @Override
+    public void onCloseLocalSinkChannelEvent(TCloseLocalSinkChannelEvent e) throws TException {
+      try (SetThreadName fragmentInstanceName =
+          new SetThreadName(
+              createFullId(
+                  e.sourceFragmentInstanceId.queryId,
+                  e.sourceFragmentInstanceId.fragmentId,
+                  e.sourceFragmentInstanceId.instanceId))) {
+        LOGGER.debug(
+            "Closed LocalSourceHandle of ShuffleSinkHandle {}, channel index: {}.",
+            e.getSourceFragmentInstanceId(),
+            e.getIndex());
+        ISinkHandle sinkHandle = shuffleSinkHandles.get(e.getSourceFragmentInstanceId());
+        if (sinkHandle == null) {
+          LOGGER.debug(
+              "received CloseLocalSinkChannelEvent but target FragmentInstance[{}] is not found.",
+              e.getSourceFragmentInstanceId());
+          return;
+        }
+        closeISinkChannelOfShuffleSinkHandle(sinkHandle, e.getIndex());
+        // Failures are logged below with the instance id and channel index, then rethrown.
+      } catch (Throwable t) {
+        LOGGER.warn(
+            "Close channel of ShuffleSinkHandle {}, index {} failed.",
+            e.getSourceFragmentInstanceId(),
+            e.getIndex(),
+            t);
+        throw t;
+      }
+    }
+
+    private void closeISinkChannelOfShuffleSinkHandle(ISinkHandle sinkHandle, int index) {
+      sinkHandle.addToClosedChannel(index);
+      // Once every channel has been recorded as closed, isClosed() reports true and we close the
+      // whole ShuffleSinkHandle right away so that the FragmentInstance (FI) can finish.
+      if (sinkHandle.isClosed()) {
+        sinkHandle.close();
+      }
+    }
+
     @Override
     public void onNewDataBlockEvent(TNewDataBlockEvent e) throws TException {
       long startTime = System.nanoTime();
@@ -634,6 +676,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
       String remotePlanNodeId,
+      TEndPoint remoteEndPoint,
       TFragmentInstanceId remoteFragmentInstanceId,
       int index,
       IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
@@ -667,7 +710,12 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
             localFragmentInstanceId,
             localPlanNodeId,
             queue,
-            new SourceHandleListenerImpl(onFailureCallback));
+            new SourceHandleListenerImpl(onFailureCallback),
+            mppDataExchangeServiceClientManager,
+            executorService,
+            index,
+            remoteEndPoint,
+            remoteFragmentInstanceId);
     sourceHandles
         .computeIfAbsent(localFragmentInstanceId, key -> new ConcurrentHashMap<>())
         .put(localPlanNodeId, localSourceHandle);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
index 5fb511e402..266a77341a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ISinkHandle.java
@@ -36,4 +36,6 @@ public interface ISinkHandle extends ISink {
 
   /** Return true if the specified channel is closed. */
   boolean isChannelClosed(int index);
+
+  void addToClosedChannel(int index);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
index 7060541acc..fd49636406 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
@@ -41,7 +41,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ShuffleSinkHandle.class);
 
-  /** Each ISinkHandle in the list matches one downStream ISourceHandle */
+  /** Each ISinkChannel in the list matches one downStream ISourceHandle */
   private final List<ISinkChannel> downStreamChannelList;
 
   private final boolean[] hasSetNoMoreTsBlocks;
@@ -88,8 +88,6 @@ public class ShuffleSinkHandle implements ISinkHandle {
     this.shuffleStrategy = getShuffleStrategy(shuffleStrategyEnum);
     this.hasSetNoMoreTsBlocks = new boolean[channelNum];
     this.channelOpened = new boolean[channelNum];
-    // open first channel
-    tryOpenChannel(0);
   }
 
   @Override
@@ -103,12 +101,13 @@ public class ShuffleSinkHandle implements ISinkHandle {
 
   @Override
   public synchronized ListenableFuture<?> isFull() {
+    int currentIndex = downStreamChannelIndex.getCurrentIndex();
+    // try open channel
+    tryOpenChannel(currentIndex);
     // It is safe to use currentChannel.isFull() to judge whether we can send a TsBlock only when
     // downStreamChannelIndex will not be changed between we call isFull() and send() of
     // ShuffleSinkHandle
-    ISinkChannel currentChannel =
-        downStreamChannelList.get(downStreamChannelIndex.getCurrentIndex());
-    return currentChannel.isFull();
+    return downStreamChannelList.get(currentIndex).isFull();
   }
 
   @Override
@@ -146,10 +145,15 @@ public class ShuffleSinkHandle implements ISinkHandle {
   }
 
   @Override
-  public boolean isClosed() {
+  public synchronized boolean isClosed() {
     return closedChannel.size() == downStreamChannelList.size();
   }
 
+  @Override
+  public synchronized void addToClosedChannel(int index) {
+    closedChannel.add(index);
+  }
+
   @Override
   public synchronized boolean isAborted() {
     return aborted;
@@ -234,7 +238,6 @@ public class ShuffleSinkHandle implements ISinkHandle {
 
   private void switchChannelIfNecessary() {
     shuffleStrategy.shuffle();
-    tryOpenChannel(downStreamChannelIndex.getCurrentIndex());
   }
 
   public void tryOpenChannel(int channelIndex) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
index 7dc6ad2983..749b8b6274 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
@@ -19,11 +19,15 @@
 
 package org.apache.iotdb.db.mpp.execution.exchange.source;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.execution.exchange.SharedTsBlockQueue;
 import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.mpp.rpc.thrift.TCloseLocalSinkChannelEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -35,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
 
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 import static org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
@@ -45,6 +50,21 @@ public class LocalSourceHandle implements ISourceHandle {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(LocalSourceHandle.class);
 
+  public static final int MAX_ATTEMPT_TIMES = 3;
+
+  private static final long DEFAULT_RETRY_INTERVAL_IN_MS = 1000;
+
+  private ExecutorService executorService;
+
+  private int indexOfUpstreamISinkChannel = 0;
+
+  private TEndPoint remoteEndpoint;
+
+  private TFragmentInstanceId remoteFragmentInstanceId;
+
+  private IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+      mppDataExchangeServiceClientManager;
+
   private TFragmentInstanceId localFragmentInstanceId;
   private String localPlanNodeId;
   private final SourceHandleListener sourceHandleListener;
@@ -74,13 +94,24 @@ public class LocalSourceHandle implements ISourceHandle {
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
       SharedTsBlockQueue queue,
-      SourceHandleListener sourceHandleListener) {
+      SourceHandleListener sourceHandleListener,
+      IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+          mppDataExchangeServiceClientManager,
+      ExecutorService executorService,
+      int indexOfUpstreamISinkChannel,
+      TEndPoint remoteEndpoint,
+      TFragmentInstanceId remoteFragmentInstanceId) {
     this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
     this.localPlanNodeId = Validate.notNull(localPlanNodeId);
     this.queue = Validate.notNull(queue);
     this.queue.setSourceHandle(this);
     this.sourceHandleListener = Validate.notNull(sourceHandleListener);
     this.threadName = createFullIdFrom(localFragmentInstanceId, localPlanNodeId);
+    this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager;
+    this.executorService = executorService;
+    this.indexOfUpstreamISinkChannel = indexOfUpstreamISinkChannel;
+    this.remoteEndpoint = remoteEndpoint;
+    this.remoteFragmentInstanceId = remoteFragmentInstanceId;
   }
 
   @Override
@@ -231,6 +262,10 @@ public class LocalSourceHandle implements ISourceHandle {
           }
           queue.close();
           closed = true;
+          if (executorService != null) {
+            // only send close event when this LocalSourceHandle is created for a Fragment
+            executorService.submit(new SendCloseLocalSinkChannelEventTask());
+          }
           sourceHandleListener.onFinished(this);
         }
       }
@@ -255,4 +290,47 @@ public class LocalSourceHandle implements ISourceHandle {
     // do nothing, the maxBytesCanReserve of SharedTsBlockQueue should be set by corresponding
     // LocalSinkChannel
   }
+
+  /** Tells the upstream ShuffleSinkHandle that this handle is closed, retrying on failure. */
+  class SendCloseLocalSinkChannelEventTask implements Runnable {
+    @Override
+    public void run() {
+      try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
+        LOGGER.debug(
+            "[SendCloseLocalSinkChanelEvent] to [ShuffleSinkHandle: {}, index: {}]).",
+            remoteFragmentInstanceId,
+            indexOfUpstreamISinkChannel);
+        TCloseLocalSinkChannelEvent closeLocalSinkChannelEvent =
+            new TCloseLocalSinkChannelEvent(remoteFragmentInstanceId, indexOfUpstreamISinkChannel);
+        for (int attempt = 1; attempt <= MAX_ATTEMPT_TIMES; attempt++) {
+          try (SyncDataNodeMPPDataExchangeServiceClient client =
+              mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
+            client.onCloseLocalSinkChannelEvent(closeLocalSinkChannelEvent);
+            return;
+          } catch (Throwable e) {
+            LOGGER.warn(
+                "[SendCloseLocalSinkChanelEvent] to [ShuffleSinkHandle: {}, index: {}] failed.).",
+                remoteFragmentInstanceId,
+                indexOfUpstreamISinkChannel,
+                e); // trailing throwable: SLF4J logs its stack trace
+            boolean giveUp = attempt == MAX_ATTEMPT_TIMES; // no pointless sleep after last try
+            if (!giveUp) {
+              try {
+                Thread.sleep(DEFAULT_RETRY_INTERVAL_IN_MS);
+              } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt(); // restore the interrupt flag
+                giveUp = true; // stop retrying once interrupted
+              }
+            }
+            if (giveUp) {
+              synchronized (LocalSourceHandle.this) {
+                sourceHandleListener.onFailure(LocalSourceHandle.this, e);
+              }
+              return;
+            }
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 4c2d0c23bf..12c4959ba5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -575,6 +575,7 @@ public class QueryExecution implements IQueryExecution {
                     context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
                     context.getResultNodeContext().getVirtualResultNodeId().getId(),
                     context.getResultNodeContext().getUpStreamPlanNodeId().getId(),
+                    null,
                     context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
                     0, // Upstream of result ExchangeNode will only have one child.
                     stateMachine::transitionToFailed)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1083fafcfc..bd10051d5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1883,13 +1883,14 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 localInstanceId.toThrift(),
                 node.getPlanNodeId().getId(),
                 node.getUpstreamPlanNodeId().getId(),
+                upstreamEndPoint,
                 remoteInstanceId.toThrift(),
-                node.getIndexOfUpstreamSinkHandle(),
+                node.getIndexOfUpstreamISinkChannel(),
                 context.getInstanceContext()::failed)
             : MPP_DATA_EXCHANGE_MANAGER.createSourceHandle(
                 localInstanceId.toThrift(),
                 node.getPlanNodeId().getId(),
-                node.getIndexOfUpstreamSinkHandle(),
+                node.getIndexOfUpstreamISinkChannel(),
                 upstreamEndPoint,
                 remoteInstanceId.toThrift(),
                 context.getInstanceContext()::failed);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 2f1e85aedf..45802e0cfc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -126,7 +126,7 @@ public class DistributionPlanner {
         newChild.addDownStreamChannelLocation(
             new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
         exchangeNode.setChild(newChild);
-        exchangeNode.setIndexOfUpstreamSinkHandle(newChild.getCurrentLastIndex());
+        exchangeNode.setIndexOfUpstreamISinkChannel(newChild.getCurrentLastIndex());
       }
     }
   }
@@ -153,7 +153,7 @@ public class DistributionPlanner {
         newChild.addDownStreamChannelLocation(
             new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
         exchangeNode.setChild(newChild);
-        exchangeNode.setIndexOfUpstreamSinkHandle(newChild.getCurrentLastIndex());
+        exchangeNode.setIndexOfUpstreamISinkChannel(newChild.getCurrentLastIndex());
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
index 84622d1d5c..3b3f21d5b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
@@ -45,7 +45,7 @@ public class ExchangeNode extends SingleChildProcessNode {
   private List<String> outputColumnNames = new ArrayList<>();
 
   /** Exchange needs to know which child of IdentitySinkNode/ShuffleSinkNode it matches */
-  private int indexOfUpstreamSinkHandle = 0;
+  private int indexOfUpstreamISinkChannel = 0;
 
   public ExchangeNode(PlanNodeId id) {
     super(id);
@@ -65,7 +65,7 @@ public class ExchangeNode extends SingleChildProcessNode {
   public PlanNode clone() {
     ExchangeNode node = new ExchangeNode(getPlanNodeId());
     node.setOutputColumnNames(outputColumnNames);
-    node.setIndexOfUpstreamSinkHandle(indexOfUpstreamSinkHandle);
+    node.setIndexOfUpstreamISinkChannel(indexOfUpstreamISinkChannel);
     return node;
   }
 
@@ -101,7 +101,7 @@ public class ExchangeNode extends SingleChildProcessNode {
     ExchangeNode exchangeNode = new ExchangeNode(planNodeId);
     exchangeNode.setUpstream(endPoint, fragmentInstanceId, upstreamPlanNodeId);
     exchangeNode.setOutputColumnNames(outputColumnNames);
-    exchangeNode.setIndexOfUpstreamSinkHandle(index);
+    exchangeNode.setIndexOfUpstreamISinkChannel(index);
     return exchangeNode;
   }
 
@@ -116,7 +116,7 @@ public class ExchangeNode extends SingleChildProcessNode {
     for (String outputColumnName : outputColumnNames) {
       ReadWriteIOUtils.write(outputColumnName, byteBuffer);
     }
-    ReadWriteIOUtils.write(indexOfUpstreamSinkHandle, byteBuffer);
+    ReadWriteIOUtils.write(indexOfUpstreamISinkChannel, byteBuffer);
   }
 
   @Override
@@ -130,7 +130,7 @@ public class ExchangeNode extends SingleChildProcessNode {
     for (String outputColumnName : outputColumnNames) {
       ReadWriteIOUtils.write(outputColumnName, stream);
     }
-    ReadWriteIOUtils.write(indexOfUpstreamSinkHandle, stream);
+    ReadWriteIOUtils.write(indexOfUpstreamISinkChannel, stream);
   }
 
   @Override
@@ -148,12 +148,12 @@ public class ExchangeNode extends SingleChildProcessNode {
         getUpstreamEndpoint().getIp(), getUpstreamInstanceId(), getUpstreamPlanNodeId());
   }
 
-  public int getIndexOfUpstreamSinkHandle() {
-    return indexOfUpstreamSinkHandle;
+  public int getIndexOfUpstreamISinkChannel() {
+    return indexOfUpstreamISinkChannel;
   }
 
-  public void setIndexOfUpstreamSinkHandle(int indexOfUpstreamSinkHandle) {
-    this.indexOfUpstreamSinkHandle = indexOfUpstreamSinkHandle;
+  public void setIndexOfUpstreamISinkChannel(int indexOfUpstreamISinkChannel) {
+    this.indexOfUpstreamISinkChannel = indexOfUpstreamISinkChannel;
   }
 
   public TEndPoint getUpstreamEndpoint() {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
index dac0ec75d8..25f1ea962f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
@@ -32,6 +32,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.concurrent.Executors;
+
 public class LocalSinkChannelTest {
   @Test
   public void testSend() {
@@ -64,7 +66,12 @@ public class LocalSinkChannelTest {
             localFragmentInstanceId,
             remotePlanNodeId,
             queue,
-            Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class));
+            Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class),
+            null,
+            Executors.newSingleThreadExecutor(),
+            0,
+            null,
+            remoteFragmentInstanceId);
 
     Assert.assertFalse(localSinkChannel.isFull().isDone());
     localSourceHandle.isBlocked();
@@ -151,7 +158,12 @@ public class LocalSinkChannelTest {
             localFragmentInstanceId,
             remotePlanNodeId,
             queue,
-            Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class));
+            Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class),
+            null,
+            Executors.newSingleThreadExecutor(),
+            0,
+            null,
+            remoteFragmentInstanceId);
 
     Assert.assertFalse(localSinkChannel.isFull().isDone());
     localSourceHandle.isBlocked();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
index aa15f19967..aafcd7fad5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
@@ -30,6 +30,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.concurrent.Executors;
+
 public class LocalSourceHandleTest {
   @Test
   public void testReceive() {
@@ -51,7 +53,15 @@ public class LocalSourceHandleTest {
 
     LocalSourceHandle localSourceHandle =
         new LocalSourceHandle(
-            localFragmentInstanceId, localPlanNodeId, queue, mockSourceHandleListener);
+            localFragmentInstanceId,
+            localPlanNodeId,
+            queue,
+            mockSourceHandleListener,
+            null,
+            Executors.newSingleThreadExecutor(),
+            0,
+            null,
+            remoteFragmentInstanceId);
     Assert.assertFalse(localSourceHandle.isBlocked().isDone());
     Assert.assertFalse(localSourceHandle.isAborted());
     Assert.assertFalse(localSourceHandle.isFinished());
@@ -95,7 +105,15 @@ public class LocalSourceHandleTest {
 
     LocalSourceHandle localSourceHandle =
         new LocalSourceHandle(
-            localFragmentInstanceId, localPlanNodeId, queue, mockSourceHandleListener);
+            localFragmentInstanceId,
+            localPlanNodeId,
+            queue,
+            mockSourceHandleListener,
+            null,
+            Executors.newSingleThreadExecutor(),
+            0,
+            null,
+            remoteFragmentInstanceId);
     ListenableFuture<?> future = localSourceHandle.isBlocked();
     Assert.assertFalse(future.isDone());
     Assert.assertFalse(localSourceHandle.isAborted());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
index f66baa559d..d8ec74ddf8 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
@@ -89,6 +89,7 @@ public class MPPDataExchangeManagerTest {
             remoteFragmentInstanceId,
             remotePlanNodeId,
             localPlanNodeId,
+            null,
             localFragmentInstanceId,
             0,
             t -> {});
@@ -128,6 +129,7 @@ public class MPPDataExchangeManagerTest {
             remoteFragmentInstanceId,
             remotePlanNodeId,
             localPlanNodeId,
+            null,
             localFragmentInstanceId,
             0,
             t -> {});
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 671a24a39a..cf8a889dd7 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -85,6 +85,12 @@ struct TCloseSinkChannelEvent {
   2: required i32 index
 }
 
+struct TCloseLocalSinkChannelEvent {
+  1: required TFragmentInstanceId sourceFragmentInstanceId
+  // index of the upstream ISinkChannel (inside its ShuffleSinkHandle) to mark as closed
+  2: required i32 index
+}
+
 struct TNewDataBlockEvent {
   1: required TFragmentInstanceId targetFragmentInstanceId
   2: required string targetPlanNodeId
@@ -774,6 +780,8 @@ service MPPDataExchangeService {
 
   void onCloseSinkChannelEvent(TCloseSinkChannelEvent e);
 
+  void onCloseLocalSinkChannelEvent(TCloseLocalSinkChannelEvent e);
+
   void onNewDataBlockEvent(TNewDataBlockEvent e);
 
   void onEndOfDataBlockEvent(TEndOfDataBlockEvent e);