You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/11/30 01:56:07 UTC

[iotdb] branch cluster_new updated: fix many bugs and imporve performance

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_new
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cluster_new by this push:
     new 57cfab3  fix many bugs and imporve performance
     new 65d7209  Merge pull request #2130 from LebronAl/cluster_new_fix_many_bugs
57cfab3 is described below

commit 57cfab39cb5b4c42a85e550ee9b9e2a0ee9aec7f
Author: LebronAl <TX...@gmail.com>
AuthorDate: Thu Nov 26 20:12:57 2020 +0800

    fix many bugs and imporve performance
---
 .../iotdb/cluster/log/applier/BaseApplier.java     |   2 +-
 .../iotdb/cluster/log/applier/DataLogApplier.java  |   9 +-
 .../iotdb/cluster/log/snapshot/FileSnapshot.java   |   2 +-
 .../cluster/log/snapshot/PartitionedSnapshot.java  |   2 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   |  29 +++-
 .../iotdb/cluster/query/ClusterPlanExecutor.java   |  14 +-
 .../iotdb/cluster/query/LocalQueryExecutor.java    |  22 +--
 .../cluster/query/aggregate/ClusterAggregator.java |   2 +-
 .../cluster/query/fill/ClusterPreviousFill.java    |   2 +-
 .../query/last/ClusterLastQueryExecutor.java       |   4 +-
 .../cluster/query/reader/ClusterReaderFactory.java |  12 +-
 .../iotdb/cluster/query/reader/DataSourceInfo.java |  57 +++++--
 .../iotdb/cluster/server/DataClusterServer.java    |   2 +-
 .../org/apache/iotdb/cluster/server/Timer.java     |  15 +-
 .../handlers/caller/AppendNodeEntryHandler.java    |  15 +-
 .../cluster/server/member/MetaGroupMember.java     |  45 ++++--
 .../iotdb/cluster/server/member/RaftMember.java    |  51 +++---
 .../cluster/server/service/DataAsyncService.java   |   8 +-
 .../cluster/server/service/DataSyncService.java    |   8 +-
 .../apache/iotdb/cluster/utils/ClusterUtils.java   |   2 +-
 .../cluster/log/snapshot/DataSnapshotTest.java     |   2 +-
 .../iotdb/cluster/server/member/MemberTest.java    |  56 +++++--
 .../cluster/server/member/MetaGroupMemberTest.java |   3 +
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  54 ++++---
 .../org/apache/iotdb/rpc/RedirectException.java    |  13 ++
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |  27 +++-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 .../java/org/apache/iotdb/session/Session.java     | 178 ++++++++++++---------
 .../apache/iotdb/session/SessionConnection.java    |   5 +-
 29 files changed, 414 insertions(+), 228 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
