You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ea...@apache.org on 2019/05/27 08:47:27 UTC

[incubator-iotdb] branch cluster updated: update logic of trying all nodes based on update of NodeAsClient

This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new 4916e26  update logic of trying all nodes based on update of NodeAsClient
4916e26 is described below

commit 4916e26deca556bc18a4eb6e6c7386716ab30880
Author: mdf369 <95...@qq.com>
AuthorDate: Mon May 27 16:47:12 2019 +0800

    update logic of trying all nodes based on update of NodeAsClient
---
 .../cluster/qp/executor/AbstractQPExecutor.java    |  57 ++++++-
 .../cluster/qp/executor/NonQueryExecutor.java      |  51 +-----
 .../cluster/qp/executor/QueryMetadataExecutor.java | 187 +++++----------------
 .../apache/iotdb/cluster/qp/task/BatchQPTask.java  |  40 ++++-
 .../cluster/service/TSServiceClusterImpl.java      |   4 +-
 5 files changed, 131 insertions(+), 208 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index faacfff..9b2da09 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@ -90,10 +90,42 @@ public abstract class AbstractQPExecutor {
    * @param taskRetryNum Number of QPTask retries due to timeout and redirected.
    * @return basic response
    */
-  protected BasicResponse syncHandleNonQuerySingleTaskGetRes(SingleQPTask task, int taskRetryNum)
+  protected BasicResponse syncHandleSingleTaskGetRes(SingleQPTask task, int taskRetryNum, String taskInfo, String groupId)
       throws InterruptedException, RaftConnectionException {
-    asyncSendNonQuerySingleTask(task, taskRetryNum);
-    return syncGetNonQueryRes(task, taskRetryNum);
+    PeerId firstNode = task.getTargetNode();
+    BasicResponse response = null;
+    try {
+      asyncSendSingleTask(task, taskRetryNum);
+      response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId);
+    } catch (RaftConnectionException ex) {
+      boolean success = false;
+      while (!success) {
+        PeerId nextNode = null;
+        try {
+          nextNode = RaftUtils.getPeerIDInOrder(groupId);
+          if (firstNode.equals(nextNode)) {
+            break;
+          }
+          LOGGER.debug(
+              "Previous task fail, then send {} task for group {} to node {}.", taskInfo, groupId,
+              nextNode);
+          task.resetTask();
+          task.setTargetNode(nextNode);
+          task.setTaskState(TaskState.INITIAL);
+          asyncSendSingleTask(task, taskRetryNum);
+          response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId);
+          LOGGER.debug("{} task for group {} to node {} succeed.", taskInfo, groupId, nextNode);
+          success = true;
+        } catch (RaftConnectionException e1) {
+          LOGGER.debug("{} task for group {} to node {} fail.", taskInfo, groupId, nextNode);
+        }
+      }
+      LOGGER.debug("The final result for {} task is {}", taskInfo, success);
+      if (!success) {
+        throw ex;
+      }
+    }
+    return response;
   }
 
   /**
@@ -101,7 +133,7 @@ public abstract class AbstractQPExecutor {
    *  @param task rpc task
    * @param taskRetryNum Retry time of the task
    */
