You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/09 02:28:15 UTC

[incubator-iotdb] branch cluster updated: rehandle exception during sending task via client

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new b5d3fa3  rehandle exception during sending task via client
     new b882b3f  Merge branch 'cluster' of github.com:apache/incubator-iotdb into cluster
b5d3fa3 is described below

commit b5d3fa37b231e85880db37fba548c5a831b6bc49
Author: lta <li...@163.com>
AuthorDate: Tue Apr 9 10:23:37 2019 +0800

    rehandle exception during sending task via client
---
 .../apache/iotdb/cluster/callback/BatchQPTask.java |  5 ++--
 .../org/apache/iotdb/cluster/callback/QPTask.java  |  2 +-
 .../iotdb/cluster/callback/SingleQPTask.java       | 12 ++++++--
 .../apache/iotdb/cluster/config/ClusterConfig.java |  6 ++--
 .../apache/iotdb/cluster/qp/ClusterQPExecutor.java | 24 ++++++++-------
 .../cluster/qp/executor/NonQueryExecutor.java      | 14 +++++----
 .../cluster/qp/executor/QueryMetadataExecutor.java | 35 +++++++++++-----------
 .../cluster/rpc/impl/RaftNodeAsClientManager.java  |  9 ++----
 .../processor/DataGroupNonQueryAsyncProcessor.java |  4 +--
 .../processor/MetaGroupNonQueryAsyncProcessor.java |  4 +--
 .../QueryMetadataInStringAsyncProcessor.java       |  7 ++---
 .../processor/QueryTimeSeriesAsyncProcessor.java   | 11 ++++---
 .../rpc/response/DataGroupNonQueryResponse.java    |  6 ++--
 .../rpc/response/MetaGroupNonQueryResponse.java    |  4 +--
 .../response/QueryMetadataInStringResponse.java    |  4 +--
 .../rpc/response/QueryStorageGroupResponse.java    |  4 +--
 .../rpc/response/QueryTimeSeriesResponse.java      |  7 ++---
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  |  5 ++--
 iotdb/iotdb/conf/iotdb-cluster.properties          |  6 ++--
 19 files changed, 86 insertions(+), 83 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
index bd8a073..491848e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.locks.ReentrantLock;
-import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
 import org.apache.iotdb.cluster.rpc.response.BasicResponse;
@@ -141,7 +140,7 @@ public class BatchQPTask extends MultiQPTask {
         this.run(subTask.getResponse());
       } catch (InterruptedException e) {
         LOGGER.error("Handle sub task locally failed.");
-        this.run(DataGroupNonQueryResponse.createErrorInstance(groupId, e.getMessage()));
+        this.run(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage()));
       }
     }
 
@@ -154,7 +153,7 @@ public class BatchQPTask extends MultiQPTask {
         this.run(subTask.getResponse());
       } catch (RaftConnectionException | InterruptedException e) {
         LOGGER.error("Async handle sub task failed.");
-        this.run(DataGroupNonQueryResponse.createErrorInstance(groupId, e.getMessage()));
+        this.run(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage()));
       }
     }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
