You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/16 07:20:15 UTC

[iotdb] branch stable-mpp created (now 34dd4a2515)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 34dd4a2515 Make MPP

This branch includes the following new commits:

     new 34dd4a2515 Make MPP

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Make MPP

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 34dd4a251530f50e23d88e578287dacf130c0601
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sat Apr 16 15:19:48 2022 +0800

    Make MPP
---
 .../iotdb/commons/concurrent/ThreadName.java       | 11 +--
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 19 -----
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  6 --
 .../iotdb/db/mpp/buffer/DataBlockManager.java      | 36 ++++----
 .../iotdb/db/mpp/buffer/DataBlockService.java      |  4 +-
 .../mpp/buffer/DataBlockServiceClientFactory.java  | 95 ++++++++++++++++++----
 .../iotdb/db/mpp/buffer/IDataBlockManager.java     | 31 +++----
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java |  7 +-
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   |  6 +-
 .../apache/iotdb/db/mpp/buffer/StubSinkHandle.java | 11 +++
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  4 +-
 .../apache/iotdb/db/mpp/execution/DataDriver.java  |  1 -
 .../iotdb/db/mpp/execution/DriverContext.java      |  4 +
 .../db/mpp/execution/FragmentInstanceContext.java  |  7 ++
 .../db/mpp/execution/FragmentInstanceManager.java  |  2 +-
 .../iotdb/db/mpp/execution/QueryExecution.java     | 24 +++---
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |  6 +-
 .../iotdb/db/mpp/execution/SchemaDriver.java       | 21 ++++-
 .../scheduler/AbstractFragInsStateTracker.java     | 10 ++-
 .../scheduler/FixedRateFragInsStateTracker.java    |  8 ++
 .../scheduler/InternalServiceClientFactory.java    | 89 +++++++++++++++++---
 .../scheduler/SimpleFragInstanceDispatcher.java    | 10 ++-
 .../execution/scheduler/SimpleQueryTerminator.java |  9 +-
 .../db/mpp/operator/process/TimeJoinOperator.java  |  5 +-
 .../db/mpp/schedule/FragmentInstanceScheduler.java | 17 +---
 .../db/mpp/sql/analyze/ClusterSchemaFetcher.java   |  2 +-
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  | 46 +++++------
 .../iotdb/db/service/DataNodeManagementServer.java |  4 +-
 .../apache/iotdb/db/service/InternalService.java   |  4 +-
 .../iotdb/db/service/InternalServiceImpl.java      |  6 +-
 .../thrift/impl/DataNodeTSIServiceImpl.java        |  2 +-
 .../iotdb/db/mpp/execution/DataDriverTest.java     |  4 +-
 .../db/mpp/schedule/DefaultTaskSchedulerTest.java  |  8 --
 33 files changed, 330 insertions(+), 189 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index e587d38730..33b99446aa 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -74,12 +74,13 @@ public enum ThreadName {
   CLUSTER_MONITOR("ClusterMonitor"),
   CONFIG_NODE_RPC_SERVER("ConfigNodeRpcServer"),
   CONFIG_NODE_RPC_CLIENT("ConfigNodeRPC-Client"),
-  DATA_NODE_MANAGEMENT_SERVER("DataNodeManagementServer"),
-  DATA_NODE_MANAGEMENT_CLIENT("DataNodeManagementClient"),
+  DATA_NODE_MANAGEMENT_RPC_SERVER("DataNodeManagementRPC"),
+  DATA_NODE_MANAGEMENT_RPC_CLIENT("DataNodeManagementRPC-Client"),
   Cluster_Monitor("ClusterMonitor"),
-  DATA_BLOCK_MANAGER_SERVICE("DataBlockManagerService"),
-  DATA_BLOCK_MANAGER_CLIENT("DataBlockManagerService-Client"),
-  INTERNAL_SERVICE_CLIENT("InternalService-Client"),
+  DATA_BLOCK_MANAGER_RPC_SERVER("DataBlockManagerRPC"),
+  DATA_BLOCK_MANAGER_RPC_CLIENT("DataBlockManagerRPC-Client"),
+  INTERNAL_SERVICE_RPC_SERVER("InternalServiceRPC"),
+  INTERNAL_SERVICE_RPC_CLIENT("InternalServiceRPC-Client"),
   ;
 
   private final String name;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 856381c0d9..9540120af5 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -303,17 +303,6 @@ public class IoTDBConfig {
   /** How many threads can concurrently build index. When <= 0, use CPU core number. */
   private int concurrentIndexBuildThread = Runtime.getRuntime().availableProcessors();
 
-  /**
-   * If we enable the memory-control mechanism during index building , {@code indexBufferSize}
-   * refers to the byte-size of memory buffer threshold. For each index processor, all indexes in
-   * one {@linkplain org.apache.iotdb.db.index.IndexFileProcessor IndexFileProcessor} share a total
-   * common buffer size. With the memory-control mechanism, the occupied memory of all raw data and
-   * index structures will be counted. If the memory buffer size reaches this threshold, the indexes
-   * will be flushed to the disk file. As a result, data in one series may be divided into more than
-   * one part and indexed separately. Unit: byte
-   */
-  private long indexBufferSize = 128 * 1024 * 1024L;
-
   /**
    * the index framework adopts sliding window model to preprocess the original tv list in the
    * subsequence matching task.
@@ -2330,14 +2319,6 @@ public class IoTDBConfig {
     return concurrentIndexBuildThread;
   }
 
-  public long getIndexBufferSize() {
-    return indexBufferSize;
-  }
-
-  public void setIndexBufferSize(long indexBufferSize) {
-    this.indexBufferSize = indexBufferSize;
-  }
-
   public String getIndexRootFolder() {
     return indexRootFolder;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 189d65a85d..c11b14ccab 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -465,12 +465,6 @@ public class IoTDBDescriptor {
                   "default_index_window_range",
                   Integer.toString(conf.getDefaultIndexWindowRange()))));
 
-      conf.setIndexBufferSize(
-          Long.parseLong(
-              properties.getProperty(
-                  "index_buffer_size", Long.toString(conf.getIndexBufferSize()))));
-      // end: index parameter setting
-
       conf.setConcurrentQueryThread(
           Integer.parseInt(
               properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index f90fce1f3e..a5278bdfd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
 import org.apache.iotdb.mpp.rpc.thrift.AcknowledgeDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
@@ -191,6 +193,12 @@ public class DataBlockManager implements IDataBlockManager {
   /** Listen to the state changes of a sink handle. */
   class SinkHandleListenerImpl implements SinkHandleListener {
 
+    private final FragmentInstanceContext context;
+
+    public SinkHandleListenerImpl(FragmentInstanceContext context) {
+      this.context = context;
+    }
+
     @Override
     public void onFinish(SinkHandle sinkHandle) {
       logger.info("Release resources of finished sink handle {}", sourceHandles);
@@ -198,10 +206,13 @@ public class DataBlockManager implements IDataBlockManager {
         logger.info("Resources of finished sink handle {} has already been released", sinkHandle);
       }
       sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
+      context.finish();
     }
 
     @Override
-    public void onClosed(SinkHandle sinkHandle) {}
+    public void onClosed(SinkHandle sinkHandle) {
+      context.flushing();
+    }
 
     @Override
     public void onAborted(SinkHandle sinkHandle) {
@@ -245,11 +256,10 @@ public class DataBlockManager implements IDataBlockManager {
   @Override
   public ISinkHandle createSinkHandle(
       TFragmentInstanceId localFragmentInstanceId,
-      String remoteHostname,
-      int remotePort,
+      Endpoint endpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
-      String remotePlanNodeId)
-      throws IOException {
+      String remotePlanNodeId,
+      FragmentInstanceContext instanceContext) {
     if (sinkHandles.containsKey(localFragmentInstanceId)) {
       throw new IllegalStateException("Sink handle for " + localFragmentInstanceId + " exists.");
     }
@@ -262,15 +272,15 @@ public class DataBlockManager implements IDataBlockManager {
 
     SinkHandle sinkHandle =
         new SinkHandle(
-            remoteHostname,
+            endpoint.toString(),
             remoteFragmentInstanceId,
             remotePlanNodeId,
             localFragmentInstanceId,
             localMemoryManager,
             executorService,
-            clientFactory.getDataBlockServiceClient(remoteHostname, remotePort),
+            clientFactory.getDataBlockServiceClient(endpoint),
             tsBlockSerdeFactory.get(),
-            new SinkHandleListenerImpl());
+            new SinkHandleListenerImpl(instanceContext));
     sinkHandles.put(localFragmentInstanceId, sinkHandle);
     return sinkHandle;
   }
@@ -279,10 +289,8 @@ public class DataBlockManager implements IDataBlockManager {
   public ISourceHandle createSourceHandle(
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
-      String remoteHostname,
-      int remotePort,
-      TFragmentInstanceId remoteFragmentInstanceId)
-      throws IOException {
+      Endpoint endpoint,
+      TFragmentInstanceId remoteFragmentInstanceId) {
     if (sourceHandles.containsKey(localFragmentInstanceId)
         && sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
       throw new IllegalStateException(
@@ -301,13 +309,13 @@ public class DataBlockManager implements IDataBlockManager {
 
     SourceHandle sourceHandle =
         new SourceHandle(
-            remoteHostname,
+            endpoint.getIp(),
             remoteFragmentInstanceId,
             localFragmentInstanceId,
             localPlanNodeId,
             localMemoryManager,
             executorService,
-            clientFactory.getDataBlockServiceClient(remoteHostname, remotePort),
+            clientFactory.getDataBlockServiceClient(endpoint),
             tsBlockSerdeFactory.get(),
             new SourceHandleListenerImpl());
     sourceHandles
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
index fa9b30cd8a..cabb725b95 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
@@ -84,7 +84,7 @@ public class DataBlockService extends ThriftService implements DataBlockServiceM
           new ThriftServiceThread(
               processor,
               getID().getName(),
-              ThreadName.DATA_BLOCK_MANAGER_CLIENT.getName(),
+              ThreadName.DATA_BLOCK_MANAGER_RPC_CLIENT.getName(),
               getBindIP(),
               getBindPort(),
               config.getRpcMaxConcurrentClientNum(),
@@ -95,7 +95,7 @@ public class DataBlockService extends ThriftService implements DataBlockServiceM
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
-    thriftServiceThread.setName(ThreadName.DATA_BLOCK_MANAGER_SERVICE.getName());
+    thriftServiceThread.setName(ThreadName.DATA_BLOCK_MANAGER_RPC_SERVER.getName());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java
index f289662d80..397755cd4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java
@@ -19,9 +19,9 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService.Client;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 
 import org.apache.thrift.TException;
@@ -29,22 +29,89 @@ import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class DataBlockServiceClientFactory {
-  public DataBlockService.Client getDataBlockServiceClient(String hostname, int port)
-      throws IOException {
-    try {
-      TTransport transport = RpcTransportFactory.INSTANCE.getTransportWithNoTimeout(hostname, port);
-      transport.open();
-      TProtocol protocol =
-          IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()
-              ? new TCompactProtocol(transport)
-              : new TBinaryProtocol(transport);
-      return new Client(protocol);
-    } catch (TException e) {
-      throw new IOException(e);
+
+  private static final int TIMEOUT_MS = 30_000;
+
+  private static final Logger logger = LoggerFactory.getLogger(DataBlockServiceClientFactory.class);
+
+  // TODO need to be replaced by mature client pool in the future
+  private static final Map<Endpoint, DataBlockService.Iface> dataBlockServiceClientMap =
+      new ConcurrentHashMap<>();
+
+  public DataBlockService.Iface getDataBlockServiceClient(Endpoint endpoint) {
+
+    return dataBlockServiceClientMap.computeIfAbsent(
+        endpoint,
+        address -> {
+          TTransport transport;
+          try {
+            transport =
+                RpcTransportFactory.INSTANCE.getTransport(
+                    // as there is a try-catch already, we do not need to use TSocket.wrap
+                    address.getIp(), address.getPort(), TIMEOUT_MS);
+            transport.open();
+            TProtocol protocol =
+                IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()
+                    ? new TCompactProtocol(transport)
+                    : new TBinaryProtocol(transport);
+            return newSynchronizedClient(new DataBlockService.Client(protocol));
+          } catch (TTransportException e) {
+            logger.error(
+                "error happened while creating mpp service client for {}:{}",
+                endpoint.getIp(),
+                endpoint.getPort(),
+                e);
+            throw new RuntimeException(e);
+          }
+        });
+  }
+
+  public static DataBlockService.Iface newSynchronizedClient(DataBlockService.Iface client) {
+    return (DataBlockService.Iface)
+        Proxy.newProxyInstance(
+            DataBlockServiceClientFactory.class.getClassLoader(),
+            new Class[] {DataBlockService.Iface.class},
+            new SynchronizedHandler(client));
+  }
+
+  private static class SynchronizedHandler implements InvocationHandler {
+
+    private final DataBlockService.Iface client;
+
+    public SynchronizedHandler(DataBlockService.Iface client) {
+      this.client = client;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      try {
+        synchronized (client) {
+          return method.invoke(client, args);
+        }
+      } catch (InvocationTargetException e) {
+        // all IFace APIs throw TException
+        if (e.getTargetException() instanceof TException) {
+          throw e.getTargetException();
+        } else {
+          // should not happen
+          throw new TException(
+              "Error in calling method " + method.getName(), e.getTargetException());
+        }
+      } catch (Exception e) {
+        throw new TException("Error in calling method " + method.getName(), e);
+      }
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
index e69a4a6b9c..0eff0c6620 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
@@ -19,12 +19,10 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
-import org.apache.thrift.transport.TTransportException;
-
-import java.io.IOException;
-
 public interface IDataBlockManager {
   /**
    * Create a sink handle who sends data blocks to a remote downstream fragment instance in async
@@ -32,19 +30,16 @@ public interface IDataBlockManager {
    *
    * @param localFragmentInstanceId ID of the local fragment instance who generates and sends data
    *     blocks to the sink handle.
-   * @param remoteHostname Hostname of the remote fragment instance where the data blocks should be
-   *     sent to.
-   * @param remotePort Port of the remote fragment instance where the data blocks should be sent to.
-   * @param remoteFragmentInstanceId ID of the remote fragment instance.
+   * @param endpoint Hostname and Port of the remote fragment instance where the data blocks should
+   *     be sent to.
    * @param remotePlanNodeId The sink plan node ID of the remote fragment instance.
    */
   ISinkHandle createSinkHandle(
       TFragmentInstanceId localFragmentInstanceId,
-      String remoteHostname,
-      int remotePort,
+      Endpoint endpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
-      String remotePlanNodeId)
-      throws TTransportException, IOException;
+      String remotePlanNodeId,
+      FragmentInstanceContext instanceContext);
 
   /**
    * Create a source handle who fetches data blocks from a remote upstream fragment instance for a
@@ -53,19 +48,15 @@ public interface IDataBlockManager {
    * @param localFragmentInstanceId ID of the local fragment instance who receives data blocks from
    *     the source handle.
    * @param localPlanNodeId The local sink plan node ID.
-   * @param remoteHostname Hostname of the remote fragment instance where the data blocks should be
-   *     received from.
-   * @param remotePort Port of the remote fragment instance where the data blocks should be received
-   *     from.
+   * @param endpoint Hostname and Port of the remote fragment instance where the data blocks should
+   *     be received from.
    * @param remoteFragmentInstanceId ID of the remote fragment instance.
    */
   ISourceHandle createSourceHandle(
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
-      String remoteHostname,
-      int remotePort,
-      TFragmentInstanceId remoteFragmentInstanceId)
-      throws IOException;
+      Endpoint endpoint,
+      TFragmentInstanceId remoteFragmentInstanceId);
 
   /**
    * Release all the related resources of a fragment instance, including data blocks that are not
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index 2934a6bf7a..0beab26e36 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -59,7 +59,7 @@ public class SinkHandle implements ISinkHandle {
   private final TFragmentInstanceId localFragmentInstanceId;
   private final LocalMemoryManager localMemoryManager;
   private final ExecutorService executorService;
-  private final DataBlockService.Client client;
+  private final DataBlockService.Iface client;
   private final TsBlockSerde serde;
   private final SinkHandleListener sinkHandleListener;
 
@@ -82,7 +82,7 @@ public class SinkHandle implements ISinkHandle {
       TFragmentInstanceId localFragmentInstanceId,
       LocalMemoryManager localMemoryManager,
       ExecutorService executorService,
-      DataBlockService.Client client,
+      DataBlockService.Iface client,
       TsBlockSerde serde,
       SinkHandleListener sinkHandleListener) {
     this.remoteHostname = Validate.notNull(remoteHostname);
@@ -359,7 +359,8 @@ public class SinkHandle implements ISinkHandle {
               remotePlanNodeId,
               remoteFragmentInstanceId,
               e.getMessage(),
-              attempt);
+              attempt,
+              e);
           if (attempt == MAX_ATTEMPT_TIMES) {
             synchronized (this) {
               throwable = e;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index b577174702..802634efa7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -59,7 +59,7 @@ public class SourceHandle implements ISourceHandle {
   private final String localPlanNodeId;
   private final LocalMemoryManager localMemoryManager;
   private final ExecutorService executorService;
-  private final DataBlockService.Client client;
+  private final DataBlockService.Iface client;
   private final TsBlockSerde serde;
   private final SourceHandleListener sourceHandleListener;
 
@@ -83,7 +83,7 @@ public class SourceHandle implements ISourceHandle {
       String localPlanNodeId,
       LocalMemoryManager localMemoryManager,
       ExecutorService executorService,
-      DataBlockService.Client client,
+      DataBlockService.Iface client,
       TsBlockSerde serde,
       SourceHandleListener sourceHandleListener) {
     this.remoteHostname = Validate.notNull(remoteHostname);
@@ -242,7 +242,7 @@ public class SourceHandle implements ISourceHandle {
     return throwable == null
         && noMoreTsBlocks
         && numActiveGetDataBlocksTask == 0
-        && nextSequenceId - 1 == lastSequenceId
+        && currSequenceId - 1 == lastSequenceId
         && sequenceIdToTsBlock.isEmpty();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
index 7753bc3ff1..9f3b9240c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/StubSinkHandle.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -34,8 +35,14 @@ public class StubSinkHandle implements ISinkHandle {
 
   private final List<TsBlock> tsBlocks = new ArrayList<>();
 
+  private final FragmentInstanceContext instanceContext;
+
   private boolean closed = false;
 
+  public StubSinkHandle(FragmentInstanceContext instanceContext) {
+    this.instanceContext = instanceContext;
+  }
+
   @Override
   public long getBufferRetainedSizeInBytes() {
     return 0;
@@ -76,7 +83,11 @@ public class StubSinkHandle implements ISinkHandle {
 
   @Override
   public void close() {
+    if (closed) {
+      return;
+    }
     closed = true;
+    instanceContext.flushing();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index d92368cee4..51f3a6d6f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -44,9 +44,9 @@ import java.util.concurrent.ScheduledExecutorService;
  */
 public class Coordinator {
   private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
-  private static final int COORDINATOR_EXECUTOR_SIZE = 10;
+  private static final int COORDINATOR_EXECUTOR_SIZE = 2;
   private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
-  private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 10;
+  private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 2;
 
   private static final Endpoint LOCAL_HOST =
       new Endpoint(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 88c160dd28..785d8d8db3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -91,7 +91,6 @@ public class DataDriver implements Driver {
           closed || (driverBlockedFuture.get().isDone() && root != null && root.isFinished());
       if (isFinished) {
         close();
-        driverContext.finish();
       }
       return isFinished;
     } catch (Throwable t) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
index 2168c24f4e..8c20a2c334 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@ -43,4 +43,8 @@ public class DriverContext {
   public void finish() {
     fragmentInstanceContext.finish();
   }
+
+  public void flushing() {
+    fragmentInstanceContext.flushing();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index 0884eb6f9c..c8f3e3ebd8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -114,6 +114,13 @@ public class FragmentInstanceContext extends QueryContext {
     this.endTime = System.currentTimeMillis();
   }
 
+  public void flushing() {
+    if (state.get().isDone()) {
+      return;
+    }
+    state.set(FragmentInstanceState.FLUSHING);
+  }
+
   public long getEndTime() {
     return endTime;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index 1af0abfece..537f330f04 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -60,7 +60,7 @@ public class FragmentInstanceManager {
     this.instanceContext = new ConcurrentHashMap<>();
     this.instanceExecution = new ConcurrentHashMap<>();
     this.instanceManagementExecutor =
-        IoTDBThreadPoolFactory.newScheduledThreadPool(5, "instance-management");
+        IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management");
 
     this.infoCacheTime = new Duration(15, TimeUnit.MINUTES);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index e7df772ad9..e7e4995995 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.buffer.DataBlockService;
 import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
@@ -246,19 +247,16 @@ public class QueryExecution implements IQueryExecution {
 
   private void initResultHandle() {
     if (this.resultHandle == null) {
-      try {
-        this.resultHandle =
-            DataBlockService.getInstance()
-                .getDataBlockManager()
-                .createSourceHandle(
-                    context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
-                    context.getResultNodeContext().getVirtualResultNodeId().getId(),
-                    context.getResultNodeContext().getUpStreamEndpoint().getIp(),
-                    IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort(),
-                    context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift());
-      } catch (IOException e) {
-        stateMachine.transitionToFailed();
-      }
+      this.resultHandle =
+          DataBlockService.getInstance()
+              .getDataBlockManager()
+              .createSourceHandle(
+                  context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
+                  context.getResultNodeContext().getVirtualResultNodeId().getId(),
+                  new Endpoint(
+                      context.getResultNodeContext().getUpStreamEndpoint().getIp(),
+                      IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+                  context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 7bcded27dc..c5bc48ef42 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -33,9 +33,9 @@ import java.util.concurrent.ExecutorService;
  * register listeners when the state changes of the QueryExecution.
  */
 public class QueryStateMachine {
-  private String name;
-  private StateMachine<QueryState> queryState;
-  private Map<FragmentInstanceId, FragmentInstanceState> fragInstanceStateMap;
+  private final String name;
+  private final StateMachine<QueryState> queryState;
+  private final Map<FragmentInstanceId, FragmentInstanceState> fragInstanceStateMap;
 
   // The executor will be used in all the state machines belonged to this query.
   private Executor stateMachineExecutor;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
index ea49d012e3..47edf98973 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@ -50,6 +50,8 @@ public class SchemaDriver implements Driver {
 
   private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = new AtomicReference<>();
 
+  private boolean closed = false;
+
   public SchemaDriver(Operator root, ISinkHandle sinkHandle, SchemaDriverContext driverContext) {
     this.root = root;
     this.sinkHandle = sinkHandle;
@@ -66,7 +68,6 @@ public class SchemaDriver implements Driver {
       boolean isFinished = driverBlockedFuture.get().isDone() && root != null && root.isFinished();
       if (isFinished) {
         close();
-        driverContext.finish();
       }
       return isFinished;
     } catch (Throwable t) {
@@ -146,5 +147,21 @@ public class SchemaDriver implements Driver {
   }
 
   @Override
-  public void close() {}
+  public void close() {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    try {
+      if (root != null) {
+        root.close();
+      }
+      if (sinkHandle != null) {
+        sinkHandle.close();
+      }
+    } catch (Throwable t) {
+      logger.error("Failed to closed driver {}", driverContext.getId(), t);
+      driverContext.failed(t);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
index b87579561c..0e59d80855 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
@@ -56,9 +58,11 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
   public abstract void abort();
 
   protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
-    InternalService.Client client =
-        InternalServiceClientFactory.getInternalServiceClient(
-            instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
+    InternalService.Iface client =
+        InternalServiceClientFactory.getMppServiceClient(
+            new Endpoint(
+                instance.getHostEndpoint().getIp(),
+                IoTDBDescriptor.getInstance().getConfig().getMppPort()));
     TFragmentInstanceStateResp resp =
         client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
     return FragmentInstanceState.valueOf(resp.state);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
index 0fb90f160a..16cb7fcdda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
@@ -24,11 +24,16 @@ import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.concurrent.*;
 
 public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
+
+  private static final Logger logger = LoggerFactory.getLogger(FixedRateFragInsStateTracker.class);
+
   // TODO: (xingtanzjr) consider how much Interval is OK for state tracker
   private static final long STATE_FETCH_INTERVAL_IN_MS = 500;
   private ScheduledFuture<?> trackTask;
@@ -59,11 +64,14 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
     for (FragmentInstance instance : instances) {
       try {
         FragmentInstanceState state = fetchState(instance);
+        logger.info("Instance {}'s State is {}", instance.getId(), state);
+
         if (state != null) {
           stateMachine.updateFragInstanceState(instance.getId(), state);
         }
       } catch (TException e) {
         // TODO: do nothing ?
+        logger.error("error happened while fetching query state", e);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
index 4a5a94b4bf..ce60581900 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
@@ -19,26 +19,95 @@
 
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.SynchronizedHandler;
 
+import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class InternalServiceClientFactory {
-  private static final int TIMEOUT_MS = 10000;
+  private static final int TIMEOUT_MS = 10_000;
+
+  private static final Logger logger = LoggerFactory.getLogger(InternalServiceClientFactory.class);
+
+  // TODO need to be replaced by mature client pool in the future
+  private static final Map<Endpoint, InternalService.Iface> mppServiceClientMap =
+      new ConcurrentHashMap<>();
 
-  // TODO: (xingtanzjr) consider the best practice to maintain the clients
-  public static InternalService.Client getInternalServiceClient(String endpoint, int port)
+  public static InternalService.Iface getMppServiceClient(Endpoint endpoint)
       throws TTransportException {
-    TTransport transport =
-        RpcTransportFactory.INSTANCE.getTransport(
-            // as there is a try-catch already, we do not need to use TSocket.wrap
-            endpoint, port, TIMEOUT_MS);
-    transport.open();
-    TProtocol protocol = new TBinaryProtocol(transport);
-    return new InternalService.Client(protocol);
+    return mppServiceClientMap.computeIfAbsent(
+        endpoint,
+        address -> {
+          TTransport transport;
+          try {
+            transport =
+                RpcTransportFactory.INSTANCE.getTransport(
+                    // as there is a try-catch already, we do not need to use TSocket.wrap
+                    address.getIp(), address.getPort(), TIMEOUT_MS);
+            transport.open();
+
+          } catch (TTransportException e) {
+            logger.error(
+                "error happened while creating mpp service client for {}:{}",
+                endpoint.getIp(),
+                endpoint.getPort(),
+                e);
+            throw new RuntimeException(e);
+          }
+          TProtocol protocol = new TBinaryProtocol(transport);
+          return newSynchronizedClient(new InternalService.Client(protocol));
+        });
+  }
+
+  public static InternalService.Iface newSynchronizedClient(InternalService.Iface client) {
+    return (InternalService.Iface)
+        Proxy.newProxyInstance(
+            InternalServiceClientFactory.class.getClassLoader(),
+            new Class[] {InternalService.Iface.class},
+            new SynchronizedHandler(client));
+  }
+
+  private static class SynchronizedHandler implements InvocationHandler {
+
+    private final InternalService.Iface client;
+
+    public SynchronizedHandler(InternalService.Iface client) {
+      this.client = client;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      try {
+        synchronized (client) {
+          return method.invoke(client, args);
+        }
+      } catch (InvocationTargetException e) {
+        // all IFace APIs throw TException
+        if (e.getTargetException() instanceof TException) {
+          throw e.getTargetException();
+        } else {
+          // should not happen
+          throw new TException(
+              "Error in calling method " + method.getName(), e.getTargetException());
+        }
+      } catch (Exception e) {
+        throw new TException("Error in calling method " + method.getName(), e);
+      }
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index fa81d65764..bdece41164 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
@@ -47,10 +48,11 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
           TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
           try {
             for (FragmentInstance instance : instances) {
-              InternalService.Client client =
-                  InternalServiceClientFactory.getInternalServiceClient(
-                      instance.getHostEndpoint().getIp(),
-                      IoTDBDescriptor.getInstance().getConfig().getMppPort());
+              InternalService.Iface client =
+                  InternalServiceClientFactory.getMppServiceClient(
+                      new Endpoint(
+                          instance.getHostEndpoint().getIp(),
+                          IoTDBDescriptor.getInstance().getConfig().getMppPort()));
               // TODO: (xingtanzjr) consider how to handle the buffer here
               ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
               instance.serializeRequest(buffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
index 60d1e9c5a5..ca5e1f2230 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
@@ -54,9 +55,11 @@ public class SimpleQueryTerminator implements IQueryTerminator {
             () -> {
               try {
                 for (Endpoint endpoint : relatedHost) {
-                  InternalService.Client client =
-                      InternalServiceClientFactory.getInternalServiceClient(
-                          endpoint.getIp(), endpoint.getPort());
+                  InternalService.Iface client =
+                      InternalServiceClientFactory.getMppServiceClient(
+                          new Endpoint(
+                              endpoint.getIp(),
+                              IoTDBDescriptor.getInstance().getConfig().getMppPort()));
                   client.cancelQuery(new TCancelQueryReq(queryId.getId()));
                 }
               } catch (TException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index f58af90e77..c5fbaf10b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -122,8 +122,9 @@ public class TimeJoinOperator implements ProcessOperator {
     }
 
     if (timeSelector.isEmpty()) {
-      // TODO need to discuss whether to return null or return an empty TSBlock with TsBlockMetadata
-      return null;
+      // return empty TsBlock
+      TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(0, dataTypes);
+      return tsBlockBuilder.build();
     }
 
     TsBlockBuilder tsBlockBuilder = TsBlockBuilder.createWithOnlyTimeColumn();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index b2200b7690..a08e5662a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -33,11 +33,8 @@ import org.apache.iotdb.db.mpp.schedule.queue.L2PriorityQueue;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskID;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
-import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,9 +66,8 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
 
   private static final int MAX_CAPACITY = 1000; // TODO: load from config files
   private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
-  private static final int QUERY_TIMEOUT_MS = 10000; // TODO: load from config files or requests
+  private static final int QUERY_TIMEOUT_MS = 600_000; // TODO: load from config files or requests
   private final ThreadGroup workerGroups;
-  private InternalService.Client mppServiceClient; // TODO: use from client pool
   private final List<AbstractExecutor> threads;
 
   private FragmentInstanceScheduler() {
@@ -242,11 +238,6 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
     this.blockManager = blockManager;
   }
 
-  @TestOnly
-  void setMppServiceClient(InternalService.Client client) {
-    this.mppServiceClient = client;
-  }
-
   private static class InstanceHolder {
 
     private InstanceHolder() {}
@@ -348,12 +339,6 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
       QueryId queryId = task.getId().getQueryId();
       Set<FragmentInstanceTask> queryRelatedTasks = queryMap.remove(queryId);
       if (queryRelatedTasks != null) {
-        try {
-          mppServiceClient.cancelQuery(new TCancelQueryReq(queryId.getId()));
-        } catch (TException e) {
-          // If coordinator cancel query failed, we should continue clean other tasks.
-          logger.error("cancel query " + queryId.getId() + " failed", e);
-        }
         for (FragmentInstanceTask otherTask : queryRelatedTasks) {
           if (task.equals(otherTask)) {
             continue;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index 783372eca6..3877706717 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -75,7 +75,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
         coordinator.execute(schemaFetchStatement, queryId, null, "", partitionFetcher, this);
     // TODO: (xingtanzjr) throw exception
     if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new RuntimeException("cannot fetch schema");
+      throw new RuntimeException("cannot fetch schema, status is: " + executionResult.status);
     }
     TsBlock tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
     // TODO: (xingtanzjr) need to release this query's resource here
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 166a3077a7..b0c277f65e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -68,7 +68,6 @@ import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -301,18 +300,15 @@ public class LocalExecutionPlanner {
       FragmentInstanceId remoteInstanceId = node.getUpstreamInstanceId();
       Endpoint source = node.getUpstreamEndpoint();
 
-      try {
-        ISourceHandle sourceHandle =
-            DATA_BLOCK_MANAGER.createSourceHandle(
-                localInstanceId.toThrift(),
-                node.getPlanNodeId().getId(),
-                source.getIp(),
-                IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort(),
-                remoteInstanceId.toThrift());
-        return new ExchangeOperator(operatorContext, sourceHandle, node.getUpstreamPlanNodeId());
-      } catch (IOException e) {
-        throw new RuntimeException("Error happened while creating source handle", e);
-      }
+      ISourceHandle sourceHandle =
+          DATA_BLOCK_MANAGER.createSourceHandle(
+              localInstanceId.toThrift(),
+              node.getPlanNodeId().getId(),
+              new Endpoint(
+                  source.getIp(),
+                  IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+              remoteInstanceId.toThrift());
+      return new ExchangeOperator(operatorContext, sourceHandle, node.getUpstreamPlanNodeId());
     }
 
     @Override
@@ -321,19 +317,17 @@ public class LocalExecutionPlanner {
       Endpoint target = node.getDownStreamEndpoint();
       FragmentInstanceId localInstanceId = context.instanceContext.getId();
       FragmentInstanceId targetInstanceId = node.getDownStreamInstanceId();
-      try {
-        ISinkHandle sinkHandle =
-            DATA_BLOCK_MANAGER.createSinkHandle(
-                localInstanceId.toThrift(),
-                target.getIp(),
-                IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort(),
-                targetInstanceId.toThrift(),
-                node.getDownStreamPlanNodeId().getId());
-        context.setSinkHandle(sinkHandle);
-        return child;
-      } catch (IOException e) {
-        throw new RuntimeException("Error happened while creating sink handle", e);
-      }
+      ISinkHandle sinkHandle =
+          DATA_BLOCK_MANAGER.createSinkHandle(
+              localInstanceId.toThrift(),
+              new Endpoint(
+                  target.getIp(),
+                  IoTDBDescriptor.getInstance().getConfig().getDataBlockManagerPort()),
+              targetInstanceId.toThrift(),
+              node.getDownStreamPlanNodeId().getId(),
+              context.instanceContext);
+      context.setSinkHandle(sinkHandle);
+      return child;
     }
 
     @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServer.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServer.java
index 44513c40d1..6c52ac4066 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServer.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNodeManagementServer.java
@@ -66,7 +66,7 @@ public class DataNodeManagementServer extends ThriftService
           new ThriftServiceThread(
               processor,
               getID().getName(),
-              ThreadName.DATA_NODE_MANAGEMENT_CLIENT.getName(),
+              ThreadName.DATA_NODE_MANAGEMENT_RPC_CLIENT.getName(),
               getBindIP(),
               getBindPort(),
               config.getRpcMaxConcurrentClientNum(),
@@ -76,7 +76,7 @@ public class DataNodeManagementServer extends ThriftService
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
-    thriftServiceThread.setName(ThreadName.RPC_SERVICE.getName());
+    thriftServiceThread.setName(ThreadName.DATA_NODE_MANAGEMENT_RPC_SERVER.getName());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalService.java b/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
index 4414fb9110..5236d2bf87 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalService.java
@@ -61,7 +61,7 @@ public class InternalService extends ThriftService implements InternalServiceMBe
           new ThriftServiceThread(
               processor,
               getID().getName(),
-              ThreadName.INTERNAL_SERVICE_CLIENT.getName(),
+              ThreadName.INTERNAL_SERVICE_RPC_CLIENT.getName(),
               getBindIP(),
               getBindPort(),
               config.getRpcMaxConcurrentClientNum(),
@@ -72,7 +72,7 @@ public class InternalService extends ThriftService implements InternalServiceMBe
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
-    thriftServiceThread.setName(ThreadName.INTERNAL_SERVICE_CLIENT.getName());
+    thriftServiceThread.setName(ThreadName.INTERNAL_SERVICE_RPC_SERVER.getName());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
index 34634d74cc..c3c398bbf5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
@@ -88,7 +88,11 @@ public class InternalServiceImpl implements InternalService.Iface {
 
   @Override
   public TCancelResp cancelQuery(TCancelQueryReq req) throws TException {
-    throw new NotImplementedException();
+
+    // TODO need to be implemented and currently in order not to print NotImplementedException log,
+    // we simply return null
+    return null;
+    //    throw new NotImplementedException();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 4b86aca612..2c13c68ad7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -233,7 +233,7 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
               SCHEMA_FETCHER);
 
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        throw new RuntimeException("");
+        throw new RuntimeException("error coed: " + result.status);
       }
 
       IQueryExecution queryExecution = COORDINATOR.getQueryExecution(id);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index cf299aaa1f..2703d0aa37 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -158,7 +158,7 @@ public class DataDriverTest {
               dataRegion,
               ImmutableList.of(seriesScanOperator1, seriesScanOperator2));
 
-      StubSinkHandle sinkHandle = new StubSinkHandle();
+      StubSinkHandle sinkHandle = new StubSinkHandle(fragmentInstanceContext);
 
       try (Driver dataDriver = new DataDriver(limitOperator, sinkHandle, driverContext)) {
         assertEquals(fragmentInstanceContext.getId(), dataDriver.getInfo());
@@ -171,7 +171,7 @@ public class DataDriverTest {
           assertTrue(blocked.isDone());
         }
 
-        assertEquals(FragmentInstanceState.FINISHED, state.get());
+        assertEquals(FragmentInstanceState.FLUSHING, state.get());
 
         List<TsBlock> result = sinkHandle.getTsBlocks();
         assertEquals(13, result.size());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
index 32e474fd03..e73529aa06 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.utils.stats.CpuTimer;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 
 import io.airlift.units.Duration;
-import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -305,7 +304,6 @@ public class DefaultTaskSchedulerTest {
     IDataBlockManager mockDataBlockManager = Mockito.mock(IDataBlockManager.class);
     manager.setBlockManager(mockDataBlockManager);
     InternalService.Client mockMppServiceClient = Mockito.mock(InternalService.Client.class);
-    manager.setMppServiceClient(mockMppServiceClient);
     ITaskScheduler defaultScheduler = manager.getScheduler();
     QueryId queryId = new QueryId("test");
     FragmentInstanceId instanceId1 =
@@ -364,12 +362,6 @@ public class DefaultTaskSchedulerTest {
       manager.getTimeoutQueue().push(testTask1);
       defaultScheduler.toAborted(testTask1);
 
-      try {
-        Mockito.verify(mockMppServiceClient, Mockito.times(1)).cancelQuery(Mockito.any());
-      } catch (TException e) {
-        e.printStackTrace();
-        Assert.fail();
-      }
       Mockito.reset(mockMppServiceClient);
       Mockito.verify(mockDataBlockManager, Mockito.times(2))
           .forceDeregisterFragmentInstance(Mockito.any());