-  protected void asyncSendNonQuerySingleTask(SingleQPTask task, int taskRetryNum)
+  protected void asyncSendSingleTask(SingleQPTask task, int taskRetryNum)
       throws RaftConnectionException {
     if (taskRetryNum >= TASK_MAX_RETRY) {
       throw new RaftConnectionException(String.format("QPTask retries reach the upper bound %s",
@@ -117,7 +149,7 @@ public abstract class AbstractQPExecutor {
    * @param task rpc task
    * @param taskRetryNum Retry time of the task
    */
-  private BasicResponse syncGetNonQueryRes(SingleQPTask task, int taskRetryNum)
+  private BasicResponse syncGetSingleTaskRes(SingleQPTask task, int taskRetryNum, String taskInfo, String groupId)
       throws InterruptedException, RaftConnectionException {
     task.await();
     PeerId leader;
@@ -131,14 +163,13 @@ public abstract class AbstractQPExecutor {
         LOGGER.debug("Redirect leader: {}, group id = {}", leader, task.getRequest().getGroupID());
         RaftUtils.updateRaftGroupLeader(task.getRequest().getGroupID(), leader);
       } else {
-        String groupId = task.getRequest().getGroupID();
         RaftUtils.removeCachedRaftGroupLeader(groupId);
         LOGGER.debug("Remove cached raft group leader of {}", groupId);
         leader = RaftUtils.getLocalLeaderPeerID(groupId);
       }
       task.setTargetNode(leader);
       task.resetTask();
-      return syncHandleNonQuerySingleTaskGetRes(task, taskRetryNum + 1);
+      return syncHandleSingleTaskGetRes(task, taskRetryNum + 1, taskInfo, groupId);
     }
     return task.getResponse();
   }
@@ -174,4 +205,16 @@ public abstract class AbstractQPExecutor {
     checkInitConsistencyLevel();
     return readDataConsistencyLevel.get();
   }
+
+  /**
+   * Async handle task by SingleQPTask and leader id.
+   *
+   * @param task request SingleQPTask
+   * @return request result
+   */
+  public boolean syncHandleSingleTask(SingleQPTask task, String taskInfo, String groupId)
+      throws RaftConnectionException, InterruptedException {
+    BasicResponse response = syncHandleSingleTaskGetRes(task, 0, taskInfo, groupId);
+    return response != null && response.isSuccess();
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 7ba9ef7..b7dd242 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.task.BatchQPTask;
 import org.apache.iotdb.cluster.qp.task.QPTask;
-import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 import org.apache.iotdb.cluster.rpc.raft.request.nonquery.DataGroupNonQueryRequest;
@@ -327,43 +326,8 @@ public class NonQueryExecutor extends AbstractQPExecutor {
       return handleNonQueryRequestLocally(groupId, qpTask);
     } else {
       PeerId leader = RaftUtils.getLocalLeaderPeerID(groupId);
-      boolean res = false;
       qpTask.setTargetNode(leader);
-      try {
-         res = syncHandleNonQueryTask(qpTask);
-      } catch (RaftConnectionException ex) {
-        boolean success = false;
-        PeerId nextNode = RaftUtils.getPeerIDInOrder(groupId);
-        PeerId firstNode = nextNode;
-        boolean first = true;
-        while (!success) {
-          try {
-            if (!first) {
-              nextNode = RaftUtils.getPeerIDInOrder(groupId);
-              if (firstNode.equals(nextNode)) {
-                break;
-              }
-            }
-            first = false;
-            LOGGER.debug("Previous task fail, then send non-query task for group {} to node {}.", groupId, nextNode);
-            qpTask.resetTask();
-            qpTask.setTargetNode(nextNode);
-            qpTask.setTaskState(TaskState.INITIAL);
-            currentTask.set(qpTask);
-            res = syncHandleNonQueryTask(qpTask);
-            LOGGER.debug("Non-query task for group {} to node {} succeed.", groupId, nextNode);
-            success = true;
-            RaftUtils.updateRaftGroupLeader(groupId, nextNode);
-          } catch (RaftConnectionException e1) {
-            LOGGER.debug("Non-query task for group {} to node {} fail.", groupId, nextNode);
-          }
-        }
-        LOGGER.debug("The final result for non-query task is {}", success);
-        if (!success) {
-          throw ex;
-        }
-      }
-      return res;
+      return syncHandleSingleTask(qpTask, "non-query", groupId);
     }
   }
 
@@ -387,17 +351,4 @@ public class NonQueryExecutor extends AbstractQPExecutor {
     /** Apply qpTask to Raft Node **/
     return RaftUtils.executeRaftTaskForLocalProcessor(service, qpTask, response);
   }
-
-  /**
-   * Async handle task by QPTask and leader id.
-   *
-   * @param task request QPTask
-   * @return request result
-   */
-  public boolean syncHandleNonQueryTask(SingleQPTask task)
-      throws RaftConnectionException, InterruptedException {
-    BasicResponse response = syncHandleNonQuerySingleTaskGetRes(task, 0);
-    return response != null && response.isSuccess();
-  }
-
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 625269e..a2528c6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -22,6 +22,7 @@ import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.closure.ReadIndexClosure;
 import com.alipay.sofa.jraft.entity.PeerId;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -31,6 +32,7 @@ import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.qp.task.BatchQPTask;
 import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryMetadataInStringRequest;
@@ -46,6 +48,7 @@ import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryPathsRespon
 import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QuerySeriesTypeResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryStorageGroupResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.querymetadata.QueryTimeSeriesResponse;
+import org.apache.iotdb.cluster.service.TSServiceClusterImpl.BatchResult;
 import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
@@ -145,50 +148,34 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
     try {
       LOGGER.debug("Send show timeseries {} task for group {} to node {}.", pathList, groupId,
           holder);
-      res.addAll(queryTimeSeries(task));
+      res.addAll(queryTimeSeries(task, pathList, groupId));
     } catch (RaftConnectionException e) {
-      boolean success = false;
-      while (!success) {
-        PeerId nextNode = null;
-        try {
-          nextNode = RaftUtils.getPeerIDInOrder(groupId);
-          if (holder.equals(nextNode)) {
-            break;
-          }
-          LOGGER.debug(
-              "Previous task fail, then send show timeseries {} task for group {} to node {}.",
-              pathList, groupId, nextNode);
-          task.resetTask();
-          task.setTargetNode(holder);
-          task.setTaskState(TaskState.INITIAL);
-          res.addAll(queryTimeSeries(task));
-          LOGGER
-              .debug("Show timeseries {} task for group {} to node {} succeed.", pathList, groupId,
-                  nextNode);
-          success = true;
-        } catch (RaftConnectionException e1) {
-          LOGGER.debug("Show timeseries {} task for group {} to node {} fail.", pathList, groupId,
-              nextNode);
-        }
-      }
-      LOGGER.debug("The final result for show timeseries {} task is {}", pathList, success);
-      if (!success) {
-        throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
-      }
+      throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
     }
   }
 
+  private List<List<String>> queryTimeSeries(SingleQPTask task, List<String> pathList, String groupId)
+      throws InterruptedException, RaftConnectionException {
+    BasicResponse response = syncHandleSingleTaskGetRes(task, 0, "show timeseries " + pathList, groupId);
+    return response == null ? new ArrayList<>()
+        : ((QueryTimeSeriesResponse) response).getTimeSeries();
+  }
+
   public String processMetadataInStringQuery()
       throws InterruptedException, ProcessorException {
     Set<String> groupIdSet = router.getAllGroupId();
 
     List<String> metadataList = new ArrayList<>(groupIdSet.size());
-    List<SingleQPTask> taskList = new ArrayList<>();
+
+    BatchResult batchResult = new BatchResult(true, new StringBuilder(), new int[groupIdSet.size()]);
+    Map<String, List<Integer>> planIndexMap = new HashMap<>();
+    Map<String, SingleQPTask> subTaskMap = new HashMap<>();
+
+    int index = 0;
     for (String groupId : groupIdSet) {
       QueryMetadataInStringRequest request = new QueryMetadataInStringRequest(groupId,
           getReadMetadataConsistencyLevel());
       SingleQPTask task = new SingleQPTask(false, request);
-      taskList.add(task);
 
       LOGGER.debug("Execute show metadata in string statement for group {}.", groupId);
       PeerId holder;
@@ -202,43 +189,17 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
         holder = RaftUtils.getPeerIDInOrder(groupId);
       }
       task.setTargetNode(holder);
-      try {
-        LOGGER.debug("Send show metadata in string task for group {} to node {}.", groupId, holder);
-        asyncSendNonQuerySingleTask(task, 0);
-      } catch (RaftConnectionException e) {
-        boolean success = false;
-        while (!success) {
-          PeerId nextNode = null;
-          try {
-            nextNode = RaftUtils.getPeerIDInOrder(groupId);
-            if (holder.equals(nextNode)) {
-              break;
-            }
-            LOGGER.debug(
-                "Previous task fail, then send show metadata in string task for group {} to node {}.",
-                groupId, nextNode);
-            task.resetTask();
-            task.setTargetNode(nextNode);
-            task.setTaskState(TaskState.INITIAL);
-            asyncSendNonQuerySingleTask(task, 0);
-            LOGGER.debug("Show metadata in string task for group {} to node {} succeed.", groupId,
-                nextNode);
-            success = true;
-          } catch (RaftConnectionException e1) {
-            LOGGER.debug("Show metadata in string task for group {} to node {} fail.", groupId,
-                nextNode);
-          }
-        }
-        LOGGER.debug("The final result for show metadata in string task is {}", success);
-        if (!success) {
-          throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
-        }
-      }
+      subTaskMap.put(groupId, task);
+      planIndexMap.computeIfAbsent(groupId, l -> new ArrayList<>()).add(index++);
     }
-    for (int i = 0; i < taskList.size(); i++) {
-      SingleQPTask task = taskList.get(i);
-      task.await();
-      BasicResponse response = task.getResponse();
+
+    BatchQPTask batchTask = new BatchQPTask(subTaskMap.size(), batchResult, subTaskMap, planIndexMap);
+    currentTask.set(batchTask);
+    batchTask.executeQueryMetadataBy(this, "show metadata in string");
+    batchTask.await();
+
+    for (SingleQPTask subTask : subTaskMap.values()) {
+      BasicResponse response = subTask.getResponse();
       if (response == null || !response.isSuccess()) {
         String errorMessage = "response is null";
         if (response != null && response.getErrorMsg() != null) {
@@ -278,7 +239,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
       task.setTargetNode(holder);
       try {
         LOGGER.debug("Send query metadata task for group {} to node {}.", groupId, holder);
-        asyncSendNonQuerySingleTask(task, 0);
+        asyncSendSingleTask(task, 0);
       } catch (RaftConnectionException e) {
         boolean success = false;
         while (!success) {
@@ -294,7 +255,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
             task.resetTask();
             task.setTargetNode(nextNode);
             task.setTaskState(TaskState.INITIAL);
-            asyncSendNonQuerySingleTask(task, 0);
+            asyncSendSingleTask(task, 0);
             LOGGER.debug("Query metadata task for group {} to node {} succeed.", groupId, nextNode);
             success = true;
           } catch (RaftConnectionException e1) {
@@ -352,41 +313,21 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
       try {
         LOGGER.debug("Send get series type for {} task for group {} to node {}.", path, groupId,
             holder);
-        dataType = querySeriesType(task);
+        dataType = querySeriesType(task, path, groupId);
       } catch (RaftConnectionException e) {
-        boolean success = false;
-        while (!success) {
-          PeerId nextNode = null;
-          try {
-            nextNode = RaftUtils.getPeerIDInOrder(groupId);
-            if (holder.equals(nextNode)) {
-              break;
-            }
-            LOGGER.debug(
-                "Previous task fail, then send get series type for {} task for group {} to node {}.",
-                path, groupId, nextNode);
-            task.resetTask();
-            task.setTargetNode(nextNode);
-            task.setTaskState(TaskState.INITIAL);
-            dataType = querySeriesType(task);
-            LOGGER.debug("Get series type for {} task for group {} to node {} succeed.", path,
-                groupId, nextNode);
-            success = true;
-          } catch (RaftConnectionException e1) {
-            LOGGER.debug("Get series type for {} task for group {} to node {} fail.", path, groupId,
-                nextNode);
-            continue;
-          }
-        }
-        LOGGER.debug("The final result for get series type for {} task is {}", path, success);
-        if (!success) {
-          throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
-        }
+        throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
       }
     }
     return dataType;
   }
 
+  private TSDataType querySeriesType(SingleQPTask task, String path, String groupId)
+      throws InterruptedException, RaftConnectionException {
+    BasicResponse response = syncHandleSingleTaskGetRes(task, 0, "get series type for " + path, groupId);
+    return response == null ? null
+        : ((QuerySeriesTypeResponse) response).getDataType();
+  }
+
   /**
    * Handle show timeseries <path> statement
    */
@@ -433,50 +374,17 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
     try {
       LOGGER
           .debug("Send get paths for {} task for group {} to node {}.", pathList, groupId, holder);
-      res.addAll(queryPaths(task));
+      res.addAll(queryPaths(task, pathList, groupId));
     } catch (RaftConnectionException e) {
-      boolean success = false;
-      while (!success) {
-        PeerId nextNode = null;
-        try {
-          nextNode = RaftUtils.getPeerIDInOrder(groupId);
-          if (holder.equals(nextNode)) {
-            break;
-          }
-          LOGGER
-              .debug("Previous task fail, then send get paths for {} task for group {} to node {}.",
-                  pathList, groupId, nextNode);
-          task.setTargetNode(nextNode);
-          task.resetTask();
-          task.setTaskState(TaskState.INITIAL);
-          res.addAll(queryPaths(task));
-          LOGGER.debug("Get paths for {} task for group {} to node {} succeed.", pathList, groupId,
-              nextNode);
-          success = true;
-        } catch (RaftConnectionException e1) {
-          LOGGER.debug("Get paths for {} task for group {} to node {} fail.", pathList, groupId,
-              nextNode);
-        }
-      }
-      LOGGER.debug("The final result for get paths for {} task is {}", pathList, success);
-      if (!success) {
-        throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
-      }
+      throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
     }
   }
 
-  private List<List<String>> queryTimeSeries(SingleQPTask task)
+  private List<String> queryPaths(SingleQPTask task, List<String> pathList, String groupId)
       throws InterruptedException, RaftConnectionException {
-    BasicResponse response = syncHandleNonQuerySingleTaskGetRes(task, 0);
+    BasicResponse response = syncHandleSingleTaskGetRes(task, 0, "get paths for " + pathList, groupId);
     return response == null ? new ArrayList<>()
-        : ((QueryTimeSeriesResponse) response).getTimeSeries();
-  }
-
-  private TSDataType querySeriesType(SingleQPTask task)
-      throws InterruptedException, RaftConnectionException {
-    BasicResponse response = syncHandleNonQuerySingleTaskGetRes(task, 0);
-    return response == null ? null
-        : ((QuerySeriesTypeResponse) response).getDataType();
+        : ((QueryPathsResponse) response).getPaths();
   }
 
   /**
@@ -516,13 +424,6 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
     return ((QueryStorageGroupResponse) task.getResponse()).getStorageGroups();
   }
 
-  private List<String> queryPaths(SingleQPTask task)
-      throws InterruptedException, RaftConnectionException {
-    BasicResponse response = syncHandleNonQuerySingleTaskGetRes(task, 0);
-    return response == null ? new ArrayList<>()
-        : ((QueryPathsResponse) response).getPaths();
-  }
-
   /**
    * Combine multiple metadata in String format into single String
    *
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
index b881549..8548879 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
@@ -28,7 +28,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.iotdb.cluster.concurrent.pool.QPTaskThreadManager;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.qp.executor.AbstractQPExecutor;
 import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
+import org.apache.iotdb.cluster.qp.executor.QueryMetadataExecutor;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 import org.apache.iotdb.cluster.rpc.raft.response.nonquery.DataGroupNonQueryResponse;
 import org.apache.iotdb.cluster.service.TSServiceClusterImpl.BatchResult;
@@ -66,7 +68,7 @@ public class BatchQPTask extends MultiQPTask {
    */
   private ReentrantLock lock = new ReentrantLock();
 
-  private NonQueryExecutor executor;
+  private AbstractQPExecutor executor;
 
   public BatchQPTask(int taskNum, BatchResult result, Map<String, SingleQPTask> taskMap,
       Map<String, List<Integer>> planIndexMap) {
@@ -90,8 +92,6 @@ public class BatchQPTask extends MultiQPTask {
       String groupId = basicResponse.getGroupId();
       List<Boolean> results = basicResponse.getResults();
       List<Integer> indexList = planIndexMap.get(groupId);
-      List<String> errorMsgList = ((DataGroupNonQueryResponse) basicResponse).getErrorMsgList();
-      int errorMsgIndex = 0;
       for (int i = 0; i < indexList.size(); i++) {
         if (i >= results.size()) {
           resultArray[indexList.get(i)] = Statement.EXECUTE_FAILED;
@@ -101,19 +101,47 @@ public class BatchQPTask extends MultiQPTask {
             resultArray[indexList.get(i)] = Statement.SUCCESS_NO_INFO;
           } else {
             resultArray[indexList.get(i)] = Statement.EXECUTE_FAILED;
-            batchResult.addBatchErrorMessage(indexList.get(i), errorMsgList.get(errorMsgIndex++));
+            batchResult.addBatchErrorMessage(indexList.get(i), basicResponse.getErrorMsg());
           }
         }
       }
       if (!basicResponse.isSuccess()) {
         batchResult.setAllSuccessful(false);
       }
+    } catch (Exception ex) {
+      ex.printStackTrace();
     } finally {
       lock.unlock();
     }
     taskCountDownLatch.countDown();
   }
 
+  public void executeQueryMetadataBy(QueryMetadataExecutor executor, String taskInfo) {
+    this.executor = executor;
+
+    for (Entry<String, SingleQPTask> entry : taskMap.entrySet()) {
+      String groupId = entry.getKey();
+      SingleQPTask subTask = entry.getValue();
+      Future<?> taskThread;
+      taskThread = QPTaskThreadManager.getInstance()
+          .submit(() -> executeRpcSubQueryMetadataTask(subTask, taskInfo, groupId));
+      taskThreadMap.put(groupId, taskThread);
+    }
+  }
+
+  /**
+   * Execute RPC sub task
+   */
+  private void executeRpcSubQueryMetadataTask(SingleQPTask subTask, String taskInfo, String groupId) {
+    try {
+      executor.syncHandleSingleTask(subTask, taskInfo, groupId);
+      this.receive(subTask.getResponse());
+    } catch (RaftConnectionException | InterruptedException e) {
+      LOGGER.error("Async handle sub {} task failed.", taskInfo);
+      this.receive(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage()));
+    }
+  }
+
   public void executeBy(NonQueryExecutor executor) {
     this.executor = executor;
 
@@ -139,7 +167,7 @@ public class BatchQPTask extends MultiQPTask {
    */
   private void executeLocalSubTask(QPTask subTask, String groupId) {
     try {
-      executor.handleNonQueryRequestLocally(groupId, subTask);
+      ((NonQueryExecutor) executor).handleNonQueryRequestLocally(groupId, subTask);
       this.receive(subTask.getResponse());
     } catch (InterruptedException e) {
       LOGGER.error("Handle sub task locally failed.");
@@ -152,7 +180,7 @@ public class BatchQPTask extends MultiQPTask {
    */
   private void executeRpcSubTask(SingleQPTask subTask, String groupId) {
     try {
-      executor.syncHandleNonQueryTask(subTask);
+      executor.syncHandleSingleTask(subTask, "sub non-query", groupId);
       this.receive(subTask.getResponse());
     } catch (RaftConnectionException | InterruptedException e) {
       LOGGER.error("Async handle sub task failed.");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
index 1da5072..7ae40c2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
@@ -204,13 +204,13 @@ public class TSServiceClusterImpl extends TSServiceImpl {
   /**
    * Present batch results.
    */
-  public class BatchResult {
+  public static class BatchResult {
 
     private boolean isAllSuccessful;
     private StringBuilder batchErrorMessage;
     private int[] resultArray;
 
-    private BatchResult(boolean isAllSuccessful, StringBuilder batchErrorMessage,
+    public BatchResult(boolean isAllSuccessful, StringBuilder batchErrorMessage,
         int[] resultArray) {
       this.isAllSuccessful = isAllSuccessful;
       this.batchErrorMessage = batchErrorMessage;