index cdb1325..04b74ab 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
@@ -126,7 +126,7 @@ public abstract class QPTask {
   }
 
   public enum TaskType {
-    SINGLE, BATCH, DELETE
+    SINGLE, BATCH
   }
 
   /**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
index feefcd8..f733920 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
@@ -20,12 +20,16 @@ package org.apache.iotdb.cluster.callback;
 
 import org.apache.iotdb.cluster.rpc.request.BasicRequest;
 import org.apache.iotdb.cluster.rpc.response.BasicResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Process single task.
  */
 public class SingleQPTask extends QPTask {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(SingleQPTask.class);
+
   private static final int TASK_NUM = 1;
 
   public SingleQPTask(boolean isSyncTask, BasicRequest request) {
@@ -38,11 +42,13 @@ public class SingleQPTask extends QPTask {
    */
   @Override
   public void run(BasicResponse response) {
-    if(response != null) {
+    if(taskState != TaskState.EXCEPTION) {
       this.response = response;
-      if (response.isRedirected()) {
+      if(response == null){
+        LOGGER.error("Response is null");
+      } else if (response.isRedirected()) {
         this.taskState = TaskState.REDIRECT;
-      } else if (taskState != TaskState.EXCEPTION) {
+      } else {
         this.taskState = TaskState.FINISH;
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 40a20ef..d1d4e90 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -95,7 +95,7 @@ public class ClusterConfig {
   /**
    * Count limit to redo a single task
    **/
-  private int taskRedoCount = 3;
+  private int taskRedoCount = 10;
   /**
    * Timeout limit for a single task, the unit is milliseconds
    **/
@@ -109,13 +109,13 @@ public class ClusterConfig {
   /**
    * Max number of @NodeAsClient usage
    */
-  private int maxNumOfInnerRpcClient = 50;
+  private int maxNumOfInnerRpcClient = 500;
 
   /**
    * Max number of queue length to use @NodeAsClient, the request which exceed to this
    * number will be rejected.
    */
-  private int maxQueueNumOfInnerRpcClient = 50;
+  private int maxQueueNumOfInnerRpcClient = 500;
 
   /**
    * ReadMetadataConsistencyLevel: 1 Strong consistency, 2 Weak consistency
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index 49db9cb..90cd1e1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -24,7 +24,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iotdb.cluster.callback.QPTask;
 import org.apache.iotdb.cluster.callback.QPTask.TaskState;
 import org.apache.iotdb.cluster.config.ClusterConfig;
@@ -84,12 +83,12 @@ public abstract class ClusterQPExecutor {
   /**
    * ReadDataConsistencyLevel: 1 Strong consistency, 2 Weak consistency
    */
-  protected int readDataConsistencyLevel = CLUSTER_CONFIG.getReadDataConsistencyLevel();
+  private int readDataConsistencyLevel = CLUSTER_CONFIG.getReadDataConsistencyLevel();
 
   /**
    * Get Storage Group Name by device name
    */
-  public String getStroageGroupByDevice(String device) throws PathErrorException {
+  protected String getStroageGroupByDevice(String device) throws PathErrorException {
     String storageGroup;
     try {
       storageGroup = MManager.getInstance().getFileNameByPath(device);
@@ -117,7 +116,7 @@ public abstract class ClusterQPExecutor {
    *
    * @return key is groupId, value is all SGs belong to this data group
    */
-  public Map<String, Set<String>> classifySGByGroupId(List<String> sgList) {
+  protected Map<String, Set<String>> classifySGByGroupId(List<String> sgList) {
     Map<String, Set<String>> map = new HashMap<>();
     for (int i = 0; i < sgList.size(); i++) {
       String sg = sgList.get(i);
@@ -136,12 +135,12 @@ public abstract class ClusterQPExecutor {
   /**
    * Get raft group id by storage group name
    */
-  public String getGroupIdBySG(String storageGroup) {
+  protected String getGroupIdBySG(String storageGroup) {
     return router.getGroupID(router.routeGroup(storageGroup));
   }
 
   /**
-   * Verify if the non query command can execute in local. 1. If this node belongs to the storage
+   * Check if the non query command can execute in local. 1. If this node belongs to the storage
    * group 2. If this node is leader.
    */
   public boolean canHandleNonQueryByGroupId(String groupId) {
@@ -158,9 +157,9 @@ public abstract class ClusterQPExecutor {
   }
 
   /**
-   * Verify if the query command can execute in local. Check if this node belongs to the group id
+   * Check if the query command can be executed locally, i.e. if this node belongs to the group id
    */
-  public boolean canHandleQueryByGroupId(String groupId) {
+  protected boolean canHandleQueryByGroupId(String groupId) {
     return router.containPhysicalNodeByGroupId(groupId, localNode);
   }
 
@@ -172,7 +171,8 @@ public abstract class ClusterQPExecutor {
    * @param taskRetryNum Number of QPTask retries due to timeout and redirected.
    * @return basic response
    */
-  public BasicResponse asyncHandleNonQueryTaskGetRes(QPTask task, PeerId leader, int taskRetryNum)
+  protected BasicResponse asyncHandleNonQueryTaskGetRes(QPTask task, PeerId leader,
+      int taskRetryNum)
       throws InterruptedException, RaftConnectionException {
     asyncSendNonQueryTask(task, leader, taskRetryNum);
     return asyncGetNonQueryRes(task, leader, taskRetryNum);
@@ -211,13 +211,15 @@ public abstract class ClusterQPExecutor {
   }
 
   /**
-   * Asynchronous get task response. If it's redirected, the task needs to be resent.
+   * Asynchronously get the task response. If it is redirected or its status is EXCEPTION, the
+   * task needs to be resent. Note: an EXCEPTION status means the exception occurred while the
+   * task was being sent, not while it was being executed.
    *
    * @param task rpc task
    * @param leader leader node of the group
    * @param taskRetryNum Retry time of the task
    */
-  public BasicResponse asyncGetNonQueryRes(QPTask task, PeerId leader, int taskRetryNum)
+  private BasicResponse asyncGetNonQueryRes(QPTask task, PeerId leader, int taskRetryNum)
       throws InterruptedException, RaftConnectionException {
     task.await();
     if (task.getTaskState() != TaskState.FINISH) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 1098ba2..ebaabdb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -106,7 +106,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
     Status nullReadTaskStatus = Status.OK();
     RaftUtils.handleNullReadToMetaGroup(nullReadTaskStatus);
 
-    /** 1. Classify physical plan by group id **/
+    /** 1. Classify physical plans by group id **/
     Map<String, List<PhysicalPlan>> physicalPlansMap = new HashMap<>();
     Map<String, List<Integer>> planIndexMap = new HashMap<>();
     for (int i = 0; i < result.length; i++) {
@@ -117,8 +117,10 @@ public class NonQueryExecutor extends ClusterQPExecutor {
           String groupId = getGroupIdFromPhysicalPlan(plan);
           if (groupId.equals(ClusterConfig.METADATA_GROUP_ID)) {
             LOGGER.debug("Execute metadata group task");
-            result[i] = handleNonQueryRequest(groupId, plan) ? Statement.SUCCESS_NO_INFO
+            boolean executeResult = handleNonQueryRequest(groupId, plan);
+            result[i] =  executeResult ? Statement.SUCCESS_NO_INFO
                 : Statement.EXECUTE_FAILED;
+            batchResult.setAllSuccessful(executeResult);
           }else {
             if (!physicalPlansMap.containsKey(groupId)) {
               physicalPlansMap.put(groupId, new ArrayList<>());
@@ -157,7 +159,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
       }
     }
 
-    /** 3. Execute Multiple Tasks **/
+    /** 3. Execute Multiple Sub Tasks **/
     BatchQPTask task = new BatchQPTask(subTaskMap.size(), batchResult, subTaskMap, planIndexMap);
     currentTask = task;
     task.execute(this);
@@ -169,7 +171,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
   /**
    * Get group id from physical plan
    */
-  public String getGroupIdFromPhysicalPlan(PhysicalPlan plan)
+  private String getGroupIdFromPhysicalPlan(PhysicalPlan plan)
       throws PathErrorException, ProcessorException {
     String storageGroup;
     String groupId;
@@ -315,11 +317,11 @@ public class NonQueryExecutor extends ClusterQPExecutor {
     BasicResponse response;
     RaftService service;
     if (groupId.equals(ClusterConfig.METADATA_GROUP_ID)) {
-      response = MetaGroupNonQueryResponse.createEmptyInstance(groupId);
+      response = MetaGroupNonQueryResponse.createEmptyResponse(groupId);
       MetadataRaftHolder metadataRaftHolder = RaftUtils.getMetadataRaftHolder();
       service = (RaftService) metadataRaftHolder.getService();
     } else {
-      response = DataGroupNonQueryResponse.createEmptyInstance(groupId);
+      response = DataGroupNonQueryResponse.createEmptyResponse(groupId);
       DataPartitionRaftHolder dataRaftHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
       service = (RaftService) dataRaftHolder.getService();
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index f138c12..42fb66c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -167,12 +167,12 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
     for (int i = 0; i < taskList.size(); i++) {
       SingleQPTask task = taskList.get(i);
       task.await();
-      QueryMetadataInStringResponse response = (QueryMetadataInStringResponse) task.getResponse();
-      if (!response.isSuccess()) {
+      BasicResponse response = task.getResponse();
+      if (response == null || !response.isSuccess()) {
         LOGGER.error("Execute show timeseries statement false.");
         throw new ProcessorException();
       }
-      metadataList.add(response.getMetadata());
+      metadataList.add(((QueryMetadataInStringResponse)response).getMetadata());
     }
     return combineMetadataInStringList(metadataList);
   }
@@ -191,13 +191,13 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
     /** Check consistency level**/
     if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
       QueryTimeSeriesResponse response = QueryTimeSeriesResponse
-          .createEmptyInstance(groupId);
+          .createEmptyResponse(groupId);
       try {
         for (String path : pathList) {
           response.addTimeSeries(mManager.getShowTimeseriesPath(path));
         }
       } catch (final PathErrorException e) {
-        response = QueryTimeSeriesResponse.createErrorInstance(groupId, e.getMessage());
+        response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
       }
       task.run(response);
     } else {
@@ -207,7 +207,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
             @Override
             public void run(Status status, long index, byte[] reqCtx) {
               QueryTimeSeriesResponse response = QueryTimeSeriesResponse
-                  .createEmptyInstance(groupId);
+                  .createEmptyResponse(groupId);
               if (status.isOk()) {
                 try {
                   LOGGER.debug("start to read");
@@ -215,11 +215,11 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
                     response.addTimeSeries(mManager.getShowTimeseriesPath(path));
                   }
                 } catch (final PathErrorException e) {
-                  response = QueryTimeSeriesResponse.createErrorInstance(groupId, e.getMessage());
+                  response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
                 }
               } else {
                 response = QueryTimeSeriesResponse
-                    .createErrorInstance(groupId, status.getErrorMsg());
+                    .createErrorResponse(groupId, status.getErrorMsg());
               }
               task.run(response);
             }
@@ -237,7 +237,8 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
   private List<List<String>> queryTimeSeries(SingleQPTask task, PeerId leader)
       throws InterruptedException, RaftConnectionException {
     BasicResponse response = asyncHandleNonQueryTaskGetRes(task, leader, 0);
-    return ((QueryTimeSeriesResponse) response).getTimeSeries();
+    return response == null ? new ArrayList<>()
+        : ((QueryTimeSeriesResponse) response).getTimeSeries();
   }
 
   /**
@@ -255,9 +256,9 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
       QueryStorageGroupResponse response;
       try {
         response = QueryStorageGroupResponse
-            .createSuccessInstance(metadataHolder.getFsm().getAllStorageGroups());
+            .createSuccessResponse(metadataHolder.getFsm().getAllStorageGroups());
       } catch (final PathErrorException e) {
-        response = QueryStorageGroupResponse.createErrorInstance(e.getMessage());
+        response = QueryStorageGroupResponse.createErrorResponse(e.getMessage());
       }
       task.run(response);
     } else {
@@ -270,12 +271,12 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
               if (status.isOk()) {
                 try {
                   response = QueryStorageGroupResponse
-                      .createSuccessInstance(metadataHolder.getFsm().getAllStorageGroups());
+                      .createSuccessResponse(metadataHolder.getFsm().getAllStorageGroups());
                 } catch (final PathErrorException e) {
-                  response = QueryStorageGroupResponse.createErrorInstance(e.getMessage());
+                  response = QueryStorageGroupResponse.createErrorResponse(e.getMessage());
                 }
               } else {
-                response = QueryStorageGroupResponse.createErrorInstance(status.getErrorMsg());
+                response = QueryStorageGroupResponse.createErrorResponse(status.getErrorMsg());
               }
               task.run(response);
             }
@@ -294,7 +295,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
         .getDataPartitionHolder(groupId);
     if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
       QueryMetadataInStringResponse response = QueryMetadataInStringResponse
-          .createSuccessInstance(groupId, mManager.getMetadataInString());
+          .createSuccessResponse(groupId, mManager.getMetadataInString());
       response.addResult(true);
       task.run(response);
     } else {
@@ -307,11 +308,11 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
               if (status.isOk()) {
                 LOGGER.debug("start to read");
                 response = QueryMetadataInStringResponse
-                    .createSuccessInstance(groupId, mManager.getMetadataInString());
+                    .createSuccessResponse(groupId, mManager.getMetadataInString());
                 response.addResult(true);
               } else {
                 response = QueryMetadataInStringResponse
-                    .createErrorInstance(groupId, status.getErrorMsg());
+                    .createErrorResponse(groupId, status.getErrorMsg());
                 response.addResult(false);
               }
               task.run(response);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/impl/RaftNodeAsClientManager.java
index 0033a9f..b827ef0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/impl/RaftNodeAsClientManager.java
@@ -193,8 +193,7 @@ public class RaftNodeAsClientManager {
                     LOGGER.error("Bolt rpc client occurs errors when handling Request", e);
                     qpTask.setTaskState(TaskState.EXCEPTION);
                     releaseClient();
-                    qpTask.run(DataGroupNonQueryResponse
-                        .createErrorInstance(request.getGroupID(), e.getMessage()));
+                    qpTask.run(null);
                   }
 
                   @Override
@@ -206,8 +205,7 @@ public class RaftNodeAsClientManager {
         LOGGER.error(e.getMessage());
         qpTask.setTaskState(TaskState.EXCEPTION);
         releaseClient();
-        qpTask.run(DataGroupNonQueryResponse
-            .createErrorInstance(request.getGroupID(), e.getMessage()));
+        qpTask.run(null);
         throw new RaftConnectionException(e);
       }
     }
@@ -222,8 +220,7 @@ public class RaftNodeAsClientManager {
         qpTask.run(response);
       } catch (RemotingException | InterruptedException e) {
         qpTask.setTaskState(TaskState.EXCEPTION);
-        qpTask.run(DataGroupNonQueryResponse
-            .createErrorInstance(request.getGroupID(), e.getMessage()));
+        qpTask.run(null);
         throw new RaftConnectionException(e);
       } finally {
         releaseClient();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
index 7119e53..bc8d7cb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
@@ -55,13 +55,13 @@ public class DataGroupNonQueryAsyncProcessor extends
       LOGGER.debug("Request need to redirect leader: {}, groupId : {} ", leader, groupId);
 
       DataGroupNonQueryResponse response = DataGroupNonQueryResponse
-          .createRedirectedInstance(groupId, leader.toString());
+          .createRedirectedResponse(groupId, leader.toString());
       asyncContext.sendResponse(response);
     } else {
       LOGGER.debug("Apply task to raft node");
 
       /** Apply Task to Raft Node **/
-      BasicResponse response = DataGroupNonQueryResponse.createEmptyInstance(groupId);
+      BasicResponse response = DataGroupNonQueryResponse.createEmptyResponse(groupId);
       RaftService service = (RaftService) dataPartitionRaftHolder.getService();
       RaftUtils.executeRaftTaskForRpcProcessor(service, asyncContext, request, response);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
index 95d58c7..f38897e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
@@ -52,13 +52,13 @@ public class MetaGroupNonQueryAsyncProcessor extends
       LOGGER.debug("Request need to redirect leader: {}, groupId : {} ", leader, groupId);
 
       MetaGroupNonQueryResponse response = MetaGroupNonQueryResponse
-          .createRedirectedInstance(groupId, leader.toString());
+          .createRedirectedResponse(groupId, leader.toString());
       asyncContext.sendResponse(response);
     } else {
       LOGGER.debug("Apply task to metadata raft node");
 
       /** Apply Task to Raft Node **/
-      BasicResponse response = MetaGroupNonQueryResponse.createEmptyInstance(groupId);
+      BasicResponse response = MetaGroupNonQueryResponse.createEmptyResponse(groupId);
       RaftService service = (RaftService) metadataHolder.getService();
       RaftUtils.executeRaftTaskForRpcProcessor(service, asyncContext, request, response);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataInStringAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataInStringAsyncProcessor.java
index ae9ab2e..ceeea87 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataInStringAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataInStringAsyncProcessor.java
@@ -22,7 +22,6 @@ import com.alipay.remoting.AsyncContext;
 import com.alipay.remoting.BizContext;
 import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.closure.ReadIndexClosure;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
@@ -45,7 +44,7 @@ public class QueryMetadataInStringAsyncProcessor extends
 
     if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
       QueryMetadataInStringResponse response = QueryMetadataInStringResponse
-          .createSuccessInstance(groupId, mManager.getMetadataInString());
+          .createSuccessResponse(groupId, mManager.getMetadataInString());
       response.addResult(true);
       asyncContext.sendResponse(response);
     } else {
@@ -57,11 +56,11 @@ public class QueryMetadataInStringAsyncProcessor extends
               QueryMetadataInStringResponse response;
               if (status.isOk()) {
                 response = QueryMetadataInStringResponse
-                    .createSuccessInstance(groupId, mManager.getMetadataInString());
+                    .createSuccessResponse(groupId, mManager.getMetadataInString());
                 response.addResult(true);
               } else {
                 response = QueryMetadataInStringResponse
-                    .createErrorInstance(groupId, status.getErrorMsg());
+                    .createErrorResponse(groupId, status.getErrorMsg());
                 response.addResult(false);
               }
               asyncContext.sendResponse(response);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryTimeSeriesAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryTimeSeriesAsyncProcessor.java
index 431933f..e564e57 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryTimeSeriesAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryTimeSeriesAsyncProcessor.java
@@ -22,7 +22,6 @@ import com.alipay.remoting.AsyncContext;
 import com.alipay.remoting.BizContext;
 import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.closure.ReadIndexClosure;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
@@ -46,11 +45,11 @@ public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<Query
 
     if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
       QueryTimeSeriesResponse response = QueryTimeSeriesResponse
-          .createEmptyInstance(groupId);
+          .createEmptyResponse(groupId);
       try {
         queryTimeSeries(request, response);
       } catch (final PathErrorException e) {
-        response = QueryTimeSeriesResponse.createErrorInstance(groupId, e.getMessage());
+        response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
       }
       asyncContext.sendResponse(response);
     } else {
@@ -60,16 +59,16 @@ public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<Query
             @Override
             public void run(Status status, long index, byte[] reqCtx) {
               QueryTimeSeriesResponse response = QueryTimeSeriesResponse
-                  .createEmptyInstance(groupId);
+                  .createEmptyResponse(groupId);
               if (status.isOk()) {
                 try {
                   queryTimeSeries(request, response);
                 } catch (final PathErrorException e) {
-                  response = QueryTimeSeriesResponse.createErrorInstance(groupId, e.getMessage());
+                  response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
                 }
               } else {
                 response = QueryTimeSeriesResponse
-                    .createErrorInstance(groupId, status.getErrorMsg());
+                    .createErrorResponse(groupId, status.getErrorMsg());
               }
               asyncContext.sendResponse(response);
             }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
index e63634d..505cc11 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
@@ -28,15 +28,15 @@ public class DataGroupNonQueryResponse extends BasicResponse {
     super(groupId, redirected, leaderStr, errorMsg);
   }
 
-  public static DataGroupNonQueryResponse createRedirectedInstance(String groupId, String leaderStr) {
+  public static DataGroupNonQueryResponse createRedirectedResponse(String groupId, String leaderStr) {
     return new DataGroupNonQueryResponse(groupId, true, leaderStr, null);
   }
 
-  public static DataGroupNonQueryResponse createEmptyInstance(String groupId) {
+  public static DataGroupNonQueryResponse createEmptyResponse(String groupId) {
     return new DataGroupNonQueryResponse(groupId, false, null, null);
   }
 
-  public static DataGroupNonQueryResponse createErrorInstance(String groupId, String errorMsg) {
+  public static DataGroupNonQueryResponse createErrorResponse(String groupId, String errorMsg) {
     return new DataGroupNonQueryResponse(groupId, false, null, errorMsg);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
index ea8561f..131f7c1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
@@ -28,11 +28,11 @@ public class MetaGroupNonQueryResponse extends BasicResponse {
     super(groupId, redirected, leaderStr, errorMsg);
   }
 
-  public static MetaGroupNonQueryResponse createRedirectedInstance(String groupId, String leaderStr) {
+  public static MetaGroupNonQueryResponse createRedirectedResponse(String groupId, String leaderStr) {
     return new MetaGroupNonQueryResponse(groupId, true, leaderStr, null);
   }
 
-  public static MetaGroupNonQueryResponse createEmptyInstance(String groupId) {
+  public static MetaGroupNonQueryResponse createEmptyResponse(String groupId) {
     return new MetaGroupNonQueryResponse(groupId, false, null, null);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryMetadataInStringResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryMetadataInStringResponse.java
index 5a291ef..ab5137b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryMetadataInStringResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryMetadataInStringResponse.java
@@ -27,7 +27,7 @@ public class QueryMetadataInStringResponse extends BasicResponse {
     super(groupId, redirected, leaderStr, errorMsg);
   }
 
-  public static QueryMetadataInStringResponse createSuccessInstance(String groupId,
+  public static QueryMetadataInStringResponse createSuccessResponse(String groupId,
       String metadata) {
     QueryMetadataInStringResponse response = new QueryMetadataInStringResponse(groupId, false, null,
         null);
@@ -35,7 +35,7 @@ public class QueryMetadataInStringResponse extends BasicResponse {
     return response;
   }
 
-  public static QueryMetadataInStringResponse createErrorInstance(String groupId, String errorMsg) {
+  public static QueryMetadataInStringResponse createErrorResponse(String groupId, String errorMsg) {
     return new QueryMetadataInStringResponse(groupId, false, null, errorMsg);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryStorageGroupResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryStorageGroupResponse.java
index 788d3f4..d9c2061 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryStorageGroupResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryStorageGroupResponse.java
@@ -29,13 +29,13 @@ public class QueryStorageGroupResponse extends BasicResponse {
     this.addResult(success);
   }
 
-  public static QueryStorageGroupResponse createSuccessInstance(Set<String> storageGroups) {
+  public static QueryStorageGroupResponse createSuccessResponse(Set<String> storageGroups) {
     QueryStorageGroupResponse response = new QueryStorageGroupResponse(true, null, null);
     response.setStorageGroups(storageGroups);
     return response;
   }
 
-  public static QueryStorageGroupResponse createErrorInstance(String errorMsg) {
+  public static QueryStorageGroupResponse createErrorResponse(String errorMsg) {
     return new QueryStorageGroupResponse(false, null, errorMsg);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryTimeSeriesResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryTimeSeriesResponse.java
index b0ca82b..ed42eea 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryTimeSeriesResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryTimeSeriesResponse.java
@@ -31,11 +31,11 @@ public class QueryTimeSeriesResponse extends BasicResponse {
     timeSeries = new ArrayList<>();
   }
 
-  public static QueryTimeSeriesResponse createEmptyInstance(String groupId){
+  public static QueryTimeSeriesResponse createEmptyResponse(String groupId){
     return new QueryTimeSeriesResponse(groupId, false, true, null, null);
   }
 
-  public static QueryTimeSeriesResponse createErrorInstance(String groupId, String errorMsg) {
+  public static QueryTimeSeriesResponse createErrorResponse(String groupId, String errorMsg) {
     return new QueryTimeSeriesResponse(groupId, false, false, null, errorMsg);
   }
 
@@ -47,7 +47,4 @@ public class QueryTimeSeriesResponse extends BasicResponse {
     this.timeSeries.addAll(timeSeries);
   }
 
-  public void setTimeSeries(List<List<String>> timeSeries) {
-    this.timeSeries = timeSeries;
-  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 3838938..08c252a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.rpc.closure.ResponseClosure;
 import org.apache.iotdb.cluster.rpc.request.BasicRequest;
 import org.apache.iotdb.cluster.rpc.response.BasicResponse;
+import org.apache.iotdb.cluster.rpc.response.MetaGroupNonQueryResponse;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
 import org.apache.iotdb.cluster.utils.hash.Router;
 import org.slf4j.Logger;
@@ -262,8 +263,8 @@ public class RaftUtils {
           .readIndex(reqContext, new ReadIndexClosure() {
             @Override
             public void run(Status status, long index, byte[] reqCtx) {
-              BasicResponse response = new BasicResponse(null, false, null, null) {
-              };
+              BasicResponse response = MetaGroupNonQueryResponse
+                  .createEmptyResponse(ClusterConfig.METADATA_GROUP_ID);
               if (!status.isOk()) {
                 status.setCode(-1);
                 status.setErrorMsg(status.getErrorMsg());
diff --git a/iotdb/iotdb/conf/iotdb-cluster.properties b/iotdb/iotdb/conf/iotdb-cluster.properties
index dab4c7c..06ba43f 100644
--- a/iotdb/iotdb/conf/iotdb-cluster.properties
+++ b/iotdb/iotdb/conf/iotdb-cluster.properties
@@ -51,7 +51,7 @@ delay_snapshot = false
 delay_hours = 2
 
 # Count limit to redo a single task
-task_redo_count = 3
+task_redo_count = 10
 
 # Timeout limit for a single task, the unit is milliseconds
 task_timeout_ms = 1000
@@ -60,11 +60,11 @@ task_timeout_ms = 1000
 num_of_virtual_nodes = 2
 
 # Max number of inner rpc clients in use
-max_num_of_inner_rpc_client = 50
+max_num_of_inner_rpc_client = 500
 
 # Max length of the queue of requests waiting to use an inner rpc client; requests that
 # exceed this number will be rejected.
-max_queue_num_of_inner_rpc_client = 50
+max_queue_num_of_inner_rpc_client = 500
 
 # ReadMetadataConsistencyLevel: 1  Strong consistency, 2  Weak consistency
 read_metadata_consistency_level = 1