You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/25 09:37:33 UTC

[iotdb] branch master updated: Optimize source handle error handling (#5659)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 138bd00ff9 Optimize source handle error handling (#5659)
138bd00ff9 is described below

commit 138bd00ff998efd2671263adac4157d0490deabd
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Mon Apr 25 17:37:27 2022 +0800

    Optimize source handle error handling (#5659)
---
 .../iotdb/db/mpp/buffer/DataBlockManager.java      | 58 +++++++++++++------
 .../iotdb/db/mpp/buffer/IDataBlockManager.java     |  5 +-
 .../db/mpp/buffer/IDataBlockManagerCallback.java   | 24 ++++++++
 .../apache/iotdb/db/mpp/buffer/ISinkHandle.java    | 10 ++--
 .../apache/iotdb/db/mpp/buffer/ISourceHandle.java  | 10 +++-
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 21 +++----
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   | 34 ++++-------
 .../iotdb/db/mpp/execution/QueryExecution.java     |  7 +--
 .../db/mpp/operator/source/ExchangeOperator.java   |  9 +--
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |  3 +-
 .../apache/iotdb/db/mpp/buffer/SinkHandleTest.java |  8 +--
 .../iotdb/db/mpp/buffer/SourceHandleTest.java      | 67 ++++++----------------
 .../apache/iotdb/db/mpp/buffer/StubSinkHandle.java | 10 ++--
 13 files changed, 132 insertions(+), 134 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index 7a38ef6d09..d55de6f780 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -50,19 +50,21 @@ public class DataBlockManager implements IDataBlockManager {
   private static final Logger logger = LoggerFactory.getLogger(DataBlockManager.class);
 
   public interface SourceHandleListener {
-    void onFinished(SourceHandle sourceHandle);
+    void onFinished(ISourceHandle sourceHandle);
 
-    void onClosed(SourceHandle sourceHandle);
+    void onClosed(ISourceHandle sourceHandle);
+
+    void onFailure(ISourceHandle sourceHandle, Throwable t);
   }
 
   public interface SinkHandleListener {
-    void onFinish(SinkHandle sinkHandle);
+    void onFinish(ISinkHandle sinkHandle);
 
-    void onClosed(SinkHandle sinkHandle);
+    void onClosed(ISinkHandle sinkHandle);
 
-    void onAborted(SinkHandle sinkHandle);
+    void onAborted(ISinkHandle sinkHandle);
 
-    void onFailure(Throwable t);
+    void onFailure(ISinkHandle sinkHandle, Throwable t);
   }
 
   /** Handle thrift communications. */
@@ -169,8 +171,14 @@ public class DataBlockManager implements IDataBlockManager {
   /** Listen to the state changes of a source handle. */
   class SourceHandleListenerImpl implements SourceHandleListener {
 
+    private final IDataBlockManagerCallback<Throwable> onFailureCallback;
+
+    public SourceHandleListenerImpl(IDataBlockManagerCallback<Throwable> onFailureCallback) {
+      this.onFailureCallback = onFailureCallback;
+    }
+
     @Override
-    public void onFinished(SourceHandle sourceHandle) {
+    public void onFinished(ISourceHandle sourceHandle) {
       logger.info("Release resources of finished source handle {}", sourceHandle);
       if (!sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId())
           || !sourceHandles
@@ -190,22 +198,33 @@ public class DataBlockManager implements IDataBlockManager {
     }
 
     @Override
-    public void onClosed(SourceHandle sourceHandle) {
+    public void onClosed(ISourceHandle sourceHandle) {
       onFinished(sourceHandle);
     }
+
+    @Override
+    public void onFailure(ISourceHandle sourceHandle, Throwable t) {
+      logger.error("Source handle {} failed due to {}", sourceHandle, t);
+      if (onFailureCallback != null) {
+        onFailureCallback.call(t);
+      }
+    }
   }
 
   /** Listen to the state changes of a sink handle. */
   class SinkHandleListenerImpl implements SinkHandleListener {
 
     private final FragmentInstanceContext context;
+    private final IDataBlockManagerCallback<Throwable> onFailureCallback;
 
-    public SinkHandleListenerImpl(FragmentInstanceContext context) {
+    public SinkHandleListenerImpl(
+        FragmentInstanceContext context, IDataBlockManagerCallback<Throwable> onFailureCallback) {
       this.context = context;
+      this.onFailureCallback = onFailureCallback;
     }
 
     @Override
-    public void onFinish(SinkHandle sinkHandle) {
+    public void onFinish(ISinkHandle sinkHandle) {
       logger.info("Release resources of finished sink handle {}", sourceHandles);
       if (!sinkHandles.containsKey(sinkHandle.getLocalFragmentInstanceId())) {
         logger.info("Resources of finished sink handle {} has already been released", sinkHandle);
@@ -215,12 +234,12 @@ public class DataBlockManager implements IDataBlockManager {
     }
 
     @Override
-    public void onClosed(SinkHandle sinkHandle) {
+    public void onClosed(ISinkHandle sinkHandle) {
       context.transitionToFlushing();
     }
 
     @Override
-    public void onAborted(SinkHandle sinkHandle) {
+    public void onAborted(ISinkHandle sinkHandle) {
       logger.info("Release resources of aborted sink handle {}", sourceHandles);
       if (!sinkHandles.containsKey(sinkHandle.getLocalFragmentInstanceId())) {
         logger.info("Resources of aborted sink handle {} has already been released", sinkHandle);
@@ -229,8 +248,11 @@ public class DataBlockManager implements IDataBlockManager {
     }
 
     @Override
-    public void onFailure(Throwable t) {
-      context.failed(t);
+    public void onFailure(ISinkHandle sinkHandle, Throwable t) {
+      logger.error("Sink handle {} failed due to {}", sinkHandle, t);
+      if (onFailureCallback != null) {
+        onFailureCallback.call(t);
+      }
     }
   }
 
@@ -269,6 +291,7 @@ public class DataBlockManager implements IDataBlockManager {
       TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
       String remotePlanNodeId,
+      // TODO: replace with callbacks to decouple DataBlockManager from FragmentInstanceContext
       FragmentInstanceContext instanceContext) {
     if (sinkHandles.containsKey(localFragmentInstanceId)) {
       throw new IllegalStateException("Sink handle for " + localFragmentInstanceId + " exists.");
@@ -290,7 +313,7 @@ public class DataBlockManager implements IDataBlockManager {
             executorService,
             clientFactory.getDataBlockServiceClient(remoteEndpoint),
             tsBlockSerdeFactory.get(),
-            new SinkHandleListenerImpl(instanceContext));
+            new SinkHandleListenerImpl(instanceContext, instanceContext::failed));
     sinkHandles.put(localFragmentInstanceId, sinkHandle);
     return sinkHandle;
   }
@@ -300,7 +323,8 @@ public class DataBlockManager implements IDataBlockManager {
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
       TEndPoint remoteEndpoint,
-      TFragmentInstanceId remoteFragmentInstanceId) {
+      TFragmentInstanceId remoteFragmentInstanceId,
+      IDataBlockManagerCallback<Throwable> onFailureCallback) {
     if (sourceHandles.containsKey(localFragmentInstanceId)
         && sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
       throw new IllegalStateException(
@@ -327,7 +351,7 @@ public class DataBlockManager implements IDataBlockManager {
             executorService,
             clientFactory.getDataBlockServiceClient(remoteEndpoint),
             tsBlockSerdeFactory.get(),
-            new SourceHandleListenerImpl());
+            new SourceHandleListenerImpl(onFailureCallback));
     sourceHandles
         .computeIfAbsent(localFragmentInstanceId, key -> new ConcurrentHashMap<>())
         .put(localPlanNodeId, sourceHandle);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
index 0d60d346db..049f8b023d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
@@ -33,6 +33,7 @@ public interface IDataBlockManager {
    * @param endpoint Hostname and Port of the remote fragment instance where the data blocks should
    *     be sent to.
    * @param remotePlanNodeId The sink plan node ID of the remote fragment instance.
+   * @param instanceContext The context of local fragment instance.
    */
   ISinkHandle createSinkHandle(
       TFragmentInstanceId localFragmentInstanceId,
@@ -51,12 +52,14 @@ public interface IDataBlockManager {
    * @param endpoint Hostname and Port of the remote fragment instance where the data blocks should
    *     be received from.
    * @param remoteFragmentInstanceId ID of the remote fragment instance.
+   * @param onFailureCallback The callback on failure.
    */
   ISourceHandle createSourceHandle(
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
       TEndPoint endpoint,
-      TFragmentInstanceId remoteFragmentInstanceId);
+      TFragmentInstanceId remoteFragmentInstanceId,
+      IDataBlockManagerCallback<Throwable> onFailureCallback);
 
   /**
    * Release all the related resources of a fragment instance, including data blocks that are not
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManagerCallback.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManagerCallback.java
new file mode 100644
index 0000000000..ecafdd4a28
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManagerCallback.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.buffer;
+
+public interface IDataBlockManagerCallback<T> {
+  public void call(T argument);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
index 99f92e3f45..b28b812175 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISinkHandle.java
@@ -18,21 +18,21 @@
  */
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import java.io.IOException;
 import java.util.List;
 
 public interface ISinkHandle {
 
+  /** Get the local fragment instance ID that this sink handle belongs to. */
+  TFragmentInstanceId getLocalFragmentInstanceId();
+
   /** Get the total amount of memory used by buffered tsblocks. */
   long getBufferRetainedSizeInBytes();
 
-  /** Get the number of buffered tsblocks. */
-  int getNumOfBufferedTsBlocks();
-
   /** Get a future that will be completed when the output buffer is not full. */
   ListenableFuture<Void> isFull();
 
@@ -48,7 +48,7 @@ public interface ISinkHandle {
    * tsblock call is ignored. This can happen with limit queries. A {@link RuntimeException} will be
    * thrown if any exception happened * during the data transmission.
    */
-  void send(int partition, List<TsBlock> tsBlocks) throws IOException;
+  void send(int partition, List<TsBlock> tsBlocks);
 
   /**
    * Notify the handle that no more tsblocks will be sent. Any future calls to send a tsblock should
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
index 9e31918627..e9be12d838 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/ISourceHandle.java
@@ -18,15 +18,21 @@
  */
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
 import java.io.Closeable;
-import java.io.IOException;
 
 public interface ISourceHandle extends Closeable {
 
+  /** Get the local fragment instance ID that this source handle belongs to. */
+  TFragmentInstanceId getLocalFragmentInstanceId();
+
+  /** Get the local plan node ID that this source handle belongs to. */
+  String getLocalPlanNodeId();
+
   /** Get the total amount of memory used by buffered tsblocks. */
   long getBufferRetainedSizeInBytes();
 
@@ -34,7 +40,7 @@ public interface ISourceHandle extends Closeable {
    * Get a {@link TsBlock}. If the source handle is blocked, a null will be returned. A {@link
    * RuntimeException} will be thrown if any error happened.
    */
-  TsBlock receive() throws IOException;
+  TsBlock receive();
 
   /** If there are more tsblocks. */
   boolean isFinished();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index c540103126..5d9cfc8263 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -71,9 +71,9 @@ public class SinkHandle implements ISinkHandle {
 
   private volatile ListenableFuture<Void> blocked = immediateFuture(null);
   private int nextSequenceId = 0;
-  private long bufferRetainedSizeInBytes;
-  private boolean closed;
-  private boolean noMoreTsBlocks;
+  private long bufferRetainedSizeInBytes = 0;
+  private boolean closed = false;
+  private boolean noMoreTsBlocks = false;
 
   public SinkHandle(
       TEndPoint remoteEndpoint,
@@ -105,10 +105,6 @@ public class SinkHandle implements ISinkHandle {
   }
 
   private void submitSendNewDataBlockEventTask(int startSequenceId, List<Long> blockSizes) {
-    // TODO: (xingtanzjr)
-    // We temporarily make it sync instead of async to avoid EOS Event(SinkHandle close() method is
-    // called) is sent before NewDataBlockEvent arrived
-    //    new SendNewDataBlockEventTask(startSequenceId, blockSizes).run();
     executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
   }
 
@@ -248,7 +244,6 @@ public class SinkHandle implements ISinkHandle {
     return bufferRetainedSizeInBytes;
   }
 
-  @Override
   public int getNumOfBufferedTsBlocks() {
     return sequenceIdToTsBlock.size();
   }
@@ -289,19 +284,19 @@ public class SinkHandle implements ISinkHandle {
     localMemoryManager.getQueryPool().free(localFragmentInstanceId.getQueryId(), freedBytes);
   }
 
-  TEndPoint getRemoteEndpoint() {
+  public TEndPoint getRemoteEndpoint() {
     return remoteEndpoint;
   }
 
-  TFragmentInstanceId getRemoteFragmentInstanceId() {
+  public TFragmentInstanceId getRemoteFragmentInstanceId() {
     return remoteFragmentInstanceId;
   }
 
-  String getRemotePlanNodeId() {
+  public String getRemotePlanNodeId() {
     return remotePlanNodeId;
   }
 
-  TFragmentInstanceId getLocalFragmentInstanceId() {
+  public TFragmentInstanceId getLocalFragmentInstanceId() {
     return localFragmentInstanceId;
   }
 
@@ -364,7 +359,7 @@ public class SinkHandle implements ISinkHandle {
               attempt,
               e);
           if (attempt == MAX_ATTEMPT_TIMES) {
-            sinkHandleListener.onFailure(e);
+            sinkHandleListener.onFailure(SinkHandle.this, e);
           }
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index b249cb78d7..cbe5e9d540 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -37,7 +37,6 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -68,12 +67,11 @@ public class SourceHandle implements ISourceHandle {
   private final Map<Integer, Long> sequenceIdToDataBlockSize = new HashMap<>();
 
   private volatile SettableFuture<Void> blocked = SettableFuture.create();
-  private long bufferRetainedSizeInBytes;
+  private long bufferRetainedSizeInBytes = 0L;
   private int currSequenceId = 0;
   private int nextSequenceId = 0;
   private int lastSequenceId = Integer.MAX_VALUE;
-  private boolean closed;
-  private Throwable throwable;
+  private boolean closed = false;
 
   public SourceHandle(
       TEndPoint remoteEndpoint,
@@ -98,10 +96,7 @@ public class SourceHandle implements ISourceHandle {
   }
 
   @Override
-  public TsBlock receive() throws IOException {
-    if (throwable != null) {
-      throw new IOException(throwable);
-    }
+  public TsBlock receive() {
     if (closed) {
       throw new IllegalStateException("Source handle is closed.");
     }
@@ -196,9 +191,6 @@ public class SourceHandle implements ISourceHandle {
 
   @Override
   public ListenableFuture<Void> isBlocked() {
-    if (throwable != null) {
-      throw new RuntimeException(throwable);
-    }
     if (closed) {
       throw new IllegalStateException("Source handle is closed.");
     }
@@ -240,7 +232,7 @@ public class SourceHandle implements ISourceHandle {
 
   @Override
   public boolean isFinished() {
-    return throwable == null && remoteTsBlockedConsumedUp();
+    return remoteTsBlockedConsumedUp();
   }
 
   // Return true indicates two points:
@@ -250,19 +242,19 @@ public class SourceHandle implements ISourceHandle {
     return currSequenceId - 1 == lastSequenceId;
   }
 
-  TEndPoint getRemoteEndpoint() {
+  public TEndPoint getRemoteEndpoint() {
     return remoteEndpoint;
   }
 
-  TFragmentInstanceId getRemoteFragmentInstanceId() {
+  public TFragmentInstanceId getRemoteFragmentInstanceId() {
     return remoteFragmentInstanceId.deepCopy();
   }
 
-  TFragmentInstanceId getLocalFragmentInstanceId() {
+  public TFragmentInstanceId getLocalFragmentInstanceId() {
     return localFragmentInstanceId;
   }
 
-  String getLocalPlanNodeId() {
+  public String getLocalPlanNodeId() {
     return localPlanNodeId;
   }
 
@@ -352,17 +344,11 @@ public class SourceHandle implements ISourceHandle {
               attempt);
           if (attempt == MAX_ATTEMPT_TIMES) {
             synchronized (SourceHandle.this) {
-              throwable = e;
               bufferRetainedSizeInBytes -= reservedBytes;
               localMemoryManager
                   .getQueryPool()
                   .free(localFragmentInstanceId.getQueryId(), reservedBytes);
-              // That GetDataBlocksTask throws exception means some TsBlock cannot be
-              // fetched permanently. Someone maybe waiting the SourceHandle to be
-              // unblocked after fetching data. So the blocked should be released.
-              if (blocked != null && !blocked.isDone()) {
-                blocked.cancel(true);
-              }
+              sourceHandleListener.onFailure(SourceHandle.this, e);
             }
           }
         }
@@ -406,7 +392,7 @@ public class SourceHandle implements ISourceHandle {
               attempt);
           if (attempt == MAX_ATTEMPT_TIMES) {
             synchronized (this) {
-              throwable = e;
+              sourceHandleListener.onFailure(SourceHandle.this, e);
             }
           }
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index ad9a1c170a..0df9e2de4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -46,7 +46,6 @@ import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
@@ -210,8 +209,7 @@ public class QueryExecution implements IQueryExecution {
         return null;
       }
       return resultHandle.receive();
-
-    } catch (ExecutionException | IOException | CancellationException e) {
+    } catch (ExecutionException | CancellationException e) {
       stateMachine.transitionToFailed(e);
       throwIfUnchecked(e.getCause());
       throw new RuntimeException(e.getCause());
@@ -284,7 +282,8 @@ public class QueryExecution implements IQueryExecution {
                   new TEndPoint(
                       context.getResultNodeContext().getUpStreamEndpoint().getIp(),
                       IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
-                  context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift());
+                  context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
+                  stateMachine::transitionToFailed);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
index 397b7dc461..26d4250acc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/ExchangeOperator.java
@@ -25,8 +25,6 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import java.io.IOException;
-
 public class ExchangeOperator implements SourceOperator {
 
   private final OperatorContext operatorContext;
@@ -51,12 +49,7 @@ public class ExchangeOperator implements SourceOperator {
 
   @Override
   public TsBlock next() {
-    try {
-      return sourceHandle.receive();
-    } catch (IOException e) {
-      throw new RuntimeException(
-          "Error happened while reading from source handle " + sourceHandle, e);
-    }
+    return sourceHandle.receive();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 230ffac797..f191e47c79 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -356,7 +356,8 @@ public class LocalExecutionPlanner {
               new TEndPoint(
                   source.getIp(),
                   IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
-              remoteInstanceId.toThrift());
+              remoteInstanceId.toThrift(),
+              context.instanceContext::failed);
       return new ExchangeOperator(operatorContext, sourceHandle, node.getUpstreamPlanNodeId());
     }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
index 126faaeef5..d45eae5db9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
@@ -355,12 +355,12 @@ public class SinkHandleTest {
     List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
     // Construct a mock client.
     Client mockClient = Mockito.mock(Client.class);
-    TException exception = new TException("Mock exception");
+    TException mockException = new TException("Mock exception");
     try {
-      Mockito.doThrow(exception)
+      Mockito.doThrow(mockException)
           .when(mockClient)
           .onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
-      Mockito.doThrow(exception)
+      Mockito.doThrow(mockException)
           .when(mockClient)
           .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
     } catch (TException e) {
@@ -413,7 +413,7 @@ public class SinkHandleTest {
       Assert.fail();
     }
 
-    Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFailure(exception);
+    Mockito.verify(mockSinkHandleListener, Mockito.times(1)).onFailure(sinkHandle, mockException);
 
     // Close the SinkHandle.
     try {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
index dc84cff501..7f53a2f4ab 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
@@ -36,7 +36,6 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -137,12 +136,9 @@ public class SourceHandleTest {
 
     // The local fragment instance consumes the data blocks.
     for (int i = 0; i < numOfMockTsBlock; i++) {
-      try {
-        sourceHandle.receive();
-      } catch (IOException e) {
-        e.printStackTrace();
-        Assert.fail();
-      }
+
+      sourceHandle.receive();
+
       if (i < numOfMockTsBlock - 1) {
         Assert.assertTrue(sourceHandle.isBlocked().isDone());
       } else {
@@ -260,12 +256,7 @@ public class SourceHandleTest {
     // The local fragment instance consumes the data blocks.
     for (int i = 0; i < numOfMockTsBlock; i++) {
       Mockito.verify(spyMemoryPool, Mockito.times(i)).free(queryId, mockTsBlockSize);
-      try {
-        sourceHandle.receive();
-      } catch (IOException e) {
-        e.printStackTrace();
-        Assert.fail();
-      }
+      sourceHandle.receive();
       try {
         Thread.sleep(100L);
         if (i < 5) {
@@ -425,12 +416,7 @@ public class SourceHandleTest {
 
     // The local fragment instance consumes the data blocks.
     for (int i = 0; i < 2 * numOfMockTsBlock; i++) {
-      try {
-        sourceHandle.receive();
-      } catch (IOException e) {
-        e.printStackTrace();
-        Assert.fail();
-      }
+      sourceHandle.receive();
       if (i < 2 * numOfMockTsBlock - 1) {
         Assert.assertTrue(sourceHandle.isBlocked().isDone());
       } else {
@@ -477,12 +463,7 @@ public class SourceHandleTest {
 
     // The local fragment instance consumes the data blocks.
     for (int i = 0; i < numOfMockTsBlock; i++) {
-      try {
-        sourceHandle.receive();
-      } catch (IOException e) {
-        e.printStackTrace();
-        Assert.fail();
-      }
+      sourceHandle.receive();
       if (i < numOfMockTsBlock - 1) {
         Assert.assertTrue(sourceHandle.isBlocked().isDone());
       } else {
@@ -530,8 +511,9 @@ public class SourceHandleTest {
     TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(mockTsBlockSize);
     // Construct a mock client.
     Client mockClient = Mockito.mock(Client.class);
+    TException mockException = new TException("Mock exception");
     try {
-      Mockito.doThrow(new TException("Mock exception"))
+      Mockito.doThrow(mockException)
           .when(mockClient)
           .getDataBlock(Mockito.any(TGetDataBlockRequest.class));
     } catch (TException e) {
@@ -550,7 +532,8 @@ public class SourceHandleTest {
             mockClient,
             mockTsBlockSerde,
             mockSourceHandleListener);
-    Assert.assertFalse(sourceHandle.isBlocked().isDone());
+    Future<?> blocked = sourceHandle.isBlocked();
+    Assert.assertFalse(blocked.isDone());
     Assert.assertFalse(sourceHandle.isClosed());
     Assert.assertFalse(sourceHandle.isFinished());
     Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
@@ -564,36 +547,20 @@ public class SourceHandleTest {
             .collect(Collectors.toList()));
     try {
       Thread.sleep(100L);
-    } catch (InterruptedException e) {
+      Mockito.verify(mockClient, Mockito.times(SourceHandle.MAX_ATTEMPT_TIMES))
+          .getDataBlock(Mockito.any());
+    } catch (InterruptedException | TException e) {
       e.printStackTrace();
       Assert.fail();
     }
-    try {
-      Assert.assertFalse(sourceHandle.isBlocked().isDone());
-      Assert.fail("Expect an IOException.");
-    } catch (RuntimeException e) {
-      Assert.assertEquals("org.apache.thrift.TException: Mock exception", e.getMessage());
-    }
-    Assert.assertFalse(sourceHandle.isClosed());
-    Assert.assertFalse(sourceHandle.isFinished());
-    Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
 
-    // The local fragment instance consumes the data blocks.
-    try {
-      sourceHandle.receive();
-      Assert.fail("Expect an IOException.");
-    } catch (IOException e) {
-      Assert.assertEquals("org.apache.thrift.TException: Mock exception", e.getMessage());
-    }
-
-    // Receive EndOfDataBlock event from upstream fragment instance.
-    sourceHandle.setNoMoreTsBlocks(numOfMockTsBlock - 1);
-    Assert.assertFalse(sourceHandle.isClosed());
-    Assert.assertFalse(sourceHandle.isFinished());
-    Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
+    Mockito.verify(mockSourceHandleListener, Mockito.times(1))
+        .onFailure(sourceHandle, mockException);
+    Assert.assertFalse(blocked.isDone());
 
     sourceHandle.close();
     Assert.assertFalse(sourceHandle.isFinished());
+    Assert.assertTrue(blocked.isDone());
     Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index 49a13eddd7..ebd863d738 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -19,11 +19,11 @@
 package org.apache.iotdb.db.mpp.buffer;
 
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -44,12 +44,12 @@ public class StubSinkHandle implements ISinkHandle {
   }
 
   @Override
-  public long getBufferRetainedSizeInBytes() {
-    return 0;
+  public TFragmentInstanceId getLocalFragmentInstanceId() {
+    return null;
   }
 
   @Override
-  public int getNumOfBufferedTsBlocks() {
+  public long getBufferRetainedSizeInBytes() {
     return 0;
   }
 
@@ -64,7 +64,7 @@ public class StubSinkHandle implements ISinkHandle {
   }
 
   @Override
-  public void send(int partition, List<TsBlock> tsBlocks) throws IOException {
+  public void send(int partition, List<TsBlock> tsBlocks) {
     this.tsBlocks.addAll(tsBlocks);
   }