You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/17 05:16:27 UTC

[iotdb] branch master updated: Make standalone mode mpp basically runnable (#5566)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2691ade495 Make standalone mode mpp basically runnable (#5566)
2691ade495 is described below

commit 2691ade495513b54f00810c6cdd97c08326293e5
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Sun Apr 17 13:16:20 2022 +0800

    Make standalone mode mpp basically runnable (#5566)
---
 .../iotdb/commons/concurrent/ThreadName.java       | 11 +--
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 19 -----
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  6 --
 .../iotdb/db/mpp/buffer/DataBlockManager.java      | 36 ++++----
 .../iotdb/db/mpp/buffer/DataBlockService.java      |  4 +-
 .../mpp/buffer/DataBlockServiceClientFactory.java  | 95 ++++++++++++++++++----
 .../iotdb/db/mpp/buffer/IDataBlockManager.java     | 31 +++----
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java |  7 +-
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   |  6 +-
 .../apache/iotdb/db/mpp/buffer/StubSinkHandle.java | 11 +++
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  4 +-
 .../apache/iotdb/db/mpp/execution/DataDriver.java  |  1 -
 .../iotdb/db/mpp/execution/DriverContext.java      |  4 +
 .../db/mpp/execution/FragmentInstanceContext.java  |  7 ++
 .../db/mpp/execution/FragmentInstanceManager.java  |  2 +-
 .../iotdb/db/mpp/execution/QueryExecution.java     | 24 +++---
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |  6 +-
 .../iotdb/db/mpp/execution/SchemaDriver.java       | 21 ++++-
 .../scheduler/AbstractFragInsStateTracker.java     | 10 ++-
 .../scheduler/FixedRateFragInsStateTracker.java    |  8 ++
 .../scheduler/InternalServiceClientFactory.java    | 89 +++++++++++++++++---
 .../scheduler/SimpleFragInstanceDispatcher.java    | 10 ++-
 .../execution/scheduler/SimpleQueryTerminator.java |  9 +-
 .../db/mpp/operator/process/TimeJoinOperator.java  |  5 +-
 .../db/mpp/schedule/FragmentInstanceScheduler.java | 17 +---
 .../db/mpp/sql/analyze/ClusterSchemaFetcher.java   |  2 +-
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  | 46 +++++------
 .../iotdb/db/service/DataNodeManagementServer.java |  4 +-
 .../apache/iotdb/db/service/InternalService.java   |  4 +-
 .../iotdb/db/service/InternalServiceImpl.java      |  6 +-
 .../thrift/impl/DataNodeTSIServiceImpl.java        |  2 +-
 .../iotdb/db/mpp/execution/DataDriverTest.java     |  4 +-
 .../db/mpp/schedule/DefaultTaskSchedulerTest.java  |  8 --
 33 files changed, 330 insertions(+), 189 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index e587d38730..33b99446aa 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -74,12 +74,13 @@ public enum ThreadName {
   CLUSTER_MONITOR("ClusterMonitor"),
   CONFIG_NODE_RPC_SERVER("ConfigNodeRpcServer"),
   CONFIG_NODE_RPC_CLIENT("ConfigNodeRPC-Client"),
-  DATA_NODE_MANAGEMENT_SERVER("DataNodeManagementServer"),
-  DATA_NODE_MANAGEMENT_CLIENT("DataNodeManagementClient"),
+  DATA_NODE_MANAGEMENT_RPC_SERVER("DataNodeManagementRPC"),
+  DATA_NODE_MANAGEMENT_RPC_CLIENT("DataNodeManagementRPC-Client"),
   Cluster_Monitor("ClusterMonitor"),
-  DATA_BLOCK_MANAGER_SERVICE("DataBlockManagerService"),
-  DATA_BLOCK_MANAGER_CLIENT("DataBlockManagerService-Client"),
-  INTERNAL_SERVICE_CLIENT("InternalService-Client"),
+  DATA_BLOCK_MANAGER_RPC_SERVER("DataBlockManagerRPC"),
+  DATA_BLOCK_MANAGER_RPC_CLIENT("DataBlockManagerRPC-Client"),
+  INTERNAL_SERVICE_RPC_SERVER("InternalServiceRPC"),
+  INTERNAL_SERVICE_RPC_CLIENT("InternalServiceRPC-Client"),
   ;
 
   private final String name;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 856381c0d9..9540120af5 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -303,17 +303,6 @@ public class IoTDBConfig {
   /** How many threads can concurrently build index. When <= 0, use CPU core number. */
   private int concurrentIndexBuildThread = Runtime.getRuntime().availableProcessors();
 
-  /**
-   * If we enable the memory-control mechanism during index building , {@code indexBufferSize}
-   * refers to the byte-size of memory buffer threshold. For each index processor, all indexes in
-   * one {@linkplain org.apache.iotdb.db.index.IndexFileProcessor IndexFileProcessor} share a total
-   * common buffer size. With the memory-control mechanism, the occupied memory of all raw data and
-   * index structures will be counted. If the memory buffer size reaches this threshold, the indexes
-   * will be flushed to the disk file. As a result, data in one series may be divided into more than
-   * one part and indexed separately. Unit: byte
-   */
-  private long indexBufferSize = 128 * 1024 * 1024L;
-
   /**
    * the index framework adopts sliding window model to preprocess the original tv list in the
    * subsequence matching task.
@@ -2330,14 +2319,6 @@ public class IoTDBConfig {
     return concurrentIndexBuildThread;
   }
 
-  public long getIndexBufferSize() {
-    return indexBufferSize;
-  }
-
-  public void setIndexBufferSize(long indexBufferSize) {
-    this.indexBufferSize = indexBufferSize;
-  }
-
   public String getIndexRootFolder() {
     return indexRootFolder;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 189d65a85d..c11b14ccab 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -465,12 +465,6 @@ public class IoTDBDescriptor {
                   "default_index_window_range",
                   Integer.toString(conf.getDefaultIndexWindowRange()))));
 
-      conf.setIndexBufferSize(
-          Long.parseLong(
-              properties.getProperty(
-                  "index_buffer_size", Long.toString(conf.getIndexBufferSize()))));
-      // end: index parameter setting
-
       conf.setConcurrentQueryThread(
           Integer.parseInt(
               properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index f90fce1f3e..a5278bdfd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
 import org.apache.iotdb.mpp.rpc.thrift.AcknowledgeDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
@@ -191,6 +193,12 @@ public class DataBlockManager implements IDataBlockManager {
   /** Listen to the state changes of a sink handle. */
   class SinkHandleListenerImpl implements SinkHandleListener {
 
+    private final FragmentInstanceContext context;
+
+    public SinkHandleListenerImpl(FragmentInstanceContext context) {
+      this.context = context;
+    }
+
     @Override
     public void onFinish(SinkHandle sinkHandle) {
       logger.info("Release resources of finished sink handle {}", sourceHandles);
@@ -198,10 +206,13 @@ public class DataBlockManager implements IDataBlockManager {
         logger.info("Resources of finished sink handle {} has already been released", sinkHandle);
       }
       sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
+      context.finish();
     }
 
     @Override
-    public void onClosed(SinkHandle sinkHandle) {}
+    public void onClosed(SinkHandle sinkHandle) {
+      context.flushing();
+    }
 
     @Override
     public void onAborted(SinkHandle sinkHandle) {
@@ -245,11 +256,10 @@ public class DataBlockManager implements IDataBlockManager {
   @Override
   public ISinkHandle createSinkHandle(
       TFragmentInstanceId localFragmentInstanceId,
-      String remoteHostname,
-      int remotePort,
+      Endpoint endpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
-      String remotePlanNodeId)
-      throws IOException {
+      String remotePlanNodeId,
+      FragmentInstanceContext instanceContext) {
     if (sinkHandles.containsKey(localFragmentInstanceId)) {
       throw new IllegalStateException("Sink handle for " + localFragmentInstanceId + " exists.");
     }
@@ -262,15 +272,15 @@ public class DataBlockManager implements IDataBlockManager {
 
     SinkHandle sinkHandle =
         new SinkHandle(
-            remoteHostname,
+            endpoint.toString(),
             remoteFragmentInstanceId,
             remotePlanNodeId,
             localFragmentInstanceId,
             localMemoryManager,
             executorService,
-            clientFactory.getDataBlockServiceClient(remoteHostname, remotePort),
+            clientFactory.getDataBlockServiceClient(endpoint),
             tsBlockSerdeFactory.get(),
-            new SinkHandleListenerImpl());
+            new SinkHandleListenerImpl(instanceContext));
     sinkHandles.put(localFragmentInstanceId, sinkHandle);
     return sinkHandle;
   }
@@ -279,10 +289,8 @@ public class DataBlockManager implements IDataBlockManager {
   public ISourceHandle createSourceHandle(
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
-      String remoteHostname,
-      int remotePort,
-      TFragmentInstanceId remoteFragmentInstanceId)
-      throws IOException {
+      Endpoint endpoint,
+      TFragmentInstanceId remoteFragmentInstanceId) {
     if (sourceHandles.containsKey(localFragmentInstanceId)
         && sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
       throw new IllegalStateException(
@@ -301,13 +309,13 @@ public class DataBlockManager implements IDataBlockManager {
 
     SourceHandle sourceHandle =
         new SourceHandle(
-            remoteHostname,
+            endpoint.getIp(),
             remoteFragmentInstanceId,
             localFragmentInstanceId,
             localPlanNodeId,
             localMemoryManager,
             executorService,
-            clientFactory.getDataBlockServiceClient(remoteHostname, remotePort),
+            clientFactory.getDataBlockServiceClient(endpoint),
             tsBlockSerdeFactory.get(),
             new SourceHandleListenerImpl());
     sourceHandles
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
index fa9b30cd8a..cabb725b95 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
@@ -84,7 +84,7 @@ public class DataBlockService extends ThriftService implements DataBlockServiceM
           new ThriftServiceThread(
               processor,
               getID().getName(),
-              ThreadName.DATA_BLOCK_MANAGER_CLIENT.getName(),
+              ThreadName.DATA_BLOCK_MANAGER_RPC_CLIENT.getName(),
               getBindIP(),
               getBindPort(),
               config.getRpcMaxConcurrentClientNum(),
@@ -95,7 +95,7 @@ public class DataBlockService extends ThriftService implements DataBlockServiceM
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
-    thriftServiceThread.setName(ThreadName.DATA_BLOCK_MANAGER_SERVICE.getName());
+    thriftServiceThread.setName(ThreadName.DATA_BLOCK_MANAGER_RPC_SERVER.getName());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java
index f289662d80..397755cd4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java
@@ -19,9 +19,9 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService.Client;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 
 import org.apache.thrift.TException;
@@ -29,22 +29,89 @@ import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class DataBlockServiceClientFactory {
-  public DataBlockService.Client getDataBlockServiceClient(String hostname, int port)
-      throws IOException {
-    try {
-      TTransport transport = RpcTransportFactory.INSTANCE.getTransportWithNoTimeout(hostname, port);
-      transport.open();
-      TProtocol protocol =
-          IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()
-              ? new TCompactProtocol(transport)
-              : new TBinaryProtocol(transport);
-      return new Client(protocol);
-    } catch (TException e) {
-      throw new IOException(e);
+
+  private static final int TIMEOUT_MS = 30_000;
+
+  private static final Logger logger = LoggerFactory.getLogger(DataBlockServiceClientFactory.class);
+
+  // TODO need to be replaced by mature client pool in the future
+  private static final Map<Endpoint, DataBlockService.Iface> dataBlockServiceClientMap =
+      new ConcurrentHashMap<>();
+
+  public DataBlockService.Iface getDataBlockServiceClient(Endpoint endpoint) {
+
+    return dataBlockServiceClientMap.computeIfAbsent(
+        endpoint,
+        address -> {
+          TTransport transport;
+          try {
+            transport =
+                RpcTransportFactory.INSTANCE.getTransport(
+                    // as there is a try-catch already, we do not need to use TSocket.wrap
+                    address.getIp(), address.getPort(), TIMEOUT_MS);
+            transport.open();
+            TProtocol protocol =
+                IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()
+                    ? new TCompactProtocol(transport)
+                    : new TBinaryProtocol(transport);
+            return newSynchronizedClient(new DataBlockService.Client(protocol));
+          } catch (TTransportException e) {
+            logger.error(
+                "error happened while creating mpp service client for {}:{}",
+                endpoint.getIp(),
+                endpoint.getPort(),
+                e);
+            throw new RuntimeException(e);
+          }
+        });
+  }
+
+  public static DataBlockService.Iface newSynchronizedClient(DataBlockService.Iface client) {
+    return (DataBlockService.Iface)
+        Proxy.newProxyInstance(
+            DataBlockServiceClientFactory.class.getClassLoader(),
+            new Class[] {DataBlockService.Iface.class},
+            new SynchronizedHandler(client));
+  }
+
+  private static class SynchronizedHandler implements InvocationHandler {
+
+    private final DataBlockService.Iface client;
+
+    public SynchronizedHandler(DataBlockService.Iface client) {
+      this.client = client;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      try {
+        synchronized (client) {
+          return method.invoke(client, args);
+        }
+      } catch (InvocationTargetException e) {
+        // all IFace APIs throw TException
+        if (e.getTargetException() instanceof TException) {
+          throw e.getTargetException();
+        } else {
+          // should not happen
+          throw new TException(
+              "Error in calling method " + method.getName(), e.getTargetException());
+        }
+      } catch (Exception e) {
+        throw new TException("Error in calling method " + method.getName(), e);
+      }
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
index e69a4a6b9c..0eff0c6620 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
@@ -19,12 +19,10 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
-import org.apache.thrift.transport.TTransportException;
-
-import java.io.IOException;
-
 public interface IDataBlockManager {
   /**
    * Create a sink handle who sends data blocks to a remote downstream fragment instance in async
@@ -32,19 +30,16 @@ public interface IDataBlockManager {
    *
    * @param localFragmentInstanceId ID of the local fragment instance who generates and sends data
    *     blocks to the sink handle.
-   * @param remoteHostname Hostname of the remote fragment instance where the data blocks should be
-   *     sent to.
-   * @param remotePort Port of the remote fragment instance where the data blocks should be sent to.
-   * @param remoteFragmentInstanceId ID of the remote fragment instance.
+   * @param endpoint Hostname and Port of the remote fragment instance where the data blocks should
+   *     be sent to.
    * @param remotePlanNodeId The sink plan node ID of the remote fragment instance.
    */
   ISinkHandle createSinkHandle(
       TFragmentInstanceId localFragmentInstanceId,
-      String remoteHostname,
-      int remotePort,
+      Endpoint endpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
-      String remotePlanNodeId)
-      throws TTransportException, IOException;
+      String remotePlanNodeId,
+      FragmentInstanceContext instanceContext);
 
   /**
    * Create a source handle who fetches data blocks from a remote upstream fragment instance for a
@@ -53,19 +48,15 @@ public interface IDataBlockManager {
    * @param localFragmentInstanceId ID of the local fragment instance who receives data blocks from
    *     the source handle.
    * @param localPlanNodeId The local sink plan node ID.
-   * @param remoteHostname Hostname of the remote fragment instance where the data blocks should be
-   *     received from.
-   * @param remotePort Port of the remote fragment instance where the data blocks should be received
-   *     from.
+   * @param endpoint Hostname and Port of the remote fragment instance where the data blocks should
+   *     be received from.
    * @param remoteFragmentInstanceId ID of the remote fragment instance.
    */
   ISourceHandle createSourceHandle(
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
-      String remoteHostname,
-      int remotePort,
-      TFragmentInstanceId remoteFragmentInstanceId)
-      throws IOException;
+      Endpoint endpoint,
+      TFragmentInstanceId remoteFragmentInstanceId);
 
   /**
    * Release all the related resources of a fragment instance, including data blocks that are not
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index 2934a6bf7a..0beab26e36 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -59,7 +59,7 @@ public class SinkHandle implements ISinkHandle {
   private final TFragmentInstanceId localFragmentInstanceId;
   private final LocalMemoryManager localMemoryManager;
   private final ExecutorService executorService;
-  private final DataBlockService.Client client;
+  private final DataBlockService.Iface client;
   private final TsBlockSerde serde;
   private final SinkHandleListener sinkHandleListener;
 
@@ -82,7 +82,7 @@ public class SinkHandle implements ISinkHandle {
       TFragmentInstanceId localFragmentInstanceId,
       LocalMemoryManager localMemoryManager,
       ExecutorService executorService,
-      DataBlockService.Client client,
+      DataBlockService.Iface client,
       TsBlockSerde serde,
       SinkHandleListener sinkHandleListener) {
     this.remoteHostname = Validate.notNull(remoteHostname);
@@ -359,7 +359,8 @@ public class SinkHandle implements ISinkHandle {
               remotePlanNodeId,
               remoteFragmentInstanceId,
               e.getMessage(),
-              attempt);
+              attempt,
+              e);
           if (attempt == MAX_ATTEMPT_TIMES) {
             synchronized (this) {
               throwable = e;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index b577174702..802634efa7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -59,7 +59,7 @@ public class SourceHandle implements ISourceHandle {
   private final String localPlanNodeId;
   private final LocalMemoryManager localMemoryManager;
   private final ExecutorService executorService;
-  private final DataBlockService.Client client;
+  private final DataBlockService.Iface client;
   private final TsBlockSerde serde;
   private final SourceHandleListener sourceHandleListener;
 
@@ -83,7 +83,7 @@ public class SourceHandle implements ISourceHandle {
       String localPlanNodeId,
       LocalMemoryManager localMemoryManager,
       ExecutorService executorService,
-      DataBlockService.Client client,
+      DataBlockService.Iface client,
       TsBlockSerde serde,
       SourceHandleListener sourceHandleListener) {
     this.remoteHostname = Validate.notNull(remoteHostname);
@@ -242,7 +242,7 @@ public class SourceHandle implements ISourceHandle {
     return throwable == null
         && noMoreTsBlocks
         && numActiveGetDataBlocksTask == 0
-        && nextSequenceId - 1 == lastSequenceId
+        && currSequenceId - 1 == lastSequenceId
         && sequenceIdToTsBlock.isEmpty();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index 7753bc3ff1..9f3b9240c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -34,8 +35,14 @@ public class StubSinkHandle implements ISinkHandle {
 
   private final List<TsBlock> tsBlocks = new ArrayList<>();
 
+  private final FragmentInstanceContext instanceContext;
+
   private boolean closed = false;
 
+  public StubSinkHandle(FragmentInstanceContext instanceContext) {
+    this.instanceContext = instanceContext;
+  }
+
   @Override
   public long getBufferRetainedSizeInBytes() {
     return 0;
@@ -76,7 +83,11 @@ public class StubSinkHandle implements ISinkHandle {
 
   @Override
   public void close() {
+    if (closed) {
+      return;
+    }
     closed = true;
+    instanceContext.flushing();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index d92368cee4..51f3a6d6f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -44,9 +44,9 @@ import java.util.concurrent.ScheduledExecutorService;
  */
 public class Coordinator {
   private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
-  private static final int COORDINATOR_EXECUTOR_SIZE = 10;
+  private static final int COORDINATOR_EXECUTOR_SIZE = 2;
   private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
-  private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 10;
+  private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 2;
 
   private static final Endpoint LOCAL_HOST =
       new Endpoint(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 88c160dd28..785d8d8db3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -91,7 +91,6 @@ public class DataDriver implements Driver {
           closed || (driverBlockedFuture.get().isDone() && root != null && root.isFinished());
       if (isFinished) {
         close();
-        driverContext.finish();
       }
       return isFinished;
     } catch (Throwable t) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
index 2168c24f4e..8c20a2c334 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@ -43,4 +43,8 @@ public class DriverContext {
   public void finish() {
     fragmentInstanceContext.finish();
   }
+
+  public void flushing() {
+    fragmentInstanceContext.flushing();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index 0884eb6f9c..c8f3e3ebd8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -114,6 +114,13 @@ public class FragmentInstanceContext extends QueryContext {
     this.endTime = System.currentTimeMillis();
   }
 
+  public void flushing() {
+    if (state.get().isDone()) {
+      return;
+    }
+    state.set(FragmentInstanceState.FLUSHING);
+  }
+
   public long getEndTime() {
     return endTime;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index 1af0abfece..537f330f04 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -60,7 +60,7 @@ public class FragmentInstanceManager {
     this.instanceContext = new ConcurrentHashMap<>();
     this.instanceExecution = new ConcurrentHashMap<>();
     this.instanceManagementExecutor =
-        IoTDBThreadPoolFactory.newScheduledThreadPool(5, "instance-management");
+        IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management");
 
     this.infoCacheTime = new Duration(15, TimeUnit.MINUTES);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index e7df772ad9..e7e4995995 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.buffer.DataBlockService;
 import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
@@ -246,19 +247,16 @@ public class QueryExecution implements IQueryExecution {
 
   private void initResultHandle() {
     if (this.resultHandle == null) {
-      try {
-        this.resultHandle =
-            DataBlockService.getInstance()
-                .getDataBlockManager()
-                .createSourceHandle(
-                    context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
-                    context.getResultNodeContext().getVirtualResultNodeId().getId(),
-                    context.getResultNodeContext().getUpStreamEndpoint().getIp(),
-                    IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort(),
-                    context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift());
-      } catch (IOException e) {
-        stateMachine.transitionToFailed();
-      }
+      this.resultHandle =
+          DataBlockService.getInstance()
+              .getDataBlockManager()
+              .createSourceHandle(
+                  context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
+                  context.getResultNodeContext().getVirtualResultNodeId().getId(),
+                  new Endpoint(
+                      context.getResultNodeContext().getUpStreamEndpoint().getIp(),
+                      IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+                  context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 7bcded27dc..c5bc48ef42 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -33,9 +33,9 @@ import java.util.concurrent.ExecutorService;
  * register listeners when the state changes of the QueryExecution.
  */
 public class QueryStateMachine {
-  private String name;
-  private StateMachine<QueryState> queryState;
-  private Map<FragmentInstanceId, FragmentInstanceState> fragInstanceStateMap;
+  private final String name;
+  private final StateMachine<QueryState> queryState;
+  private final Map<FragmentInstanceId, FragmentInstanceState> fragInstanceStateMap;
 
   // The executor will be used in all the state machines belonged to this query.
   private Executor stateMachineExecutor;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
index ea49d012e3..47edf98973 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@ -50,6 +50,8 @@ public class SchemaDriver implements Driver {
 
   private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
 
+  private boolean closed = false;
+
   public SchemaDriver(Operator root, ISinkHandle sinkHandle, SchemaDriverContext driverContext) {
     this.root = root;
     this.sinkHandle = sinkHandle;
@@ -66,7 +68,6 @@ public class SchemaDriver implements Driver {
       boolean isFinished = driverBlockedFuture.get().isDone() && root != null && root.isFinished();
       if (isFinished) {
         close();
-        driverContext.finish();
       }
       return isFinished;
     } catch (Throwable t) {
@@ -146,5 +147,21 @@ public class SchemaDriver implements Driver {
   }
 
   @Override
-  public void close() {}
+  public void close() {
+    if (closed) { // close() can be called more than once; only the first call does any work
+      return;
+    }
+    closed = true; // NOTE(review): plain boolean, not thread-safe; confirm single-threaded use
+    try {
+      if (root != null) {
+        root.close();
+      }
+      if (sinkHandle != null) {
+        sinkHandle.close();
+      }
+    } catch (Throwable t) { // report the failure through driverContext instead of rethrowing
+      logger.error("Failed to close driver {}", driverContext.getId(), t);
+      driverContext.failed(t);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
index b87579561c..0e59d80855 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
@@ -56,9 +58,11 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
   public abstract void abort();
 
   protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
-    InternalService.Client client =
-        InternalServiceClientFactory.getInternalServiceClient(
-            instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
+    InternalService.Iface client =
+        InternalServiceClientFactory.getMppServiceClient(
+            new Endpoint(
+                instance.getHostEndpoint().getIp(),
+                IoTDBDescriptor.getInstance().getConfig().getMppPort()));
     TFragmentInstanceStateResp resp =
         client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
     return FragmentInstanceState.valueOf(resp.state);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
index 0fb90f160a..16cb7fcdda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
@@ -24,11 +24,16 @@ import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.concurrent.*;
 
 public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
+
+  private static final Logger logger = LoggerFactory.getLogger(FixedRateFragInsStateTracker.class);
+
   // TODO: (xingtanzjr) consider how much Interval is OK for state tracker
   private static final long STATE_FETCH_INTERVAL_IN_MS = 500;
   private ScheduledFuture<?> trackTask;
@@ -59,11 +64,14 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
     for (FragmentInstance instance : instances) {
       try {
         FragmentInstanceState state = fetchState(instance);
+        logger.info("Instance {}'s State is {}", instance.getId(), state);
+
         if (state != null) {
           stateMachine.updateFragInstanceState(instance.getId(), state);
         }
       } catch (TException e) {
         // TODO: do nothing ?
+        logger.error("error happened while fetching query state", e);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
index 4a5a94b4bf..ce60581900 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
@@ -19,26 +19,95 @@
 
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.SynchronizedHandler;
 
+import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class InternalServiceClientFactory {
-  private static final int TIMEOUT_MS = 10000;
+  private static final int TIMEOUT_MS = 10_000;
+
+  private static final Logger logger = LoggerFactory.getLogger(InternalServiceClientFactory.class);
+
+  // TODO need to be replaced by mature client pool in the future
+  private static final Map<Endpoint, InternalService.Iface> mppServiceClientMap =
+      new ConcurrentHashMap<>();
 
-  // TODO: (xingtanzjr) consider the best practice to maintain the clients
-  public static InternalService.Client getInternalServiceClient(String endpoint, int port)
+  public static InternalService.Iface getMppServiceClient(Endpoint endpoint)
       throws TTransportException {
-    TTransport transport =
-        RpcTransportFactory.INSTANCE.getTransport(
-            // as there is a try-catch already, we do not need to use TSocket.wrap
-            endpoint, port, TIMEOUT_MS);
-    transport.open();
-    TProtocol protocol = new TBinaryProtocol(transport);
-    return new InternalService.Client(protocol);
+    try {
+      return mppServiceClientMap.computeIfAbsent(
+          endpoint,
+          address -> {
+            try {
+              TTransport transport = RpcTransportFactory.INSTANCE.getTransport(
+                  address.getIp(), address.getPort(), TIMEOUT_MS);
+              transport.open();
+              TProtocol protocol = new TBinaryProtocol(transport);
+              return newSynchronizedClient(new InternalService.Client(protocol));
+            } catch (TTransportException e) {
+              logger.error("error happened while creating mpp service client for {}:{}",
+                  address.getIp(), address.getPort(), e);
+              throw new RuntimeException(e); // lambdas cannot throw checked; unwrapped below
+            }
+          });
+    } catch (RuntimeException e) {
+      if (e.getCause() instanceof TTransportException) {
+        throw (TTransportException) e.getCause(); // restore the declared checked exception
+      }
+      throw e;
+    }
+  }
+
+  public static InternalService.Iface newSynchronizedClient(InternalService.Iface client) {
+    return (InternalService.Iface) // proxy whose calls are serialized by SynchronizedHandler
+        Proxy.newProxyInstance(
+            InternalServiceClientFactory.class.getClassLoader(),
+            new Class<?>[] {InternalService.Iface.class},
+            new SynchronizedHandler(client));
+  }
+
+  private static class SynchronizedHandler implements InvocationHandler {
+    // NOTE(review): shadows the imported org.apache.iotdb.rpc.SynchronizedHandler — intended?
+    private final InternalService.Iface client; // wrapped client; also used as the lock in invoke
+
+    public SynchronizedHandler(InternalService.Iface client) {
+      this.client = client;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      try {
+        synchronized (client) { // only one proxied call at a time runs on the wrapped client
+          return method.invoke(client, args);
+        }
+      } catch (InvocationTargetException e) {
+        // Iface methods are declared to throw TException, so pass it through unwrapped
+        if (e.getTargetException() instanceof TException) {
+          throw e.getTargetException();
+        } else {
+          // should not happen: wrap any other throwable so callers still see a TException
+          throw new TException(
+              "Error in calling method " + method.getName(), e.getTargetException());
+        }
+      } catch (Exception e) { // any other failure of the reflective call itself
+        throw new TException("Error in calling method " + method.getName(), e);
+      }
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index fa81d65764..bdece41164 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
@@ -47,10 +48,11 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
           TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
           try {
             for (FragmentInstance instance : instances) {
-              InternalService.Client client =
-                  InternalServiceClientFactory.getInternalServiceClient(
-                      instance.getHostEndpoint().getIp(),
-                      IoTDBDescriptor.getInstance().getConfig().getMppPort());
+              InternalService.Iface client =
+                  InternalServiceClientFactory.getMppServiceClient(
+                      new Endpoint(
+                          instance.getHostEndpoint().getIp(),
+                          IoTDBDescriptor.getInstance().getConfig().getMppPort()));
               // TODO: (xingtanzjr) consider how to handle the buffer here
               ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
               instance.serializeRequest(buffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
index 60d1e9c5a5..ca5e1f2230 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
@@ -54,9 +55,11 @@ public class SimpleQueryTerminator implements IQueryTerminator {
             () -> {
               try {
                 for (Endpoint endpoint : relatedHost) {
-                  InternalService.Client client =
-                      InternalServiceClientFactory.getInternalServiceClient(
-                          endpoint.getIp(), endpoint.getPort());
+                  InternalService.Iface client =
+                      InternalServiceClientFactory.getMppServiceClient(
+                          new Endpoint(
+                              endpoint.getIp(),
+                              IoTDBDescriptor.getInstance().getConfig().getMppPort()));
                   client.cancelQuery(new TCancelQueryReq(queryId.getId()));
                 }
               } catch (TException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index f58af90e77..c5fbaf10b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -122,8 +122,9 @@ public class TimeJoinOperator implements ProcessOperator {
     }
 
     if (timeSelector.isEmpty()) {
-      // TODO need to discuss whether to return null or return an empty TSBlock with TsBlockMetadata
-      return null;
+      // return empty TsBlock
+      TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(0, dataTypes);
+      return tsBlockBuilder.build();
     }
 
     TsBlockBuilder tsBlockBuilder = TsBlockBuilder.createWithOnlyTimeColumn();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index b2200b7690..c7bdb95285 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -33,11 +33,8 @@ import org.apache.iotdb.db.mpp.schedule.queue.L2PriorityQueue;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskID;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
-import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,9 +66,8 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
 
   private static final int MAX_CAPACITY = 1000; // TODO: load from config files
   private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
-  private static final int QUERY_TIMEOUT_MS = 10000; // TODO: load from config files or requests
+  private static final int QUERY_TIMEOUT_MS = 60_000; // TODO: load from config files or requests
   private final ThreadGroup workerGroups;
-  private InternalService.Client mppServiceClient; // TODO: use from client pool
   private final List<AbstractExecutor> threads;
 
   private FragmentInstanceScheduler() {
@@ -242,11 +238,6 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
     this.blockManager = blockManager;
   }
 
-  @TestOnly
-  void setMppServiceClient(InternalService.Client client) {
-    this.mppServiceClient = client;
-  }
-
   private static class InstanceHolder {
 
     private InstanceHolder() {}
@@ -348,12 +339,6 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
       QueryId queryId = task.getId().getQueryId();
       Set<FragmentInstanceTask> queryRelatedTasks = queryMap.remove(queryId);
       if (queryRelatedTasks != null) {
-        try {
-          mppServiceClient.cancelQuery(new TCancelQueryReq(queryId.getId()));
-        } catch (TException e) {
-          // If coordinator cancel query failed, we should continue clean other tasks.
-          logger.error("cancel query " + queryId.getId() + " failed", e);
-        }
         for (FragmentInstanceTask otherTask : queryRelatedTasks) {
           if (task.equals(otherTask)) {
             continue;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index 783372eca6..3877706717 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -75,7 +75,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
         coordinator.execute(schemaFetchStatement, queryId, null, "", partitionFetcher, this);
     // TODO: (xingtanzjr) throw exception
     if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new RuntimeException("cannot fetch schema");
+      throw new RuntimeException("cannot fetch schema, status is: " + executionResult.status);
     }
     TsBlock tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
     // TODO: (xingtanzjr) need to release this query's resource here
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 166a3077a7..b0c277f65e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -68,7 +68,6 @@ import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -301,18 +300,15 @@ public class LocalExecutionPlanner {
       FragmentInstanceId remoteInstanceId = node.getUpstreamInstanceId();
       Endpoint source = node.getUpstreamEndpoint();
 
-      try {
-        ISourceHandle sourceHandle =
-            DATA_BLOCK_MANAGER.createSourceHandle(
-                localInstanceId.toThrift(),
-                node.getPlanNodeId().getId(),
-                source.getIp(),
-                IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort(),
-                remoteInstanceId.toThrift());
-        return new ExchangeOperator(operatorContext, sourceHandle, node.getUpstreamPlanNodeId());
-      } catch (IOException e) {
-        throw new RuntimeException("Error happened while creating source handle", e);
-      }
+      ISourceHandle sourceHandle =
+          DATA_BLOCK_MANAGER.createSourceHandle(
+              localInstanceId.toThrift(),
+              node.getPlanNodeId().getId(),
+              new Endpoint(
+                  source.getIp(),
+                  IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+              remoteInstanceId.toThrift());
+      return new ExchangeOperator(operatorContext, sourceHandle, node.getUpstreamPlanNodeId());
     }
 
     @Override
@@ -321,19 +317,17 @@ public class LocalExecutionPlanner {
       Endpoint target = node.getDownStreamEndpoint();
       FragmentInstanceId localInstanceId = context.instanceContext.getId();
       FragmentInstanceId targetInstanceId = node.getDownStreamInstanceId();
-      try {
-        ISinkHandle sinkHandle =
-            DATA_BLOCK_MANAGER.createSinkHandle(
-                localInstanceId.toThrift(),
-                target.getIp(),
-                IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort(),
-                targetInstanceId.toThrift(),
-                node.getDownStreamPlanNodeId().getId());
-        context.setSinkHandle(sinkHandle);
-        return child;
-      } catch (IOException e) {
-        throw new RuntimeException("Error happened while creating sink handle", e);
-      }
+      ISinkHandle sinkHandle =
+          DATA_BLOCK_MANAGER.createSinkHandle(
+              localInstanceId.toThrift(),
+              new Endpoint(
+                  target.getIp(),
+                  IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+              targetInstanceId.toThrift(),
+              node.getDownStreamPlanNodeId().getId(),
+              context.instanceContext);
+      context.setSinkHandle(sinkHandle);
+      return child;
     }
 
     @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServer.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServer.java
index 44513c40d1..6c52ac4066 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServer.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServer.java
@@ -66,7 +66,7 @@ public class DataNodeManagementServer extends ThriftService
           new ThriftServiceThread(
               processor,
               getID().getName(),
-              ThreadName.DATA_NODE_MANAGEMENT_CLIENT.getName(),
+              ThreadName.DATA_NODE_MANAGEMENT_RPC_CLIENT.getName(),
               getBindIP(),
               getBindPort(),
               config.getRpcMaxConcurrentClientNum(),
@@ -76,7 +76,7 @@ public class DataNodeManagementServer extends ThriftService
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
-    thriftServiceThread.setName(ThreadName.RPC_SERVICE.getName());
+    thriftServiceThread.setName(ThreadName.DATA_NODE_MANAGEMENT_RPC_SERVER.getName());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalService.java b/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
index 4414fb9110..5236d2bf87 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
@@ -61,7 +61,7 @@ public class InternalService extends ThriftService implements InternalServiceMBe
           new ThriftServiceThread(
               processor,
               getID().getName(),
-              ThreadName.INTERNAL_SERVICE_CLIENT.getName(),
+              ThreadName.INTERNAL_SERVICE_RPC_CLIENT.getName(),
               getBindIP(),
               getBindPort(),
               config.getRpcMaxConcurrentClientNum(),
@@ -72,7 +72,7 @@ public class InternalService extends ThriftService implements InternalServiceMBe
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
-    thriftServiceThread.setName(ThreadName.INTERNAL_SERVICE_CLIENT.getName());
+    thriftServiceThread.setName(ThreadName.INTERNAL_SERVICE_RPC_SERVER.getName());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
index 34634d74cc..c3c398bbf5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
@@ -88,7 +88,11 @@ public class InternalServiceImpl implements InternalService.Iface {
 
   @Override
   public TCancelResp cancelQuery(TCancelQueryReq req) throws TException {
-    throw new NotImplementedException();
+    // TODO: not implemented yet. Returns null rather than throwing so that no
+    // NotImplementedException is logged.
+    // NOTE(review): a null result may reach remote callers as a Thrift error — confirm.
+    return null;
+    //    throw new NotImplementedException();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 4b86aca612..2c13c68ad7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -233,7 +233,7 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               SCHEMA_FETCHER);
 
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        throw new RuntimeException("");
+        throw new RuntimeException("error coed: " + result.status);
       }
 
       IQueryExecution queryExecution = COORDINATOR.getQueryExecution(id);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index cf299aaa1f..2703d0aa37 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -158,7 +158,7 @@ public class DataDriverTest {
               dataRegion,
               ImmutableList.of(seriesScanOperator1, seriesScanOperator2));
 
-      StubSinkHandle sinkHandle = new StubSinkHandle();
+      StubSinkHandle sinkHandle = new StubSinkHandle(fragmentInstanceContext);
 
       try (Driver dataDriver = new DataDriver(limitOperator, sinkHandle, driverContext)) {
         assertEquals(fragmentInstanceContext.getId(), dataDriver.getInfo());
@@ -171,7 +171,7 @@ public class DataDriverTest {
           assertTrue(blocked.isDone());
         }
 
-        assertEquals(FragmentInstanceState.FINISHED, state.get());
+        assertEquals(FragmentInstanceState.FLUSHING, state.get());
 
         List<TsBlock> result = sinkHandle.getTsBlocks();
         assertEquals(13, result.size());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
index 32e474fd03..e73529aa06 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.utils.stats.CpuTimer;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 
 import io.airlift.units.Duration;
-import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -305,7 +304,6 @@ public class DefaultTaskSchedulerTest {
     IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
     manager.setBlockManager(mockDataBlockManager);
     InternalService.Client mockMppServiceClient = Mockito.mock(InternalService.Client.class);
-    manager.setMppServiceClient(mockMppServiceClient);
     ITaskScheduler defaultScheduler = manager.getScheduler();
     QueryId queryId = new QueryId("test");
     FragmentInstanceId instanceId1 =
@@ -364,12 +362,6 @@ public class DefaultTaskSchedulerTest {
       manager.getTimeoutQueue().push(testTask1);
       defaultScheduler.toAborted(testTask1);
 
-      try {
-        Mockito.verify(mockMppServiceClient, Mockito.times(1)).cancelQuery(Mockito.any());
-      } catch (TException e) {
-        e.printStackTrace();
-        Assert.fail();
-      }
       Mockito.reset(mockMppServiceClient);
       Mockito.verify(mockDataBlockManager, Mockito.times(2))
           .forceDeregisterFragmentInstance(Mockito.any());