index ac8db78..75cef69 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
@@ -87,7 +87,7 @@ abstract class BaseApplier implements LogApplier {
   private void executeAfterSync(PhysicalPlan plan)
       throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
     try {
-      metaGroupMember.syncLeaderWithConsistencyCheck();
+      metaGroupMember.syncLeaderWithConsistencyCheck(true);
     } catch (CheckConsistencyException ce) {
       throw new QueryProcessException(ce.getMessage());
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 108007c..8ce84b5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -27,8 +27,10 @@ import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
@@ -76,7 +78,10 @@ public class DataLogApplier extends BaseApplier {
         logger.error("Unsupported log: {}", log);
       }
     } catch (Exception e) {
-      logger.debug("Exception occurred when applying {}", log, e);
+      Throwable rootCause = IOUtils.getRootCause(e);
+      if (!(rootCause instanceof PathNotExistException)) {
+        logger.debug("Exception occurred when applying {}", log, e);
+      }
       log.setException(e);
     } finally {
       log.setApplied(true);
@@ -94,7 +99,7 @@ public class DataLogApplier extends BaseApplier {
       // the sg may not exist because the node does not catch up with the leader, retry after
       // synchronization
       try {
-        metaGroupMember.syncLeaderWithConsistencyCheck();
+        metaGroupMember.syncLeaderWithConsistencyCheck(true);
       } catch (CheckConsistencyException ce) {
         throw new QueryProcessException(ce.getMessage());
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
index cd595ed..c5f75e9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
@@ -217,7 +217,7 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
         throws SnapshotInstallationException {
       // ensure StorageGroups are synchronized
       try {
-        dataGroupMember.getMetaGroupMember().syncLeaderWithConsistencyCheck();
+        dataGroupMember.getMetaGroupMember().syncLeaderWithConsistencyCheck(true);
       } catch (CheckConsistencyException e) {
         throw new SnapshotInstallationException(e);
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
index d88bec5..a0eff88 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
@@ -193,7 +193,7 @@ public class PartitionedSnapshot<T extends Snapshot> extends Snapshot {
       }
       // ensure storage groups are synchronized
       try {
-        dataGroupMember.getMetaGroupMember().syncLeaderWithConsistencyCheck();
+        dataGroupMember.getMetaGroupMember().syncLeaderWithConsistencyCheck(true);
       } catch (CheckConsistencyException e) {
         throw new SnapshotInstallationException(e);
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 5aebe7e..ac1a9bb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -70,6 +70,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.MetaUtils;
 import org.apache.iotdb.db.metadata.PartialPath;
@@ -547,9 +548,10 @@ public class CMManager extends MManager {
         return false;
       }
       if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() &&
-          result.getCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
-        logger.error("{} failed to execute create timeseries {}", metaGroupMember.getThisNode(),
-            seriesPath);
+          result.getCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode() &&
+          result.getCode() != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+        logger.error("{} failed to execute create timeseries {}: {}", metaGroupMember.getThisNode(),
+            seriesPath, result);
         return false;
       }
     }
@@ -611,7 +613,8 @@ public class CMManager extends MManager {
   public void pullTimeSeriesSchemas(List<PartialPath> prefixPaths,
       Node ignoredGroup)
       throws MetadataException {
-    logger.debug("{}: Pulling timeseries schemas of {}", metaGroupMember.getName(), prefixPaths);
+    logger.debug("{}: Pulling timeseries schemas of {}, ignored group {}",
+        metaGroupMember.getName(), prefixPaths, ignoredGroup);
     // split the paths by the data groups that should hold them
     Map<PartitionGroup, List<String>> partitionGroupPathMap = new HashMap<>();
     for (PartialPath prefixPath : prefixPaths) {
@@ -843,7 +846,7 @@ public class CMManager extends MManager {
   public Set<PartialPath> getMatchedDevices(PartialPath originPath) throws MetadataException {
     // make sure this node knows all storage groups
     try {
-      metaGroupMember.syncLeaderWithConsistencyCheck();
+      metaGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       throw new MetadataException(e);
     }
@@ -1078,7 +1081,7 @@ public class CMManager extends MManager {
       int limit, int offset) throws MetadataException {
     // make sure this node knows all storage groups
     try {
-      metaGroupMember.syncLeaderWithConsistencyCheck();
+      metaGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       throw new MetadataException(e);
     }
@@ -1116,7 +1119,7 @@ public class CMManager extends MManager {
   public List<PartialPath> getMatchedPaths(PartialPath originPath) throws MetadataException {
     // make sure this node knows all storage groups
     try {
-      metaGroupMember.syncLeaderWithConsistencyCheck();
+      metaGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       throw new MetadataException(e);
     }
@@ -1322,7 +1325,7 @@ public class CMManager extends MManager {
       throws CheckConsistencyException, MetadataException {
     Node header = group.getHeader();
     DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
-    localDataMember.syncLeaderWithConsistencyCheck();
+    localDataMember.syncLeaderWithConsistencyCheck(false);
     try {
       List<ShowTimeSeriesResult> localResult = super.showTimeseries(plan, context);
       resultSet.addAll(localResult);
@@ -1412,4 +1415,14 @@ public class CMManager extends MManager {
     getAllPathsResult.setAliasList(alias);
     return getAllPathsResult;
   }
+
+  @Override
+  public PartialPath getStorageGroupPath(PartialPath path) throws StorageGroupNotSetException {
+    try {
+      return super.getStorageGroupPath(path);
+    } catch (StorageGroupNotSetException e) {
+      metaGroupMember.syncLeader();
+      return super.getStorageGroupPath(path);
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index c7f7b36..8b7116b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -57,13 +57,11 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
 import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
@@ -101,14 +99,14 @@ public class ClusterPlanExecutor extends PlanExecutor {
       return processDataQuery((QueryPlan) queryPlan, context);
     } else if (queryPlan instanceof ShowPlan) {
       try {
-        metaGroupMember.syncLeaderWithConsistencyCheck();
+        metaGroupMember.syncLeaderWithConsistencyCheck(false);
       } catch (CheckConsistencyException e) {
         throw new QueryProcessException(e.getMessage());
       }
       return processShowQuery((ShowPlan) queryPlan, context);
     } else if (queryPlan instanceof AuthorPlan) {
       try {
-        metaGroupMember.syncLeaderWithConsistencyCheck();
+        metaGroupMember.syncLeaderWithConsistencyCheck(false);
       } catch (CheckConsistencyException e) {
         throw new QueryProcessException(e.getMessage());
       }
@@ -132,7 +130,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
   protected int getNodesNumInGivenLevel(PartialPath path, int level) throws MetadataException {
     // make sure this node knows all storage groups
     try {
-      metaGroupMember.syncLeaderWithConsistencyCheck();
+      metaGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       throw new MetadataException(e);
     }
@@ -180,7 +178,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
         // this node is a member of the group, perform a local query after synchronizing with the
         // leader
         metaGroupMember.getLocalDataMember(partitionGroup.getHeader())
-            .syncLeaderWithConsistencyCheck();
+            .syncLeaderWithConsistencyCheck(false);
         int localResult = getLocalPathCount(pathUnderSG, level);
         logger.debug("{}: get path count of {} locally, result {}", metaGroupMember.getName(),
             partitionGroup, localResult);
@@ -341,7 +339,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
       int level) throws CheckConsistencyException, MetadataException {
     Node header = group.getHeader();
     DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
-    localDataMember.syncLeaderWithConsistencyCheck();
+    localDataMember.syncLeaderWithConsistencyCheck(false);
     try {
       return IoTDB.metaManager.getNodesList(schemaPattern, level,
           new SlotSgFilter(
@@ -449,7 +447,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
       throws CheckConsistencyException {
     Node header = group.getHeader();
     DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
-    localDataMember.syncLeaderWithConsistencyCheck();
+    localDataMember.syncLeaderWithConsistencyCheck(false);
     try {
       return IoTDB.metaManager.getChildNodePathInNextLevel(path);
     } catch (MetadataException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index 77c701e..3f3880e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -160,7 +160,7 @@ public class LocalQueryExecutor {
       throws CheckConsistencyException, QueryProcessException, StorageEngineException, IOException {
     logger.debug("{}: {} is querying {}, queryId: {}", name, request.getRequester(),
         request.getPath(), request.getQueryId());
-    dataGroupMember.syncLeaderWithConsistencyCheck();
+    dataGroupMember.syncLeaderWithConsistencyCheck(false);
 
     PartialPath path = null;
     try {
@@ -218,7 +218,7 @@ public class LocalQueryExecutor {
       throws CheckConsistencyException, MetadataException {
     // try to synchronize with the leader first in case that some schema logs are accepted but
     // not committed yet
-    dataGroupMember.syncLeaderWithConsistencyCheck();
+    dataGroupMember.syncLeaderWithConsistencyCheck(false);
 
     // collect local timeseries schemas and send to the requester
     // the measurements in them are the full paths.
@@ -259,7 +259,7 @@ public class LocalQueryExecutor {
       throws CheckConsistencyException, IllegalPathException {
     // try to synchronize with the leader first in case that some schema logs are accepted but
     // not committed yet
-    dataGroupMember.syncLeaderWithConsistencyCheck();
+    dataGroupMember.syncLeaderWithConsistencyCheck(false);
 
     // collect local timeseries schemas and send to the requester
     // the measurements in them are the full paths.
@@ -301,7 +301,7 @@ public class LocalQueryExecutor {
     logger
         .debug("{}: {} is querying {} by timestamp, queryId: {}", name, request.getRequester(),
             request.getPath(), request.getQueryId());
-    dataGroupMember.syncLeaderWithConsistencyCheck();
+    dataGroupMember.syncLeaderWithConsistencyCheck(false);
 
     PartialPath path = null;
     try {
@@ -334,7 +334,7 @@ public class LocalQueryExecutor {
 
   public ByteBuffer getAllMeasurementSchema(ByteBuffer planBuffer)
       throws CheckConsistencyException, IOException, MetadataException {
-    dataGroupMember.syncLeaderWithConsistencyCheck();
+    dataGroupMember.syncLeaderWithConsistencyCheck(false);
 
     ShowTimeSeriesPlan plan = (ShowTimeSeriesPlan) PhysicalPlan.Factory.create(planBuffer);
     List<ShowTimeSeriesResult> allTimeseriesSchema;
@@ -413,7 +413,7 @@ public class LocalQueryExecutor {
       Filter timeFilter, QueryContext context, boolean ascending)
       throws IOException, StorageEngineException, QueryProcessException {
     try {
-      dataGroupMember.syncLeaderWithConsistencyCheck();
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       throw new QueryProcessException(e.getMessage());
     }
@@ -447,7 +447,7 @@ public class LocalQueryExecutor {
    */
   public List<String> getUnregisteredTimeseries(List<String> timeseriesList)
       throws CheckConsistencyException {
-    dataGroupMember.syncLeaderWithConsistencyCheck();
+    dataGroupMember.syncLeaderWithConsistencyCheck(false);
 
     List<String> result = new ArrayList<>();
     for (String seriesPath : timeseriesList) {
@@ -483,7 +483,7 @@ public class LocalQueryExecutor {
       throws StorageEngineException, QueryProcessException {
     // pull the newest data
     try {
-      dataGroupMember.syncLeaderWithConsistencyCheck();
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       throw new StorageEngineException(e);
     }
@@ -628,7 +628,7 @@ public class LocalQueryExecutor {
       long beforeRange, Set<String> deviceMeasurements, QueryContext context)
       throws QueryProcessException, StorageEngineException, IOException {
     try {
-      dataGroupMember.syncLeaderWithConsistencyCheck();
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       throw new QueryProcessException(e.getMessage());
     }
@@ -640,7 +640,7 @@ public class LocalQueryExecutor {
 
   public int getPathCount(List<String> pathsToQuery, int level)
       throws CheckConsistencyException, MetadataException {
-    dataGroupMember.syncLeaderWithConsistencyCheck();
+    dataGroupMember.syncLeaderWithConsistencyCheck(false);
 
     int count = 0;
     for (String s : pathsToQuery) {
@@ -656,7 +656,7 @@ public class LocalQueryExecutor {
   @SuppressWarnings("java:S1135") // ignore todos
   public ByteBuffer last(LastQueryRequest request)
       throws CheckConsistencyException, QueryProcessException, IOException, StorageEngineException, IllegalPathException {
-    dataGroupMember.syncLeaderWithConsistencyCheck();
+    dataGroupMember.syncLeaderWithConsistencyCheck(false);
 
     RemoteQueryContext queryContext = queryManager
         .getQueryContext(request.getRequestor(), request.getQueryId(), DEFAULT_FETCH_SIZE, -1);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
index a711021..99067bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
@@ -78,7 +78,7 @@ public class ClusterAggregator {
       QueryContext context, boolean ascending) throws StorageEngineException {
     // make sure the partition table is new
     try {
-      metaGroupMember.syncLeaderWithConsistencyCheck();
+      metaGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       throw new StorageEngineException(e);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
index c78b6d9..3887b34 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
@@ -91,7 +91,7 @@ public class ClusterPreviousFill extends PreviousFill {
       throws StorageEngineException {
     // make sure the partition table is new
     try {
-      metaGroupMember.syncLeaderWithConsistencyCheck();
+      metaGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       throw new StorageEngineException(e);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
index 0787df5..1dac967 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
@@ -87,7 +87,7 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
       throws IOException, QueryProcessException {
     // calculate the global last from all data groups
     try {
-      metaGroupMember.syncLeaderWithConsistencyCheck();
+      metaGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       throw new IOException(e);
     }
@@ -174,7 +174,7 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
         throws StorageEngineException, QueryProcessException, IOException {
       DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(group.getHeader());
       try {
-        localDataMember.syncLeaderWithConsistencyCheck();
+        localDataMember.syncLeaderWithConsistencyCheck(false);
       } catch (CheckConsistencyException e) {
         throw new QueryProcessException(e.getMessage());
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 9c05214..3baab29 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -93,7 +93,7 @@ public class ClusterReaderFactory {
       throws StorageEngineException, QueryProcessException {
     // make sure the partition table is new
     try {
-      metaGroupMember.syncLeaderWithConsistencyCheck();
+      metaGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       throw new QueryProcessException(e.getMessage());
     }
@@ -186,7 +186,7 @@ public class ClusterReaderFactory {
       throws StorageEngineException, EmptyIntervalException {
     // make sure the partition table is new
     try {
-      metaGroupMember.syncLeaderWithConsistencyCheck();
+      metaGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       throw new StorageEngineException(e);
     }
@@ -262,7 +262,7 @@ public class ClusterReaderFactory {
       throws StorageEngineException, QueryProcessException {
     // pull the newest data
     try {
-      dataGroupMember.syncLeaderWithConsistencyCheck();
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       throw new StorageEngineException(e);
     }
@@ -368,7 +368,7 @@ public class ClusterReaderFactory {
       throws StorageEngineException, QueryProcessException {
     // make sure the partition table is new
     try {
-      metaGroupMember.syncLeaderWithConsistencyCheck();
+      metaGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       throw new QueryProcessException(e.getMessage());
     }
@@ -524,7 +524,7 @@ public class ClusterReaderFactory {
       throws StorageEngineException, QueryProcessException, IOException {
     // pull the newest data
     try {
-      dataGroupMember.syncLeaderWithConsistencyCheck();
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       throw new StorageEngineException(e);
     }
@@ -551,7 +551,7 @@ public class ClusterReaderFactory {
       TSDataType dataType, QueryContext context, DataGroupMember dataGroupMember, boolean ascending)
       throws StorageEngineException, QueryProcessException {
     try {
-      dataGroupMember.syncLeaderWithConsistencyCheck();
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       throw new StorageEngineException(e);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
index d279970..7f408de 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
@@ -24,13 +24,19 @@ import java.util.List;
 import org.apache.iotdb.cluster.client.async.AsyncDataClient;
 import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.query.RemoteQueryContext;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.db.utils.SerializeUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,11 +87,7 @@ public class DataSourceInfo {
       Node node = nodes.get(nextNodePos);
       logger.debug("querying {} from {} of {}", request.path, node, partitionGroup.getHeader());
       try {
-
-        AsyncDataClient client = this.metaGroupMember
-            .getClientProvider().getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
-        Long newReaderId = applyForReaderId(client, byTimestamp, timestamp);
-
+        Long newReaderId = getReaderId(node, byTimestamp, timestamp);
         if (newReaderId != null) {
           logger.debug("get a readerId {} for {} from {}", newReaderId, request.path, node);
           if (newReaderId != -1) {
@@ -121,8 +123,18 @@ public class DataSourceInfo {
     return false;
   }
 
-  private Long applyForReaderId(AsyncDataClient client, boolean byTimestamp, long timestamp)
-      throws TException, InterruptedException {
+  private Long getReaderId(Node node, boolean byTimestamp, long timestamp)
+      throws TException, InterruptedException, IOException {
+    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+      return applyForReaderIdAsync(node, byTimestamp, timestamp);
+    }
+    return applyForReaderIdSync(node, byTimestamp, timestamp);
+  }
+
+  private Long applyForReaderIdAsync(Node node, boolean byTimestamp, long timestamp)
+      throws TException, InterruptedException, IOException {
+    AsyncDataClient client = this.metaGroupMember
+        .getClientProvider().getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
     Long newReaderId;
     if (byTimestamp) {
       newReaderId = SyncClientAdaptor.querySingleSeriesByTimestamp(client, request);
@@ -132,6 +144,32 @@ public class DataSourceInfo {
     return newReaderId;
   }
 
+  private Long applyForReaderIdSync(Node node, boolean byTimestamp, long timestamp)
+      throws TException {
+    SyncDataClient client = this.metaGroupMember
+        .getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
+    Long newReaderId;
+    try {
+      if (byTimestamp) {
+        newReaderId = client.querySingleSeriesByTimestamp(request);
+      } else {
+        Filter newFilter;
+        // add timestamp to as a timeFilter to skip the data which has been read
+        if (request.isSetTimeFilterBytes()) {
+          Filter timeFilter = FilterFactory.deserialize(request.timeFilterBytes);
+          newFilter = new AndFilter(timeFilter, TimeFilter.gt(timestamp));
+        } else {
+          newFilter = TimeFilter.gt(timestamp);
+        }
+        request.setTimeFilterBytes(SerializeUtils.serializeFilter(newFilter));
+        newReaderId = client.querySingleSeries(request);
+      }
+      return newReaderId;
+    } finally {
+      client.putBack();
+    }
+  }
+
   public long getReaderId() {
     return this.readerId;
   }
@@ -149,7 +187,8 @@ public class DataSourceInfo {
   }
 
   AsyncDataClient getCurAsyncClient(int timeout) throws IOException {
-    return isNoClient ? null : metaGroupMember.getClientProvider().getAsyncDataClient(this.curSource, timeout);
+    return isNoClient ? null
+        : metaGroupMember.getClientProvider().getAsyncDataClient(this.curSource, timeout);
   }
 
   SyncDataClient getCurSyncClient(int timeout) {
@@ -161,7 +200,7 @@ public class DataSourceInfo {
     return this.isNoData;
   }
 
-  private boolean isNoClient(){
+  private boolean isNoClient() {
     return this.isNoClient;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 1e8a277..391cf69 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -202,7 +202,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
     partitionGroup = partitionTable.getHeaderGroup(header);
     if (partitionGroup == null || !partitionGroup.contains(thisNode)) {
       // if the partition table is old, this node may have not been moved to the new group
-      metaGroupMember.syncLeaderWithConsistencyCheck();
+      metaGroupMember.syncLeaderWithConsistencyCheck(true);
       partitionGroup = partitionTable.getHeaderGroup(header);
     }
     if (partitionGroup != null && partitionGroup.contains(thisNode)) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
index 255041c..04dfe50 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
@@ -23,6 +23,7 @@ package org.apache.iotdb.cluster.server;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 
 public class Timer {
@@ -86,6 +87,13 @@ public class Timer {
     RAFT_SENDER_WAIT_FOR_PREV_LOG(
         RAFT_MEMBER_SENDER, "sender wait for prev log", TIME_SCALE, true,
         RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_SENDER_SERIALIZE_LOG(
+        RAFT_MEMBER_SENDER, "serialize logs", TIME_SCALE, true,
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_SENDER_SEND_LOG_ASYNC(
+        RAFT_MEMBER_SENDER, "send log async", TIME_SCALE,
+        ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
     RAFT_SENDER_SEND_LOG(
         RAFT_MEMBER_SENDER, "send log", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
     RAFT_SENDER_VOTE_COUNTER(
@@ -136,11 +144,12 @@ public class Timer {
         RaftMember.USE_LOG_DISPATCHER, DATA_GROUP_MEMBER_LOCAL_EXECUTION),
     // raft member - receiver
     RAFT_RECEIVER_LOG_PARSE(
-        RAFT_MEMBER_RECEIVER, "log parse", TIME_SCALE, true, RAFT_SENDER_SEND_LOG),
+        RAFT_MEMBER_RECEIVER, "log parse", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
     RAFT_RECEIVER_WAIT_FOR_PREV_LOG(
-        RAFT_MEMBER_RECEIVER, "receiver wait for prev log", TIME_SCALE, true, RAFT_SENDER_SEND_LOG),
+        RAFT_MEMBER_RECEIVER, "receiver wait for prev log", TIME_SCALE, true,
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
     RAFT_RECEIVER_APPEND_ENTRY(
-        RAFT_MEMBER_RECEIVER, "append entrys", TIME_SCALE, true, RAFT_SENDER_SEND_LOG),
+        RAFT_MEMBER_RECEIVER, "append entrys", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
     RAFT_RECEIVER_INDEX_DIFF(
         RAFT_MEMBER_RECEIVER, "index diff", 1.0, true, ROOT),
     // log dispatcher
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index b2ea3f4..8720c65 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -61,13 +61,17 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> {
 
 
   public AppendNodeEntryHandler() {
-    if (Timer.ENABLE_INSTRUMENTING && ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+    if (Timer.ENABLE_INSTRUMENTING && ClusterDescriptor.getInstance().getConfig()
+        .isUseAsyncServer()) {
       sendStart = System.nanoTime();
     }
   }
 
   @Override
   public void onComplete(Long response) {
+    if (Timer.ENABLE_INSTRUMENTING) {
+      Statistic.RAFT_SENDER_SEND_LOG_ASYNC.calOperationCostTimeFromStart(sendStart);
+    }
     logger.debug("{}: Append response {} from {}", member.getName(), response, receiver);
     if (leaderShipStale.get()) {
       // someone has rejected this log because the leadership is stale
@@ -85,9 +89,6 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> {
           voteCounter.notifyAll();
         }
         peer.setMatchIndex(Math.max(log.getCurrLogIndex(), peer.getMatchIndex()));
-        if (Timer.ENABLE_INSTRUMENTING) {
-          Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(sendStart);
-        }
       } else if (resp > 0) {
         // a response > 0 is the follower's term
         // the leader ship is stale, wait for the new leader's heartbeat
@@ -100,17 +101,11 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<Long> {
         }
         leaderShipStale.set(true);
         voteCounter.notifyAll();
-        if (Timer.ENABLE_INSTRUMENTING) {
-          Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(sendStart);
-        }
       } else {
         //e.g., Response.RESPONSE_LOG_MISMATCH
         logger.debug("{}: The log {} is rejected by {} because: {}", member.getName(), log,
             receiver, resp);
         onFail();
-        if (Timer.ENABLE_INSTRUMENTING) {
-          Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(sendStart);
-        }
       }
       // rejected because the receiver's logs are stale or the receiver has no cluster info, just
       // wait for the heartbeat to handle
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index ca8de8d..b02f7a8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1412,7 +1412,7 @@ public class MetaGroupMember extends RaftMember {
       }
     }
     try {
-      syncLeaderWithConsistencyCheck();
+      syncLeaderWithConsistencyCheck(true);
       List<PartitionGroup> globalGroups = partitionTable.getGlobalGroups();
       logger.debug("Forwarding global data plan {} to {} groups", plan, globalGroups.size());
       return forwardPlan(globalGroups, plan);
@@ -1475,7 +1475,7 @@ public class MetaGroupMember extends RaftMember {
       planGroupMap = router.splitAndRoutePlan(plan);
     } catch (StorageGroupNotSetException e) {
       // synchronize with the leader to see if this node has unpulled storage groups
-      syncLeaderWithConsistencyCheck();
+      syncLeaderWithConsistencyCheck(true);
       try {
         planGroupMap = router.splitAndRoutePlan(plan);
       } catch (MetadataException ex) {
@@ -1514,26 +1514,39 @@ public class MetaGroupMember extends RaftMember {
     if (plan instanceof InsertPlan
         && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
         && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
-      // try to create timeseries
-      if (((InsertPlan) plan).getFailedMeasurements() != null) {
-        ((InsertPlan) plan).getPlanFromFailed();
-      }
-      boolean hasCreate;
-      try {
-        hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries((InsertPlan) plan);
-      } catch (IllegalPathException e) {
-        return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
-      }
-      if (hasCreate) {
-        status = forwardPlan(planGroupMap, plan);
-      } else {
-        logger.error("{}, Cannot auto create timeseries.", thisNode);
+      TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, ((InsertPlan) plan));
+      if (tmpStatus != null) {
+        status = tmpStatus;
       }
     }
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() && status
+        .isSetRedirectNode()) {
+      status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+    }
     logger.debug("{}: executed {} with answer {}", name, plan, status);
     return status;
   }
 
+  private TSStatus createTimeseriesForFailedInsertion(
+      Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) {
+    // try to create timeseries
+    if (plan.getFailedMeasurements() != null) {
+      plan.getPlanFromFailed();
+    }
+    boolean hasCreate;
+    try {
+      hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries(plan);
+    } catch (IllegalPathException e) {
+      return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
+    }
+    if (hasCreate) {
+      return forwardPlan(planGroupMap, plan);
+    } else {
+      logger.error("{}, Cannot auto create timeseries.", thisNode);
+    }
+    return null;
+  }
+
   /**
    * Forward each sub-plan to its belonging data group, and combine responses from the groups.
    *
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 67e0651..a64292c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -572,7 +572,7 @@ public abstract class RaftMember {
     return getSyncClient(syncHeartbeatClientPool, node);
   }
 
-  private void sendLogAsync(Log log, AtomicInteger voteCounter, Node node,
+  public void sendLogAsync(Log log, AtomicInteger voteCounter, Node node,
       AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, AppendEntryRequest request,
       Peer peer) {
     AsyncClient client = getSendLogAsyncClient(node);
@@ -740,27 +740,34 @@ public abstract class RaftMember {
 
   /**
    * according to the consistency configuration, decide whether to execute syncLeader or not and
-   * throws exception when failed
+   * throws exception when failed. Note that the write request will always try to sync leader
    */
-  public void syncLeaderWithConsistencyCheck() throws CheckConsistencyException {
-    switch (ClusterDescriptor.getInstance().getConfig().getConsistencyLevel()) {
-      case STRONG_CONSISTENCY:
-        if (!syncLeader()) {
-          throw CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION;
-        }
-        return;
-      case MID_CONSISTENCY:
-        // do not care success or not
-        syncLeader();
-        return;
-      case WEAK_CONSISTENCY:
-        // do nothing
-        return;
-      default:
-        // this should not happen in theory
-        throw new CheckConsistencyException(
-            "unknown consistency=" + ClusterDescriptor.getInstance().getConfig()
-                .getConsistencyLevel().name());
+  public void syncLeaderWithConsistencyCheck(boolean isWriteRequest)
+      throws CheckConsistencyException {
+    if (isWriteRequest) {
+      if (!syncLeader()) {
+        throw CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION;
+      }
+    } else {
+      switch (ClusterDescriptor.getInstance().getConfig().getConsistencyLevel()) {
+        case STRONG_CONSISTENCY:
+          if (!syncLeader()) {
+            throw CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION;
+          }
+          return;
+        case MID_CONSISTENCY:
+          // do not care success or not
+          syncLeader();
+          return;
+        case WEAK_CONSISTENCY:
+          // do nothing
+          return;
+        default:
+          // this should not happen in theory
+          throw new CheckConsistencyException(
+              "unknown consistency=" + ClusterDescriptor.getInstance().getConfig()
+                  .getConsistencyLevel().name());
+      }
     }
   }
 
@@ -1616,9 +1623,7 @@ public abstract class RaftMember {
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
       sendLogAsync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer);
     } else {
-      startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
       sendLogSync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer);
-      Timer.Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index 0e8f3b5..3e3accf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -211,7 +211,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   public void getAllPaths(Node header, List<String> paths, boolean withAlias,
       AsyncMethodCallback<GetAllPathsResult> resultHandler) {
     try {
-      dataGroupMember.syncLeaderWithConsistencyCheck();
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
       resultHandler.onComplete(((CMManager) IoTDB.metaManager).getAllPaths(paths, withAlias));
     } catch (MetadataException | CheckConsistencyException e) {
       resultHandler.onError(e);
@@ -222,7 +222,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   public void getAllDevices(Node header, List<String> path,
       AsyncMethodCallback<Set<String>> resultHandler) {
     try {
-      dataGroupMember.syncLeaderWithConsistencyCheck();
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
       resultHandler.onComplete(((CMManager) IoTDB.metaManager).getAllDevices(path));
     } catch (MetadataException | CheckConsistencyException e) {
       resultHandler.onError(e);
@@ -233,7 +233,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   public void getNodeList(Node header, String path, int nodeLevel,
       AsyncMethodCallback<List<String>> resultHandler) {
     try {
-      dataGroupMember.syncLeaderWithConsistencyCheck();
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
       resultHandler.onComplete(((CMManager) IoTDB.metaManager).getNodeList(path, nodeLevel));
     } catch (CheckConsistencyException | MetadataException e) {
       resultHandler.onError(e);
@@ -244,7 +244,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   public void getChildNodePathInNextLevel(Node header, String path,
       AsyncMethodCallback<Set<String>> resultHandler) {
     try {
-      dataGroupMember.syncLeaderWithConsistencyCheck();
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
       resultHandler.onComplete(((CMManager) IoTDB.metaManager).getChildNodePathInNextLevel(path));
     } catch (CheckConsistencyException | MetadataException e) {
       resultHandler.onError(e);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 53a6a12..7ce1c66 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -215,7 +215,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   public GetAllPathsResult getAllPaths(Node header, List<String> paths, boolean withAlias)
       throws TException {
     try {
-      dataGroupMember.syncLeaderWithConsistencyCheck();
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
       return ((CMManager) IoTDB.metaManager).getAllPaths(paths, withAlias);
     } catch (MetadataException | CheckConsistencyException e) {
       throw new TException(e);
@@ -225,7 +225,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   @Override
   public Set<String> getAllDevices(Node header, List<String> path) throws TException {
     try {
-      dataGroupMember.syncLeaderWithConsistencyCheck();
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
       return ((CMManager) IoTDB.metaManager).getAllDevices(path);
     } catch (MetadataException | CheckConsistencyException e) {
       throw new TException(e);
@@ -235,7 +235,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   @Override
   public List<String> getNodeList(Node header, String path, int nodeLevel) throws TException {
     try {
-      dataGroupMember.syncLeaderWithConsistencyCheck();
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
       return ((CMManager) IoTDB.metaManager).getNodeList(path, nodeLevel);
     } catch (CheckConsistencyException | MetadataException e) {
       throw new TException(e);
@@ -245,7 +245,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   @Override
   public Set<String> getChildNodePathInNextLevel(Node header, String path) throws TException {
     try {
-      dataGroupMember.syncLeaderWithConsistencyCheck();
+      dataGroupMember.syncLeaderWithConsistencyCheck(false);
       return ((CMManager) IoTDB.metaManager).getChildNodePathInNextLevel(path);
     } catch (CheckConsistencyException | MetadataException e) {
       throw new TException(e);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index d334e06..8346ddc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -319,7 +319,7 @@ public class ClusterUtils {
       // the storage group is not found locally, but may be found in the leader, retry after
       // synchronizing with the leader
       try {
-        metaGroupMember.syncLeaderWithConsistencyCheck();
+        metaGroupMember.syncLeaderWithConsistencyCheck(true);
       } catch (CheckConsistencyException checkConsistencyException) {
         throw new MetadataException(checkConsistencyException.getMessage());
       }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
index 957f441..b89d058 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
@@ -138,7 +138,7 @@ public abstract class DataSnapshotTest {
     // do nothing
     metaGroupMember = new TestMetaGroupMember() {
       @Override
-      public void syncLeaderWithConsistencyCheck() {
+      public void syncLeaderWithConsistencyCheck(boolean isWriteRequest) {
         // do nothing
       }
     };
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
index 839ac72..4f8a07b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
@@ -307,81 +307,109 @@ public class MemberTest {
   }
 
   @Test
-  public void testSyncLeaderWithConsistencyCheck() {
-    // 1. Strong consistency level with syncLeader false
+  public void testsyncLeaderWithConsistencyCheck() {
+
+    // 1. write request : Strong consistency level with syncLeader false
+    DataGroupMember dataGroupMemberWithWriteStrongConsistencyFalse = newDataGroupMemberWithSyncLeader(
+        TestUtils.getNode(0), false);
+    ClusterDescriptor.getInstance().getConfig()
+        .setConsistencyLevel(ConsistencyLevel.WEAK_CONSISTENCY);
+    CheckConsistencyException exception = null;
+    try {
+      dataGroupMemberWithWriteStrongConsistencyFalse.syncLeaderWithConsistencyCheck(true);
+    } catch (CheckConsistencyException e) {
+      exception = e;
+    }
+    Assert.assertNotNull(exception);
+    Assert.assertEquals(CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION, exception);
+
+    // 2. write request : Strong consistency level with syncLeader true
+    DataGroupMember dataGroupMemberWithWriteStrongConsistencyTrue = newDataGroupMemberWithSyncLeader(
+        TestUtils.getNode(0), true);
+    ClusterDescriptor.getInstance().getConfig()
+        .setConsistencyLevel(ConsistencyLevel.WEAK_CONSISTENCY);
+    exception = null;
+    try {
+      dataGroupMemberWithWriteStrongConsistencyTrue.syncLeaderWithConsistencyCheck(true);
+    } catch (CheckConsistencyException e) {
+      exception = e;
+    }
+    Assert.assertNull(exception);
+
+    // 3. Strong consistency level with syncLeader false
     DataGroupMember dataGroupMemberWithStrongConsistencyFalse = newDataGroupMemberWithSyncLeader(
         TestUtils.getNode(0), false);
     ClusterDescriptor.getInstance().getConfig()
         .setConsistencyLevel(ConsistencyLevel.STRONG_CONSISTENCY);
-    CheckConsistencyException exception = null;
+    exception = null;
     try {
-      dataGroupMemberWithStrongConsistencyFalse.syncLeaderWithConsistencyCheck();
+      dataGroupMemberWithStrongConsistencyFalse.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       exception = e;
     }
     Assert.assertNotNull(exception);
     Assert.assertEquals(CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION, exception);
 
-    // 2. Strong consistency level with syncLeader true
+    // 4. Strong consistency level with syncLeader true
     DataGroupMember dataGroupMemberWithStrongConsistencyTrue = newDataGroupMemberWithSyncLeader(
         TestUtils.getNode(0), true);
     ClusterDescriptor.getInstance().getConfig()
         .setConsistencyLevel(ConsistencyLevel.STRONG_CONSISTENCY);
     exception = null;
     try {
-      dataGroupMemberWithStrongConsistencyTrue.syncLeaderWithConsistencyCheck();
+      dataGroupMemberWithStrongConsistencyTrue.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       exception = e;
     }
     Assert.assertNull(exception);
 
-    // 3. Mid consistency level with syncLeader false
+    // 5. Mid consistency level with syncLeader false
     DataGroupMember dataGroupMemberWithMidConsistencyFalse = newDataGroupMemberWithSyncLeader(
         TestUtils.getNode(0), false);
     ClusterDescriptor.getInstance().getConfig()
         .setConsistencyLevel(ConsistencyLevel.MID_CONSISTENCY);
     exception = null;
     try {
-      dataGroupMemberWithMidConsistencyFalse.syncLeaderWithConsistencyCheck();
+      dataGroupMemberWithMidConsistencyFalse.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       exception = e;
     }
     Assert.assertNull(exception);
 
-    // 4. Mid consistency level with syncLeader true
+    // 6. Mid consistency level with syncLeader true
     DataGroupMember dataGroupMemberWithMidConsistencyTrue = newDataGroupMemberWithSyncLeader(
         TestUtils.getNode(0), true);
     ClusterDescriptor.getInstance().getConfig()
         .setConsistencyLevel(ConsistencyLevel.MID_CONSISTENCY);
     exception = null;
     try {
-      dataGroupMemberWithMidConsistencyTrue.syncLeaderWithConsistencyCheck();
+      dataGroupMemberWithMidConsistencyTrue.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       exception = e;
     }
     Assert.assertNull(exception);
 
-    // 5. Weak consistency level with syncLeader false
+    // 7. Weak consistency level with syncLeader false
     DataGroupMember dataGroupMemberWithWeakConsistencyFalse = newDataGroupMemberWithSyncLeader(
         TestUtils.getNode(0), false);
     ClusterDescriptor.getInstance().getConfig()
         .setConsistencyLevel(ConsistencyLevel.WEAK_CONSISTENCY);
     exception = null;
     try {
-      dataGroupMemberWithWeakConsistencyFalse.syncLeaderWithConsistencyCheck();
+      dataGroupMemberWithWeakConsistencyFalse.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       exception = e;
     }
     Assert.assertNull(exception);
 
-    // 6. Weak consistency level with syncLeader true
+    // 8. Weak consistency level with syncLeader true
     DataGroupMember dataGroupMemberWithWeakConsistencyTrue = newDataGroupMemberWithSyncLeader(
         TestUtils.getNode(0), true);
     ClusterDescriptor.getInstance().getConfig()
         .setConsistencyLevel(ConsistencyLevel.WEAK_CONSISTENCY);
     exception = null;
     try {
-      dataGroupMemberWithWeakConsistencyTrue.syncLeaderWithConsistencyCheck();
+      dataGroupMemberWithWeakConsistencyTrue.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
       exception = e;
     }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 07e8071..32b0b30 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -734,6 +734,9 @@ public class MetaGroupMemberTest extends MemberTest {
           schema.getEncodingType(), schema.getCompressor(), schema.getProps(),
           Collections.emptyMap(), Collections.emptyMap(), null);
       status = testMetaMember.executeNonQueryPlan(createTimeSeriesPlan);
+      if (status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+        status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      }
       assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.code);
       assertTrue(IoTDB.metaManager.isPathExist(new PartialPath(TestUtils.getTestSeries(i, 0))));
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index e99565d..b2ff57c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.QueryInBatchStatementException;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -1507,30 +1508,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
       }
 
-      List<TSStatus> statusList = new ArrayList<>();
+      List<TSStatus> statusList = insertTabletsInternal(req);
       boolean isAllSuccessful = true;
-      for (int i = 0; i < req.deviceIds.size(); i++) {
-        InsertTabletPlan insertTabletPlan = new InsertTabletPlan(
-            new PartialPath(req.deviceIds.get(i)),
-            req.measurementsList.get(i));
-        insertTabletPlan.setTimes(
-            QueryDataSetUtils.readTimesFromBuffer(req.timestampsList.get(i), req.sizeList.get(i)));
-        insertTabletPlan.setColumns(
-            QueryDataSetUtils.readValuesFromBuffer(
-                req.valuesList.get(i), req.typesList.get(i), req.measurementsList.get(i).size(),
-                req.sizeList.get(i)));
-        insertTabletPlan.setRowCount(req.sizeList.get(i));
-        insertTabletPlan.setDataTypes(req.typesList.get(i));
-
-        TSStatus status = checkAuthority(insertTabletPlan, req.getSessionId());
-        if (status == null) {
-          status = executeNonQueryPlan(insertTabletPlan);
-          isAllSuccessful =
-              ((status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode())
-                  && isAllSuccessful);
-        }
-        statusList.add(status);
+      for (TSStatus subStatus : statusList) {
+        isAllSuccessful =
+            ((subStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode())
+                && isAllSuccessful);
       }
+
       if (!isAllSuccessful) {
         return RpcUtils.getStatus(statusList);
       }
@@ -1544,6 +1529,31 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     }
   }
 
+  public List<TSStatus> insertTabletsInternal(TSInsertTabletsReq req) throws IllegalPathException {
+    List<TSStatus> statusList = new ArrayList<>();
+
+    for (int i = 0; i < req.deviceIds.size(); i++) {
+      InsertTabletPlan insertTabletPlan = new InsertTabletPlan(
+          new PartialPath(req.deviceIds.get(i)),
+          req.measurementsList.get(i));
+      insertTabletPlan.setTimes(
+          QueryDataSetUtils.readTimesFromBuffer(req.timestampsList.get(i), req.sizeList.get(i)));
+      insertTabletPlan.setColumns(
+          QueryDataSetUtils.readValuesFromBuffer(
+              req.valuesList.get(i), req.typesList.get(i), req.measurementsList.get(i).size(),
+              req.sizeList.get(i)));
+      insertTabletPlan.setRowCount(req.sizeList.get(i));
+      insertTabletPlan.setDataTypes(req.typesList.get(i));
+
+      TSStatus status = checkAuthority(insertTabletPlan, req.getSessionId());
+      if (status == null) {
+        status = executeNonQueryPlan(insertTabletPlan);
+      }
+      statusList.add(status);
+    }
+    return statusList;
+  }
+
   @Override
   public TSStatus setStorageGroup(long sessionId, String storageGroup) {
     try {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
index d8f39d0..c7989b6 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
@@ -19,19 +19,32 @@
 
 package org.apache.iotdb.rpc;
 
+import java.util.Map;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
 public class RedirectException extends Exception {
 
   private final EndPoint endPoint;
 
+  private final Map<String,EndPoint> deviceEndPointMap;
+
   public RedirectException(EndPoint endpoint) {
     super("later request in same group will be redirected to " + endpoint.toString());
     this.endPoint = endpoint;
+    this.deviceEndPointMap = null;
+  }
+
+  public RedirectException(Map<String,EndPoint> deviceEndPointMap) {
+    super("later request in same group will be redirected to " + deviceEndPointMap);
+    this.endPoint = null;
+    this.deviceEndPointMap = deviceEndPointMap;
   }
 
   public EndPoint getEndPoint() {
     return this.endPoint;
   }
 
+  public Map<String, EndPoint> getDeviceEndPointMap() {
+    return deviceEndPointMap;
+  }
 }
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 8149dc5..7131009 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -19,10 +19,14 @@
 package org.apache.iotdb.rpc;
 
 import java.lang.reflect.Proxy;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
 import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
 public class RpcUtils {
@@ -50,6 +54,9 @@ public class RpcUtils {
       verifySuccess(status.getSubStatus());
       return;
     }
+    if (status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+      return;
+    }
     if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new StatementExecutionException(status);
     }
@@ -63,10 +70,28 @@ public class RpcUtils {
     }
   }
 
+  public static void verifySuccessWithRedirectionForInsertTablets(TSStatus status,
+      TSInsertTabletsReq req)
+      throws StatementExecutionException, RedirectException {
+    verifySuccess(status);
+    if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+      Map<String, EndPoint> deviceEndPointMap = new HashMap<>();
+      List<TSStatus> statusSubStatus = status.getSubStatus();
+      for (int i = 0; i < statusSubStatus.size(); i++) {
+        TSStatus subStatus = statusSubStatus.get(i);
+        if (subStatus.isSetRedirectNode()) {
+          deviceEndPointMap.put(req.getDeviceIds().get(i), subStatus.getRedirectNode());
+        }
+      }
+      throw new RedirectException(deviceEndPointMap);
+    }
+  }
+
   public static void verifySuccess(List<TSStatus> statuses) throws BatchExecutionException {
     StringBuilder errMsgs = new StringBuilder();
     for (TSStatus status : statuses) {
-      if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && status.getCode() != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
         errMsgs.append(status.getMessage()).append(";");
       }
     }
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index d70616f..29b0596 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -86,6 +86,7 @@ public enum TSStatusCode {
   NODE_READ_ONLY(704),
   CONSISTENCY_FAILURE(705),
   NO_CONNECTION(706),
+  NEED_REDIRECTION(707)
 
   ;
 
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 7a4c0c9..333bdbc 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -44,7 +44,6 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
-import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -153,8 +152,7 @@ public class Session {
     }
   }
 
-  public synchronized String getTimeZone()
-      throws StatementExecutionException, IoTDBConnectionException {
+  public synchronized String getTimeZone() {
     return defaultSessionConnection.getTimeZone();
   }
 
@@ -310,13 +308,6 @@ public class Session {
     return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime);
   }
 
-  private TSRawDataQueryReq genTSRawDataQueryReq(List<String> paths, long startTime, long endTime) {
-    TSRawDataQueryReq request = new TSRawDataQueryReq();
-    request.setPaths(paths);
-    request.setStartTime(startTime);
-    request.setEndTime(endTime);
-    return request;
-  }
 
   /**
    * insert data in one row, if you want to improve your performance, please use insertRecords
@@ -338,7 +329,7 @@ public class Session {
     try {
       getSessionConnection(deviceId).insertRecord(request);
     } catch (RedirectException e) {
-      handleRedirection(deviceId, e);
+      handleRedirection(deviceId, e.getEndPoint());
     }
   }
 
@@ -347,7 +338,7 @@ public class Session {
     try {
       getSessionConnection(deviceId).insertRecord(request);
     } catch (RedirectException e) {
-      handleRedirection(deviceId, e);
+      handleRedirection(deviceId, e.getEndPoint());
     }
   }
 
@@ -381,15 +372,14 @@ public class Session {
     }
   }
 
-  private void handleRedirection(String deviceId, RedirectException e)
+  private void handleRedirection(String deviceId, EndPoint endpoint)
       throws IoTDBConnectionException {
     if (Config.DEFAULT_CACHE_LEADER_MODE) {
-      logger.debug("DeviceId[{}]:{}", deviceId, e.getMessage());
-      deviceIdToEndpoint.put(deviceId, e.getEndPoint());
+      deviceIdToEndpoint.put(deviceId, endpoint);
       SessionConnection connection = endPointToSessionConnection
-          .computeIfAbsent(e.getEndPoint(), k -> {
+          .computeIfAbsent(endpoint, k -> {
             try {
-              return new SessionConnection(this, e.getEndPoint(), zoneId);
+              return new SessionConnection(this, endpoint, zoneId);
             } catch (IoTDBConnectionException ex) {
               tmp.set(ex);
               return null;
@@ -470,29 +460,8 @@ public class Session {
       throw new IllegalArgumentException(
           "deviceIds, times, measurementsList and valuesList's size should be equal");
     }
-    StringBuilder errMsgBuilder = new StringBuilder("");
     if (Config.DEFAULT_CACHE_LEADER_MODE) {
-      Map<String, TSInsertStringRecordsReq> deviceGroup = new HashMap<>();
-      for (int i = 0; i < deviceIds.size(); i++) {
-        TSInsertStringRecordsReq request = deviceGroup
-            .computeIfAbsent(deviceIds.get(i), k -> new TSInsertStringRecordsReq());
-        updateTSInsertStringRecordsReq(request, deviceIds.get(i), times.get(i),
-            measurementsList.get(i), valuesList.get(i));
-      }
-      //TODO parallel
-      for (Entry<String, TSInsertStringRecordsReq> entry : deviceGroup.entrySet()) {
-        try {
-          getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
-        } catch (RedirectException e) {
-          handleRedirection(entry.getKey(), e);
-        } catch (StatementExecutionException e) {
-          errMsgBuilder.append(e.getMessage());
-        }
-      }
-      String errMsg = errMsgBuilder.toString();
-      if (!errMsg.isEmpty()) {
-        throw new StatementExecutionException(errMsg);
-      }
+      insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList);
     } else {
       TSInsertStringRecordsReq request = genTSInsertStringRecordsReq(deviceIds, times,
           measurementsList, valuesList);
@@ -504,6 +473,33 @@ public class Session {
     }
   }
 
+  private void insertStringRecordsWithLeaderCache(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    Map<String, TSInsertStringRecordsReq> deviceGroup = new HashMap<>();
+    for (int i = 0; i < deviceIds.size(); i++) {
+      TSInsertStringRecordsReq request = deviceGroup
+          .computeIfAbsent(deviceIds.get(i), k -> new TSInsertStringRecordsReq());
+      updateTSInsertStringRecordsReq(request, deviceIds.get(i), times.get(i),
+          measurementsList.get(i), valuesList.get(i));
+    }
+    //TODO parallel
+    StringBuilder errMsgBuilder = new StringBuilder();
+    for (Entry<String, TSInsertStringRecordsReq> entry : deviceGroup.entrySet()) {
+      try {
+        getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
+      } catch (RedirectException e) {
+        handleRedirection(entry.getKey(), e.getEndPoint());
+      } catch (StatementExecutionException e) {
+        errMsgBuilder.append(e.getMessage());
+      }
+    }
+    String errMsg = errMsgBuilder.toString();
+    if (!errMsg.isEmpty()) {
+      throw new StatementExecutionException(errMsg);
+    }
+  }
+
   private TSInsertStringRecordsReq genTSInsertStringRecordsReq(List<String> deviceId,
       List<Long> time,
       List<List<String>> measurements, List<List<String>> values) {
@@ -543,21 +539,7 @@ public class Session {
           "deviceIds, times, measurementsList and valuesList's size should be equal");
     }
     if (Config.DEFAULT_CACHE_LEADER_MODE) {
-      Map<String, TSInsertRecordsReq> deviceGroup = new HashMap<>();
-      for (int i = 0; i < deviceIds.size(); i++) {
-        TSInsertRecordsReq request = deviceGroup
-            .computeIfAbsent(deviceIds.get(i), k -> new TSInsertRecordsReq());
-        updateTSInsertRecordsReq(request, deviceIds.get(i), times.get(i),
-            measurementsList.get(i), typesList.get(i), valuesList.get(i));
-      }
-      //TODO parallel
-      for (Entry<String, TSInsertRecordsReq> entry : deviceGroup.entrySet()) {
-        try {
-          getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
-        } catch (RedirectException e) {
-          handleRedirection(entry.getKey(), e);
-        }
-      }
+      insertRecordsWithLeaderCache(deviceIds, times, measurementsList, typesList, valuesList);
     } else {
       TSInsertRecordsReq request = genTSInsertRecordsReq(deviceIds, times, measurementsList,
           typesList, valuesList);
@@ -570,6 +552,34 @@ public class Session {
     }
   }
 
+  private void insertRecordsWithLeaderCache(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    Map<String, TSInsertRecordsReq> deviceGroup = new HashMap<>();
+    for (int i = 0; i < deviceIds.size(); i++) {
+      TSInsertRecordsReq request = deviceGroup
+          .computeIfAbsent(deviceIds.get(i), k -> new TSInsertRecordsReq());
+      updateTSInsertRecordsReq(request, deviceIds.get(i), times.get(i),
+          measurementsList.get(i), typesList.get(i), valuesList.get(i));
+    }
+    //TODO parallel
+    StringBuilder errMsgBuilder = new StringBuilder();
+    for (Entry<String, TSInsertRecordsReq> entry : deviceGroup.entrySet()) {
+      try {
+        getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
+      } catch (RedirectException e) {
+        handleRedirection(entry.getKey(), e.getEndPoint());
+      } catch (StatementExecutionException e) {
+        errMsgBuilder.append(e.getMessage());
+      }
+    }
+    String errMsg = errMsgBuilder.toString();
+    if (!errMsg.isEmpty()) {
+      throw new StatementExecutionException(errMsg);
+    }
+  }
+
   private TSInsertRecordsReq genTSInsertRecordsReq(List<String> deviceIds, List<Long> times,
       List<List<String>> measurementsList, List<List<TSDataType>> typesList,
       List<List<Object>> valuesList) throws IoTDBConnectionException {
@@ -619,7 +629,7 @@ public class Session {
         defaultSessionConnection.insertTablet(request);
       }
     } catch (RedirectException e) {
-      handleRedirection(tablet.deviceId, e);
+      handleRedirection(tablet.deviceId, e.getEndPoint());
     }
   }
 
@@ -641,7 +651,7 @@ public class Session {
         defaultSessionConnection.insertTablet(request);
       }
     } catch (RedirectException e) {
-      handleRedirection(tablet.deviceId, e);
+      handleRedirection(tablet.deviceId, e.getEndPoint());
     }
   }
 
@@ -689,25 +699,7 @@ public class Session {
   public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
     if (Config.DEFAULT_CACHE_LEADER_MODE) {
-      Map<String, TSInsertTabletsReq> tabletGroup = new HashMap<>();
-      for (Entry<String, Tablet> entry : tablets.entrySet()) {
-        TSInsertTabletsReq request = tabletGroup
-            .computeIfAbsent(entry.getKey(), k -> new TSInsertTabletsReq());
-        updateTSInsertTabletsReq(request, entry.getValue(), sorted);
-      }
-      EndPoint endPoint;
-      //TODO parallel
-      for (Entry<String, TSInsertTabletsReq> entry : tabletGroup.entrySet()) {
-        try {
-          if ((endPoint = deviceIdToEndpoint.get(entry.getKey())) != null) {
-            endPointToSessionConnection.get(endPoint).insertTablets(entry.getValue());
-          } else {
-            defaultSessionConnection.insertTablets(entry.getValue());
-          }
-        } catch (RedirectException e) {
-          handleRedirection(entry.getKey(), e);
-        }
-      }
+      insertTabletsWithLeaderCache(tablets, sorted);
     } else {
       TSInsertTabletsReq request = genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted);
       try {
@@ -718,6 +710,42 @@ public class Session {
     }
   }
 
+  private void insertTabletsWithLeaderCache(Map<String, Tablet> tablets, boolean sorted) throws
+      IoTDBConnectionException, StatementExecutionException {
+    EndPoint endPoint;
+    SessionConnection connection;
+    Map<SessionConnection, TSInsertTabletsReq> tabletGroup = new HashMap<>();
+    for (Entry<String, Tablet> entry : tablets.entrySet()) {
+      endPoint = deviceIdToEndpoint.get(entry.getKey());
+      if (endPoint != null) {
+        connection = endPointToSessionConnection.get(endPoint);
+      } else {
+        connection = defaultSessionConnection;
+      }
+      TSInsertTabletsReq request = tabletGroup
+          .computeIfAbsent(connection, k -> new TSInsertTabletsReq());
+      updateTSInsertTabletsReq(request, entry.getValue(), sorted);
+    }
+
+    //TODO parallel
+    StringBuilder errMsgBuilder = new StringBuilder();
+    for (Entry<SessionConnection, TSInsertTabletsReq> entry : tabletGroup.entrySet()) {
+      try {
+        entry.getKey().insertTablets(entry.getValue());
+      } catch (RedirectException e) {
+        for (Entry<String, EndPoint> deviceEndPointEntry : e.getDeviceEndPointMap().entrySet()) {
+          handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+        }
+      } catch (StatementExecutionException e) {
+        errMsgBuilder.append(e.getMessage());
+      }
+    }
+    String errMsg = errMsgBuilder.toString();
+    if (!errMsg.isEmpty()) {
+      throw new StatementExecutionException(errMsg);
+    }
+  }
+
   private TSInsertTabletsReq genTSInsertTabletsReq(List<Tablet> tablets, boolean sorted)
       throws BatchExecutionException {
     TSInsertTabletsReq request = new TSInsertTabletsReq();
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 62ea7e1..d5ed5eb 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -66,7 +66,8 @@ public class SessionConnection {
   private ZoneId zoneId;
   private EndPoint endPoint;
 
-  public SessionConnection(Session session, EndPoint endPoint, ZoneId zoneId) throws IoTDBConnectionException {
+  public SessionConnection(Session session, EndPoint endPoint, ZoneId zoneId)
+      throws IoTDBConnectionException {
     this.session = session;
     this.endPoint = endPoint;
     this.zoneId = zoneId == null ? ZoneId.systemDefault() : zoneId;
@@ -434,7 +435,7 @@ public class SessionConnection {
       throws IoTDBConnectionException, StatementExecutionException, RedirectException {
     request.setSessionId(sessionId);
     try {
-      RpcUtils.verifySuccessWithRedirection(client.insertTablets(request));
+      RpcUtils.verifySuccessWithRedirectionForInsertTablets(client.insertTablets(request), request);
     } catch (TException e) {
       if (reconnect()) {
         try {