You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/09/15 03:18:47 UTC

[iotdb] branch master updated: [IOTDB-1484]Auto create schema functionality with e2e testing in cluster (#3879)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 255bc61  [IOTDB-1484]Auto create schema functionality with e2e testing in cluster (#3879)
255bc61 is described below

commit 255bc619e650b3123989138e00ff6e985c8287df
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Wed Sep 15 11:18:24 2021 +0800

    [IOTDB-1484]Auto create schema functionality with e2e testing in cluster (#3879)
---
 .../iotdb/cluster/log/applier/BaseApplier.java     |  88 +++++++--
 .../iotdb/cluster/log/applier/DataLogApplier.java  |  70 +++++--
 .../apache/iotdb/cluster/metadata/CMManager.java   |  27 ++-
 .../cluster/server/member/DataGroupMember.java     | 100 ++++++----
 .../cluster/log/applier/DataLogApplierTest.java    |   9 +-
 docs/UserGuide/Cluster/Cluster-Setup-Example.md    |   2 +-
 docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md |   2 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  11 +-
 .../org/apache/iotdb/db/qp/physical/BatchPlan.java |   9 +
 .../db/qp/physical/crud/InsertMultiTabletPlan.java |  19 +-
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |   8 +-
 .../physical/crud/InsertRowsOfOneDevicePlan.java   |  13 +-
 .../iotdb/db/qp/physical/crud/InsertRowsPlan.java  |  15 +-
 .../qp/physical/sys/CreateMultiTimeSeriesPlan.java |  12 +-
 .../java/org/apache/iotdb/session/Session.java     |   8 +
 .../test/java/org/apache/iotdb/db/sql/Cases.java   | 220 +++++++++++++++++++++
 .../java/org/apache/iotdb/db/sql/ClusterIT.java    |   9 +-
 17 files changed, 511 insertions(+), 111 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
index 4fbc628..cf99c8f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -93,6 +94,47 @@ abstract class BaseApplier implements LogApplier {
     }
   }
 
+  private void handleBatchProcessException(
+      BatchProcessException e, InsertPlan plan, DataGroupMember dataGroupMember)
+      throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
+    if (IoTDBDescriptor.getInstance().getConfig().isEnablePartition()) {
+      TSStatus[] failingStatus = e.getFailingStatus();
+      for (int i = 0; i < failingStatus.length; i++) {
+        TSStatus status = failingStatus[i];
+        // skip succeeded plans in later execution
+        if (status != null
+            && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+            && plan instanceof BatchPlan) {
+          ((BatchPlan) plan).setIsExecuted(i);
+        }
+      }
+
+      boolean needRetry = false, hasError = false;
+      for (int i = 0, failingStatusLength = failingStatus.length; i < failingStatusLength; i++) {
+        TSStatus status = failingStatus[i];
+        if (status != null) {
+          if (status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+              && plan instanceof BatchPlan) {
+            ((BatchPlan) plan).unsetIsExecuted(i);
+            needRetry = true;
+          } else if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            hasError = true;
+          }
+        }
+      }
+      if (hasError) {
+        throw e;
+      }
+      if (needRetry) {
+        pullTimeseriesSchema(plan, dataGroupMember.getHeader());
+        plan.recoverFromFailure();
+        getQueryExecutor().processNonQuery(plan);
+      }
+    } else {
+      throw e;
+    }
+  }
+
   private void handleBatchProcessException(BatchProcessException e, PhysicalPlan plan)
       throws QueryProcessException, StorageEngineException, StorageGroupNotSetException {
     TSStatus[] failingStatus = e.getFailingStatus();
@@ -159,24 +201,26 @@ abstract class BaseApplier implements LogApplier {
       throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
     try {
       getQueryExecutor().processNonQuery(plan);
+    } catch (BatchProcessException e) {
+      handleBatchProcessException(e, plan, dataGroupMember);
     } catch (QueryProcessException | StorageGroupNotSetException | StorageEngineException e) {
-      // check if this is caused by metadata missing, if so, pull metadata and retry
-      Throwable metaMissingException = SchemaUtils.findMetaMissingException(e);
-      boolean causedByPathNotExist = metaMissingException instanceof PathNotExistException;
-
-      if (causedByPathNotExist) {
-        if (logger.isDebugEnabled()) {
-          logger.debug(
-              "Timeseries is not found locally[{}], try pulling it from another group: {}",
-              metaGroupMember.getName(),
-              e.getCause().getMessage());
-        }
-        pullTimeseriesSchema(plan, dataGroupMember.getHeader());
-        plan.recoverFromFailure();
-        getQueryExecutor().processNonQuery(plan);
-      } else {
-        throw e;
-      }
+      if (IoTDBDescriptor.getInstance().getConfig().isEnablePartition()) {
+        // check if this is caused by metadata missing, if so, pull metadata and retry
+        Throwable metaMissingException = SchemaUtils.findMetaMissingException(e);
+        boolean causedByPathNotExist = metaMissingException instanceof PathNotExistException;
+
+        if (causedByPathNotExist) {
+          if (logger.isDebugEnabled()) {
+            logger.debug(
+                "Timeseries is not found locally[{}], try pulling it from another group: {}",
+                metaGroupMember.getName(),
+                e.getCause().getMessage());
+          }
+          pullTimeseriesSchema(plan, dataGroupMember.getHeader());
+          plan.recoverFromFailure();
+          getQueryExecutor().processNonQuery(plan);
+        } else throw e;
+      } else throw e;
     }
   }
 
@@ -188,8 +232,14 @@ abstract class BaseApplier implements LogApplier {
   private void pullTimeseriesSchema(InsertPlan plan, RaftNode ignoredGroup)
       throws QueryProcessException {
     try {
-      PartialPath path = plan.getPrefixPath();
-      MetaPuller.getInstance().pullTimeSeriesSchemas(Collections.singletonList(path), ignoredGroup);
+      if (plan instanceof BatchPlan) {
+        MetaPuller.getInstance()
+            .pullTimeSeriesSchemas(((BatchPlan) plan).getPrefixPaths(), ignoredGroup);
+      } else {
+        PartialPath path = plan.getPrefixPath();
+        MetaPuller.getInstance()
+            .pullTimeSeriesSchemas(Collections.singletonList(path), ignoredGroup);
+      }
     } catch (MetadataException e1) {
       throw new QueryProcessException(e1);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index bd1b7ab..c0ff907 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -54,7 +54,7 @@ public class DataLogApplier extends BaseApplier {
 
   private static final Logger logger = LoggerFactory.getLogger(DataLogApplier.class);
 
-  private DataGroupMember dataGroupMember;
+  protected DataGroupMember dataGroupMember;
 
   public DataLogApplier(MetaGroupMember metaGroupMember, DataGroupMember dataGroupMember) {
     super(metaGroupMember);
@@ -81,21 +81,7 @@ public class DataLogApplier extends BaseApplier {
       } else if (log instanceof PhysicalPlanLog) {
         PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
         PhysicalPlan plan = physicalPlanLog.getPlan();
-        if (plan instanceof DeletePlan) {
-          ((DeletePlan) plan).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
-        } else if (plan instanceof DeleteTimeSeriesPlan) {
-          ((DeleteTimeSeriesPlan) plan)
-              .setPartitionFilter(dataGroupMember.getTimePartitionFilter());
-        }
-        if (plan instanceof InsertMultiTabletPlan) {
-          applyInsert((InsertMultiTabletPlan) plan);
-        } else if (plan instanceof InsertRowsPlan) {
-          applyInsert((InsertRowsPlan) plan);
-        } else if (plan instanceof InsertPlan) {
-          applyInsert((InsertPlan) plan);
-        } else {
-          applyPhysicalPlan(plan, dataGroupMember);
-        }
+        applyPhysicalPlan(plan);
       } else if (log instanceof CloseFileLog) {
         CloseFileLog closeFileLog = ((CloseFileLog) log);
         StorageEngine.getInstance()
@@ -118,18 +104,66 @@ public class DataLogApplier extends BaseApplier {
     }
   }
 
+  public void applyPhysicalPlan(PhysicalPlan plan)
+      throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
+    if (plan instanceof DeletePlan) {
+      ((DeletePlan) plan).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
+    } else if (plan instanceof DeleteTimeSeriesPlan) {
+      ((DeleteTimeSeriesPlan) plan).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
+    }
+    if (plan instanceof InsertMultiTabletPlan) {
+      applyInsert((InsertMultiTabletPlan) plan);
+    } else if (plan instanceof InsertRowsPlan) {
+      applyInsert((InsertRowsPlan) plan);
+    } else if (plan instanceof InsertPlan) {
+      applyInsert((InsertPlan) plan);
+    } else {
+      applyPhysicalPlan(plan, dataGroupMember);
+    }
+  }
+
   private void applyInsert(InsertMultiTabletPlan plan)
       throws StorageGroupNotSetException, QueryProcessException, StorageEngineException {
+    boolean hasSync = false;
     for (InsertTabletPlan insertTabletPlan : plan.getInsertTabletPlanList()) {
-      applyInsert(insertTabletPlan);
+      try {
+        IoTDB.metaManager.getStorageGroupPath(insertTabletPlan.getPrefixPath());
+      } catch (StorageGroupNotSetException e) {
+        try {
+          if (!hasSync) {
+            metaGroupMember.syncLeaderWithConsistencyCheck(true);
+            hasSync = true;
+          } else {
+            throw new StorageEngineException(e.getMessage());
+          }
+        } catch (CheckConsistencyException ce) {
+          throw new QueryProcessException(ce.getMessage());
+        }
+      }
     }
+    applyPhysicalPlan(plan, dataGroupMember);
   }
 
   private void applyInsert(InsertRowsPlan plan)
       throws StorageGroupNotSetException, QueryProcessException, StorageEngineException {
+    boolean hasSync = false;
     for (InsertRowPlan insertRowPlan : plan.getInsertRowPlanList()) {
-      applyInsert(insertRowPlan);
+      try {
+        IoTDB.metaManager.getStorageGroupPath(insertRowPlan.getPrefixPath());
+      } catch (StorageGroupNotSetException e) {
+        try {
+          if (!hasSync) {
+            metaGroupMember.syncLeaderWithConsistencyCheck(true);
+            hasSync = true;
+          } else {
+            throw new StorageEngineException(e.getMessage());
+          }
+        } catch (CheckConsistencyException ce) {
+          throw new QueryProcessException(ce.getMessage());
+        }
+      }
     }
+    applyPhysicalPlan(plan, dataGroupMember);
   }
 
   private void applyInsert(InsertPlan plan)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 44172df..e5f056d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -52,12 +52,7 @@ import org.apache.iotdb.db.metadata.utils.MetaUtils;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.physical.BatchPlan;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.SetSchemaTemplatePlan;
+import org.apache.iotdb.db.qp.physical.crud.*;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -679,6 +674,22 @@ public class CMManager extends MManager {
     return allSuccess;
   }
 
+  public boolean createTimeseries(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+      throws CheckConsistencyException, IllegalPathException {
+    boolean allSuccess = true;
+    for (InsertRowPlan insertRowPlan : insertRowsOfOneDevicePlan.getRowPlans()) {
+      boolean success = createTimeseries(insertRowPlan);
+      allSuccess = allSuccess && success;
+      if (!success) {
+        logger.error(
+            "create timeseries for device={} failed, plan={}",
+            insertRowPlan.getPrefixPath(),
+            insertRowPlan);
+      }
+    }
+    return allSuccess;
+  }
+
   /**
    * Create timeseries automatically for an InsertPlan.
    *
@@ -695,6 +706,10 @@ public class CMManager extends MManager {
       return createTimeseries((InsertRowsPlan) insertPlan);
     }
 
+    if (insertPlan instanceof InsertRowsOfOneDevicePlan) {
+      return createTimeseries((InsertRowsOfOneDevicePlan) insertPlan);
+    }
+
     List<String> seriesList = new ArrayList<>();
     PartialPath deviceId = insertPlan.getPrefixPath();
     PartialPath storageGroupName;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 39b4d8d..535a86c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -79,18 +79,17 @@ import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
+import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
-import org.apache.iotdb.db.exception.metadata.UndefinedTemplateException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.BatchPlan;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.*;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.service.IoTDB;
@@ -162,6 +161,7 @@ public class DataGroupMember extends RaftMember {
 
   private LocalQueryExecutor localQueryExecutor;
 
+  LogApplier dataLogApplier;
   /**
    * When a new partition table is installed, all data members will be checked if unchanged. If not,
    * such members will be removed.
@@ -202,13 +202,14 @@ public class DataGroupMember extends RaftMember {
     allNodes = nodes;
     setQueryManager(new ClusterQueryManager());
     slotManager = new SlotManager(ClusterConstant.SLOT_NUM, getMemberDir(), getName());
-    LogApplier applier = new DataLogApplier(metaGroupMember, this);
-    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()) {
-      applier = new AsyncDataLogApplier(applier, name);
+    dataLogApplier = new DataLogApplier(metaGroupMember, this);
+    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()
+        && ClusterDescriptor.getInstance().getConfig().getReplicationNum() != 1) {
+      dataLogApplier = new AsyncDataLogApplier(dataLogApplier, name);
     }
     logManager =
         new FilePartitionedSnapshotLogManager(
-            applier, metaGroupMember.getPartitionTable(), allNodes.get(0), thisNode, this);
+            dataLogApplier, metaGroupMember.getPartitionTable(), allNodes.get(0), thisNode, this);
     initPeerMap();
     term.set(logManager.getHardState().getCurrentTerm());
     voteFor = logManager.getHardState().getVoteFor();
@@ -696,40 +697,35 @@ public class DataGroupMember extends RaftMember {
           }
           handleChangeMembershipLogWithoutRaft(log);
         } else {
-          getLocalExecutor().processNonQuery(plan);
+          ((DataLogApplier) dataLogApplier).applyPhysicalPlan(plan);
         }
         return StatusUtils.OK;
       } catch (Exception e) {
         Throwable cause = IOUtils.getRootCause(e);
-        if (cause instanceof StorageGroupNotSetException
-            || cause instanceof UndefinedTemplateException) {
-          try {
-            metaGroupMember.syncLeaderWithConsistencyCheck(true);
-            if (plan instanceof InsertPlan && ((InsertPlan) plan).getFailedMeasurements() != null) {
-              ((InsertPlan) plan).recoverFromFailure();
-            }
-            getLocalExecutor().processNonQuery(plan);
-            return StatusUtils.OK;
-          } catch (CheckConsistencyException ce) {
-            return StatusUtils.getStatus(StatusUtils.CONSISTENCY_FAILURE, ce.getMessage());
-          } catch (Exception ne) {
-            return handleLogExecutionException(plan, IOUtils.getRootCause(ne));
-          }
-        }
-        if (cause instanceof PathNotExistException) {
-          boolean hasCreated = false;
-          try {
-            if (plan instanceof InsertPlan
-                && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
+        boolean hasCreated = false;
+        try {
+          if (plan instanceof InsertPlan
+              && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
+            if (plan instanceof InsertRowsPlan || plan instanceof InsertMultiTabletPlan) {
+              if (e instanceof BatchProcessException) {
+                for (TSStatus status : ((BatchProcessException) e).getFailingStatus()) {
+                  if (status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
+                    hasCreated = createTimeseriesForFailedInsertion(((InsertPlan) plan));
+                    ((BatchPlan) plan).getResults().clear();
+                    break;
+                  }
+                }
+              }
+            } else if (cause instanceof PathNotExistException) {
               hasCreated = createTimeseriesForFailedInsertion(((InsertPlan) plan));
             }
-          } catch (MetadataException | CheckConsistencyException ex) {
-            logger.error("{}: Cannot auto-create timeseries for {}", name, plan, e);
-            return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, ex.getMessage());
-          }
-          if (hasCreated) {
-            return executeNonQueryPlan(plan);
           }
+        } catch (MetadataException | CheckConsistencyException ex) {
+          logger.error("{}: Cannot auto-create timeseries for {}", name, plan, e);
+          return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, ex.getMessage());
+        }
+        if (hasCreated) {
+          return executeNonQueryPlan(plan);
         }
         return handleLogExecutionException(plan, cause);
       }
@@ -776,14 +772,28 @@ public class DataGroupMember extends RaftMember {
       boolean hasCreated = false;
       try {
         if (plan instanceof InsertPlan
-            && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
             && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
-          hasCreated = createTimeseriesForFailedInsertion(((InsertPlan) plan));
+          if (plan instanceof InsertRowsPlan || plan instanceof InsertMultiTabletPlan) {
+            if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+              for (TSStatus tmpStatus : status.getSubStatus()) {
+                if (tmpStatus.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
+                  hasCreated = createTimeseriesForFailedInsertion(((InsertPlan) plan));
+                  ((BatchPlan) plan).getResults().clear();
+                  break;
+                }
+              }
+            }
+          } else {
+            if (status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
+              hasCreated = createTimeseriesForFailedInsertion(((InsertPlan) plan));
+            }
+          }
         }
       } catch (MetadataException | CheckConsistencyException e) {
         logger.error("{}: Cannot auto-create timeseries for {}", name, plan, e);
         return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
       }
+
       if (hasCreated) {
         status = processPlanLocally(plan);
       }
@@ -816,6 +826,22 @@ public class DataGroupMember extends RaftMember {
       }
     }
 
+    if (plan instanceof InsertRowsPlan) {
+      for (InsertRowPlan insertPlan : ((InsertRowsPlan) plan).getInsertRowPlanList()) {
+        if (insertPlan.getFailedMeasurements() != null) {
+          insertPlan.getPlanFromFailed();
+        }
+      }
+    }
+
+    if (plan instanceof InsertRowsOfOneDevicePlan) {
+      for (InsertRowPlan insertPlan : ((InsertRowsOfOneDevicePlan) plan).getRowPlans()) {
+        if (insertPlan.getFailedMeasurements() != null) {
+          insertPlan.getPlanFromFailed();
+        }
+      }
+    }
+
     if (plan.getFailedMeasurements() != null) {
       plan.getPlanFromFailed();
     }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index d362fa1..0831b51 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -101,7 +101,7 @@ public class DataLogApplierTest extends IoTDBTest {
 
   private static final Logger logger = LoggerFactory.getLogger(DataLogApplierTest.class);
   private boolean partialWriteEnabled;
-
+  private boolean isPartitionEnabled;
   private TestMetaGroupMember testMetaGroupMember =
       new TestMetaGroupMember() {
         @Override
@@ -177,6 +177,8 @@ public class DataLogApplierTest extends IoTDBTest {
     NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaGroupMember);
     partialWriteEnabled = IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert();
     IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(false);
+    isPartitionEnabled = IoTDBDescriptor.getInstance().getConfig().isEnablePartition();
+    IoTDBDescriptor.getInstance().getConfig().setEnablePartition(true);
     testMetaGroupMember.setClientProvider(
         new DataClientProvider(new Factory()) {
           @Override
@@ -206,6 +208,10 @@ public class DataLogApplierTest extends IoTDBTest {
                               for (int i = 0; i < 10; i++) {
                                 timeseriesSchemas.add(TestUtils.getTestTimeSeriesSchema(4, i));
                               }
+                            } else if (path.startsWith(TestUtils.getTestSg(1))
+                                || path.startsWith(TestUtils.getTestSg(2))
+                                || path.startsWith(TestUtils.getTestSg(3))) {
+                              // do nothing
                             } else if (!path.startsWith(TestUtils.getTestSg(5))) {
                               resultHandler.onError(new StorageGroupNotSetException(path));
                               return;
@@ -257,6 +263,7 @@ public class DataLogApplierTest extends IoTDBTest {
     super.tearDown();
     NodeStatusManager.getINSTANCE().setMetaGroupMember(null);
     IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(partialWriteEnabled);
+    IoTDBDescriptor.getInstance().getConfig().setEnablePartition(isPartitionEnabled);
   }
 
   @Test
diff --git a/docs/UserGuide/Cluster/Cluster-Setup-Example.md b/docs/UserGuide/Cluster/Cluster-Setup-Example.md
index 7abf211..81b9ee3 100644
--- a/docs/UserGuide/Cluster/Cluster-Setup-Example.md
+++ b/docs/UserGuide/Cluster/Cluster-Setup-Example.md
@@ -200,7 +200,7 @@ default\_replica\_num = 3
 
 internal\_ip = *A\_private\_Ip* (or *B\_private\_Ip*, *C\_private\_Ip*)
 
-***iotdb-enginer.properties***
+***iotdb-engine.properties***
 
 rpc\_address = *A\_public\_Ip* (or *B\_private\_Ip*, *C\_public\_Ip*)
 
diff --git a/docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md b/docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md
index 92adb4e..78889a7 100644
--- a/docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md
+++ b/docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md
@@ -202,7 +202,7 @@ default\_replica\_num = 3
 
 internal\_ip = *A\_private\_Ip* (or *B\_private\_Ip*, *C\_private\_Ip*)
 
-***iotdb-enginer.properties***
+***iotdb-engine.properties***
 
 rpc\_address = *A\_public\_Ip* (or *B\_private\_Ip*, *C\_public\_Ip*)
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index a7bba20..3b1b12e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1459,7 +1459,16 @@ public class PlanExecutor implements IPlanExecutor {
           || insertMultiTabletPlan.isExecuted(i)) {
         continue;
       }
-      insertTablet(insertMultiTabletPlan.getInsertTabletPlanList().get(i));
+      try {
+        insertTablet(insertMultiTabletPlan.getInsertTabletPlanList().get(i));
+      } catch (QueryProcessException e) {
+        insertMultiTabletPlan
+            .getResults()
+            .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+      }
+    }
+    if (!insertMultiTabletPlan.getResults().isEmpty()) {
+      throw new BatchProcessException(insertMultiTabletPlan.getFailingStatus());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java
index 318b628..493af85 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java
@@ -19,8 +19,10 @@
 
 package org.apache.iotdb.db.qp.physical;
 
+import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
+import java.util.List;
 import java.util.Map;
 
 /** BatchPlan contains multiple sub-plans. */
@@ -61,4 +63,11 @@ public interface BatchPlan {
    * @return execution status for each path
    */
   Map<Integer, TSStatus> getResults();
+
+  /**
+   * Return prefix paths of all sub-plans
+   *
+   * @return prefix paths of all sub-plans
+   */
+  List<PartialPath> getPrefixPaths();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
index 59d5ae2..e593cc9 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
@@ -23,16 +23,13 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.BatchPlan;
+import org.apache.iotdb.db.utils.StatusUtils;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.TreeMap;
+import java.util.*;
 
 /**
  * Mainly used in the distributed version, when multiple InsertTabletPlans belong to a raft
@@ -126,6 +123,14 @@ public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
     return result;
   }
 
+  public List<PartialPath> getPrefixPaths() {
+    Set<PartialPath> result = new HashSet<>();
+    for (InsertTabletPlan insertTabletPlan : insertTabletPlanList) {
+      result.add(insertTabletPlan.getPrefixPath());
+    }
+    return new ArrayList<>(result);
+  }
+
   @Override
   public long getMinTime() {
     long minTime = Long.MAX_VALUE;
@@ -226,6 +231,10 @@ public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
     return insertTabletPlanList;
   }
 
+  public TSStatus[] getFailingStatus() {
+    return StatusUtils.getFailingStatus(results, insertTabletPlanList.size());
+  }
+
   public void setResults(Map<Integer, TSStatus> results) {
     this.results = results;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 40ef6ca..c8d4d73 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -324,7 +324,9 @@ public class InsertRowPlan extends InsertPlan {
     }
     failedValues.add(values[index]);
     values[index] = null;
-    dataTypes[index] = null;
+    if (isNeedInferType) {
+      dataTypes[index] = null;
+    }
   }
 
   @Override
@@ -412,7 +414,7 @@ public class InsertRowPlan extends InsertPlan {
       // and is forwarded to other nodes
       if (dataTypes == null || dataTypes[i] == null) {
         ReadWriteIOUtils.write(TYPE_RAW_STRING, outputStream);
-        ReadWriteIOUtils.write((String) values[i], outputStream);
+        ReadWriteIOUtils.write(values[i].toString(), outputStream);
       } else {
         ReadWriteIOUtils.write(dataTypes[i], outputStream);
         switch (dataTypes[i]) {
@@ -451,7 +453,7 @@ public class InsertRowPlan extends InsertPlan {
       // and is forwarded to other nodes
       if (dataTypes == null || dataTypes[i] == null) {
         ReadWriteIOUtils.write(TYPE_RAW_STRING, buffer);
-        ReadWriteIOUtils.write((String) values[i], buffer);
+        ReadWriteIOUtils.write(values[i].toString(), buffer);
       } else {
         ReadWriteIOUtils.write(dataTypes[i], buffer);
         switch (dataTypes[i]) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index 64a8c11..d69945c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -28,13 +28,7 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
 
@@ -235,6 +229,11 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
   }
 
   @Override
+  public List<PartialPath> getPrefixPaths() {
+    return Collections.singletonList(this.prefixPath);
+  }
+
+  @Override
   public int getBatchSize() {
     return rowPlans.length;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
index 03d3c49..f67e742 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
@@ -29,11 +29,7 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 
 public class InsertRowsPlan extends InsertPlan implements BatchPlan {
 
@@ -84,6 +80,15 @@ public class InsertRowsPlan extends InsertPlan implements BatchPlan {
   }
 
   @Override
+  public List<PartialPath> getPrefixPaths() {
+    Set<PartialPath> result = new HashSet<>();
+    for (InsertRowPlan insertRowPlan : insertRowPlanList) {
+      result.add(insertRowPlan.getPrefixPath());
+    }
+    return new ArrayList<>(result);
+  }
+
+  @Override
   public void checkIntegrity() throws QueryProcessException {
     if (insertRowPlanList.isEmpty()) {
       throw new QueryProcessException("sub plan are empty.");
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
index 1720945..b0b9e8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
@@ -34,12 +34,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.TreeMap;
+import java.util.*;
 
 /**
  * create multiple timeSeries, could be split to several sub Plans to execute in different DataGroup
@@ -144,6 +139,11 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan implements BatchPlan
     return results;
   }
 
+  @Override
+  public List<PartialPath> getPrefixPaths() {
+    return Collections.emptyList();
+  }
+
   public TSStatus[] getFailingStatus() {
     return StatusUtils.getFailingStatus(results, paths.size());
   }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 2336142..e6dd440 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -1912,6 +1912,14 @@ public class Session {
     this.enableQueryRedirection = enableQueryRedirection;
   }
 
+  public boolean isEnableCacheLeader() {
+    return enableCacheLeader;
+  }
+
+  public void setEnableCacheLeader(boolean enableCacheLeader) {
+    this.enableCacheLeader = enableCacheLeader;
+  }
+
   public static class Builder {
     private String host = Config.DEFAULT_HOST;
     private int rpcPort = Config.DEFAULT_PORT;
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
index cd48873..7c9f08f 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
@@ -34,6 +34,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -42,6 +45,7 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.*;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -610,4 +614,220 @@ public abstract class Cases {
 
     writeStatement.execute(setWritable);
   }
+
+  @Test
+  public void testAutoCreateSchemaInClusterMode()
+      throws IoTDBConnectionException, StatementExecutionException, SQLException {
+    List<String> measurementList = new ArrayList<>();
+    measurementList.add("s1");
+    measurementList.add("s2");
+    measurementList.add("s3");
+
+    List<TSDataType> typeList = new ArrayList<>();
+    typeList.add(TSDataType.INT64);
+    typeList.add(TSDataType.INT64);
+    typeList.add(TSDataType.INT64);
+
+    List<Object> valueList = new ArrayList<>();
+    valueList.add(1L);
+    valueList.add(2L);
+    valueList.add(3L);
+
+    for (int i = 0; i < 5; i++) {
+      String sg = "root.sg" + String.valueOf(i);
+      session.setStorageGroup(sg);
+      for (int j = 0; j < 10; j++) {
+        session.createTimeseries(
+            String.format("%s.d1.s%s", sg, j),
+            TSDataType.INT64,
+            TSEncoding.RLE,
+            CompressionType.SNAPPY);
+        session.createTimeseries(
+            String.format("%s.d2.s%s", sg, j),
+            TSDataType.INT64,
+            TSEncoding.RLE,
+            CompressionType.SNAPPY);
+        session.createTimeseries(
+            String.format("%s.d3.s%s", sg, j),
+            TSDataType.INT64,
+            TSEncoding.RLE,
+            CompressionType.SNAPPY);
+        session.createTimeseries(
+            String.format("%s.d4.s%s", sg, j),
+            TSDataType.INT64,
+            TSEncoding.RLE,
+            CompressionType.SNAPPY);
+      }
+    }
+
+    // step 1: insert into existing time series.
+    for (int i = 0; i < 5; i++) {
+      for (long t = 0; t < 3; t++) {
+        session.insertRecord(
+            String.format("root.sg%s.d1", i), t, measurementList, typeList, 1L, 2L, 3L);
+      }
+    }
+
+    List<List<String>> measurementsList = new ArrayList<>();
+    List<List<Object>> valuesList = new ArrayList<>();
+    List<List<TSDataType>> typesList = new ArrayList<>();
+    List<String> deviceList = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      String devicePath = String.format("root.sg%s.d2", i);
+      deviceList.add(devicePath);
+      typesList.add(typeList);
+      measurementsList.add(measurementList);
+      valuesList.add(valueList);
+    }
+
+    for (long t = 0; t < 3; t++) {
+      List<Long> timeList = new ArrayList<>();
+      for (int i = 0; i < 5; i++) {
+        timeList.add(t);
+      }
+      session.insertRecords(deviceList, timeList, measurementsList, typesList, valuesList);
+    }
+
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
+
+    Map<String, Tablet> tabletMap = new HashMap<>();
+    for (int i = 0; i < 5; i++) {
+      Tablet tablet = new Tablet(String.format("root.sg%s.d3", i), schemaList, 10);
+      for (long row = 0; row < 3; row++) {
+        int rowIndex = tablet.rowSize++;
+        tablet.addTimestamp(rowIndex, row);
+        tablet.addValue("s1", rowIndex, 1L);
+        tablet.addValue("s2", rowIndex, 2L);
+        tablet.addValue("s3", rowIndex, 3L);
+      }
+      session.insertTablet(tablet);
+      tablet.setPrefixPath(String.format("root.sg%s.d4", i));
+      tabletMap.put(String.format("root.sg%s.d4", i), tablet);
+    }
+
+    session.insertTablets(tabletMap);
+
+    // step 2: test auto create sg and time series schema
+    for (int i = 5; i < 10; i++) {
+      for (long t = 0; t < 3; t++) {
+        session.insertRecord(
+            String.format("root.sg%s.d1", i), t, measurementList, typeList, 1L, 2L, 3L);
+      }
+    }
+
+    deviceList.clear();
+    for (int i = 5; i < 10; i++) {
+      String device_path = String.format("root.sg%s.d2", i);
+      deviceList.add(device_path);
+    }
+
+    for (long t = 0; t < 3; t++) {
+      List<Long> timeList = new ArrayList<>();
+      for (int i = 0; i < 5; i++) {
+        timeList.add(t);
+      }
+      session.insertRecords(deviceList, timeList, measurementsList, typesList, valuesList);
+    }
+
+    tabletMap.clear();
+    for (int i = 5; i < 10; i++) {
+      Tablet tablet = new Tablet(String.format("root.sg%s.d3", i), schemaList, 10);
+      for (long row = 0; row < 3; row++) {
+        int rowIndex = tablet.rowSize++;
+        tablet.addTimestamp(rowIndex, row);
+        tablet.addValue("s1", rowIndex, 1L);
+        tablet.addValue("s2", rowIndex, 2L);
+        tablet.addValue("s3", rowIndex, 3L);
+      }
+      session.insertTablet(tablet);
+      tablet.setPrefixPath(String.format("root.sg%s.d4", i));
+      tabletMap.put(String.format("root.sg%s.d4", i), tablet);
+    }
+
+    session.insertTablets(tabletMap);
+
+    measurementsList.clear();
+    List<Long> timeList = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      timeList.add((long) i);
+      List<String> measurements = new ArrayList<>();
+      measurements.add(String.format("s%d", i));
+      measurements.add(String.format("s%d", i + 5));
+      measurements.add(String.format("s%d", i + 10));
+      measurementsList.add(measurements);
+    }
+
+    session.insertRecordsOfOneDevice(
+        "root.sg0.d5", timeList, measurementsList, typesList, valuesList);
+    session.insertRecordsOfOneDevice(
+        "root.sg20.d1", timeList, measurementsList, typesList, valuesList);
+
+    for (Statement readStatement : readStatements) {
+      for (int i = 0; i < 10; i++) {
+        for (int d = 1; d <= 4; d++) {
+          ResultSet resultSet =
+              readStatement.executeQuery(String.format("SELECT s1,s2,s3 from root.sg%s.d%s", i, d));
+          for (long t = 0; t < 3; t++) {
+            Assert.assertTrue(resultSet.next());
+            Assert.assertEquals(resultSet.getLong(1), t);
+            Assert.assertEquals(resultSet.getString(2), "1");
+            Assert.assertEquals(resultSet.getString(3), "2");
+            Assert.assertEquals(resultSet.getString(4), "3");
+          }
+        }
+      }
+
+      for (int i = 0; i < 5; i++) {
+        ResultSet resultSet =
+            readStatement.executeQuery(
+                String.format("select s%d,s%d,s%d from root.sg0.d5", i, i + 5, i + 10));
+        Assert.assertTrue(resultSet.next());
+        Assert.assertEquals(resultSet.getLong(1), i);
+        Assert.assertEquals(resultSet.getString(2), "1");
+        Assert.assertEquals(resultSet.getString(3), "2");
+        Assert.assertEquals(resultSet.getString(4), "3");
+
+        resultSet =
+            readStatement.executeQuery(
+                String.format("select s%d,s%d,s%d from root.sg20.d1", i, i + 5, i + 10));
+        Assert.assertTrue(resultSet.next());
+        Assert.assertEquals(resultSet.getLong(1), i);
+        Assert.assertEquals(resultSet.getString(2), "1");
+        Assert.assertEquals(resultSet.getString(3), "2");
+        Assert.assertEquals(resultSet.getString(4), "3");
+      }
+    }
+
+    // test create time series
+    for (int i = 0; i < 5; i++) {
+      session.createTimeseries(
+          String.format("root.sg1%s.d1.s1", i),
+          TSDataType.INT64,
+          TSEncoding.RLE,
+          CompressionType.SNAPPY);
+    }
+
+    List<String> path = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    List<TSEncoding> encodings = new ArrayList<>();
+    List<CompressionType> compressionTypes = new ArrayList<>();
+    for (int i = 5; i < 10; i++) {
+      path.add(String.format("root.sg1%s.d1.s1", i));
+      dataTypes.add(TSDataType.INT64);
+      encodings.add(TSEncoding.RLE);
+      compressionTypes.add(CompressionType.SNAPPY);
+    }
+    session.createMultiTimeseries(
+        path, dataTypes, encodings, compressionTypes, null, null, null, null);
+    for (Statement readStatement : readStatements) {
+      for (int i = 0; i < 10; i++) {
+        ResultSet resultSet =
+            readStatement.executeQuery(String.format("show timeseries root.sg1%s.d1.s1", i));
+        Assert.assertTrue(resultSet.next());
+      }
+    }
+  }
 }
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
index 003454f..f14dea9 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
@@ -83,7 +83,14 @@ public abstract class ClusterIT extends Cases {
               "jdbc:iotdb://" + readIps[i] + ":" + readPorts[i], "root", "root");
       readStatements[i] = readConnections[i].createStatement();
     }
-    session = new Session(getWriteRpcIp(), getWriteRpcPort());
+    session =
+        new Session.Builder()
+            .host(getWriteRpcIp())
+            .port(getWriteRpcPort())
+            .username("root")
+            .password("root")
+            .enableCacheLeader(false)
+            .build();
     session.open();
     TimeUnit.MILLISECONDS.sleep(3000);
   }