You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/09/15 03:18:47 UTC
[iotdb] branch master updated: [IOTDB-1484]Auto create schema functionality with e2e testing in cluster (#3879)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 255bc61 [IOTDB-1484]Auto create schema functionality with e2e testing in cluster (#3879)
255bc61 is described below
commit 255bc619e650b3123989138e00ff6e985c8287df
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Wed Sep 15 11:18:24 2021 +0800
[IOTDB-1484]Auto create schema functionality with e2e testing in cluster (#3879)
---
.../iotdb/cluster/log/applier/BaseApplier.java | 88 +++++++--
.../iotdb/cluster/log/applier/DataLogApplier.java | 70 +++++--
.../apache/iotdb/cluster/metadata/CMManager.java | 27 ++-
.../cluster/server/member/DataGroupMember.java | 100 ++++++----
.../cluster/log/applier/DataLogApplierTest.java | 9 +-
docs/UserGuide/Cluster/Cluster-Setup-Example.md | 2 +-
docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md | 2 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 11 +-
.../org/apache/iotdb/db/qp/physical/BatchPlan.java | 9 +
.../db/qp/physical/crud/InsertMultiTabletPlan.java | 19 +-
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 8 +-
.../physical/crud/InsertRowsOfOneDevicePlan.java | 13 +-
.../iotdb/db/qp/physical/crud/InsertRowsPlan.java | 15 +-
.../qp/physical/sys/CreateMultiTimeSeriesPlan.java | 12 +-
.../java/org/apache/iotdb/session/Session.java | 8 +
.../test/java/org/apache/iotdb/db/sql/Cases.java | 220 +++++++++++++++++++++
.../java/org/apache/iotdb/db/sql/ClusterIT.java | 9 +-
17 files changed, 511 insertions(+), 111 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
index 4fbc628..cf99c8f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -93,6 +94,47 @@ abstract class BaseApplier implements LogApplier {
}
}
+ private void handleBatchProcessException(
+ BatchProcessException e, InsertPlan plan, DataGroupMember dataGroupMember)
+ throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnablePartition()) {
+ TSStatus[] failingStatus = e.getFailingStatus();
+ for (int i = 0; i < failingStatus.length; i++) {
+ TSStatus status = failingStatus[i];
+ // skip succeeded plans in later execution
+ if (status != null
+ && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && plan instanceof BatchPlan) {
+ ((BatchPlan) plan).setIsExecuted(i);
+ }
+ }
+
+ boolean needRetry = false, hasError = false;
+ for (int i = 0, failingStatusLength = failingStatus.length; i < failingStatusLength; i++) {
+ TSStatus status = failingStatus[i];
+ if (status != null) {
+ if (status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+ && plan instanceof BatchPlan) {
+ ((BatchPlan) plan).unsetIsExecuted(i);
+ needRetry = true;
+ } else if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ hasError = true;
+ }
+ }
+ }
+ if (hasError) {
+ throw e;
+ }
+ if (needRetry) {
+ pullTimeseriesSchema(plan, dataGroupMember.getHeader());
+ plan.recoverFromFailure();
+ getQueryExecutor().processNonQuery(plan);
+ }
+ } else {
+ throw e;
+ }
+ }
+
private void handleBatchProcessException(BatchProcessException e, PhysicalPlan plan)
throws QueryProcessException, StorageEngineException, StorageGroupNotSetException {
TSStatus[] failingStatus = e.getFailingStatus();
@@ -159,24 +201,26 @@ abstract class BaseApplier implements LogApplier {
throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
try {
getQueryExecutor().processNonQuery(plan);
+ } catch (BatchProcessException e) {
+ handleBatchProcessException(e, plan, dataGroupMember);
} catch (QueryProcessException | StorageGroupNotSetException | StorageEngineException e) {
- // check if this is caused by metadata missing, if so, pull metadata and retry
- Throwable metaMissingException = SchemaUtils.findMetaMissingException(e);
- boolean causedByPathNotExist = metaMissingException instanceof PathNotExistException;
-
- if (causedByPathNotExist) {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "Timeseries is not found locally[{}], try pulling it from another group: {}",
- metaGroupMember.getName(),
- e.getCause().getMessage());
- }
- pullTimeseriesSchema(plan, dataGroupMember.getHeader());
- plan.recoverFromFailure();
- getQueryExecutor().processNonQuery(plan);
- } else {
- throw e;
- }
+ if (IoTDBDescriptor.getInstance().getConfig().isEnablePartition()) {
+ // check if this is caused by metadata missing, if so, pull metadata and retry
+ Throwable metaMissingException = SchemaUtils.findMetaMissingException(e);
+ boolean causedByPathNotExist = metaMissingException instanceof PathNotExistException;
+
+ if (causedByPathNotExist) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Timeseries is not found locally[{}], try pulling it from another group: {}",
+ metaGroupMember.getName(),
+ e.getCause().getMessage());
+ }
+ pullTimeseriesSchema(plan, dataGroupMember.getHeader());
+ plan.recoverFromFailure();
+ getQueryExecutor().processNonQuery(plan);
+ } else throw e;
+ } else throw e;
}
}
@@ -188,8 +232,14 @@ abstract class BaseApplier implements LogApplier {
private void pullTimeseriesSchema(InsertPlan plan, RaftNode ignoredGroup)
throws QueryProcessException {
try {
- PartialPath path = plan.getPrefixPath();
- MetaPuller.getInstance().pullTimeSeriesSchemas(Collections.singletonList(path), ignoredGroup);
+ if (plan instanceof BatchPlan) {
+ MetaPuller.getInstance()
+ .pullTimeSeriesSchemas(((BatchPlan) plan).getPrefixPaths(), ignoredGroup);
+ } else {
+ PartialPath path = plan.getPrefixPath();
+ MetaPuller.getInstance()
+ .pullTimeSeriesSchemas(Collections.singletonList(path), ignoredGroup);
+ }
} catch (MetadataException e1) {
throw new QueryProcessException(e1);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index bd1b7ab..c0ff907 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -54,7 +54,7 @@ public class DataLogApplier extends BaseApplier {
private static final Logger logger = LoggerFactory.getLogger(DataLogApplier.class);
- private DataGroupMember dataGroupMember;
+ protected DataGroupMember dataGroupMember;
public DataLogApplier(MetaGroupMember metaGroupMember, DataGroupMember dataGroupMember) {
super(metaGroupMember);
@@ -81,21 +81,7 @@ public class DataLogApplier extends BaseApplier {
} else if (log instanceof PhysicalPlanLog) {
PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
PhysicalPlan plan = physicalPlanLog.getPlan();
- if (plan instanceof DeletePlan) {
- ((DeletePlan) plan).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
- } else if (plan instanceof DeleteTimeSeriesPlan) {
- ((DeleteTimeSeriesPlan) plan)
- .setPartitionFilter(dataGroupMember.getTimePartitionFilter());
- }
- if (plan instanceof InsertMultiTabletPlan) {
- applyInsert((InsertMultiTabletPlan) plan);
- } else if (plan instanceof InsertRowsPlan) {
- applyInsert((InsertRowsPlan) plan);
- } else if (plan instanceof InsertPlan) {
- applyInsert((InsertPlan) plan);
- } else {
- applyPhysicalPlan(plan, dataGroupMember);
- }
+ applyPhysicalPlan(plan);
} else if (log instanceof CloseFileLog) {
CloseFileLog closeFileLog = ((CloseFileLog) log);
StorageEngine.getInstance()
@@ -118,18 +104,66 @@ public class DataLogApplier extends BaseApplier {
}
}
+ public void applyPhysicalPlan(PhysicalPlan plan)
+ throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
+ if (plan instanceof DeletePlan) {
+ ((DeletePlan) plan).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
+ } else if (plan instanceof DeleteTimeSeriesPlan) {
+ ((DeleteTimeSeriesPlan) plan).setPartitionFilter(dataGroupMember.getTimePartitionFilter());
+ }
+ if (plan instanceof InsertMultiTabletPlan) {
+ applyInsert((InsertMultiTabletPlan) plan);
+ } else if (plan instanceof InsertRowsPlan) {
+ applyInsert((InsertRowsPlan) plan);
+ } else if (plan instanceof InsertPlan) {
+ applyInsert((InsertPlan) plan);
+ } else {
+ applyPhysicalPlan(plan, dataGroupMember);
+ }
+ }
+
private void applyInsert(InsertMultiTabletPlan plan)
throws StorageGroupNotSetException, QueryProcessException, StorageEngineException {
+ boolean hasSync = false;
for (InsertTabletPlan insertTabletPlan : plan.getInsertTabletPlanList()) {
- applyInsert(insertTabletPlan);
+ try {
+ IoTDB.metaManager.getStorageGroupPath(insertTabletPlan.getPrefixPath());
+ } catch (StorageGroupNotSetException e) {
+ try {
+ if (!hasSync) {
+ metaGroupMember.syncLeaderWithConsistencyCheck(true);
+ hasSync = true;
+ } else {
+ throw new StorageEngineException(e.getMessage());
+ }
+ } catch (CheckConsistencyException ce) {
+ throw new QueryProcessException(ce.getMessage());
+ }
+ }
}
+ applyPhysicalPlan(plan, dataGroupMember);
}
private void applyInsert(InsertRowsPlan plan)
throws StorageGroupNotSetException, QueryProcessException, StorageEngineException {
+ boolean hasSync = false;
for (InsertRowPlan insertRowPlan : plan.getInsertRowPlanList()) {
- applyInsert(insertRowPlan);
+ try {
+ IoTDB.metaManager.getStorageGroupPath(insertRowPlan.getPrefixPath());
+ } catch (StorageGroupNotSetException e) {
+ try {
+ if (!hasSync) {
+ metaGroupMember.syncLeaderWithConsistencyCheck(true);
+ hasSync = true;
+ } else {
+ throw new StorageEngineException(e.getMessage());
+ }
+ } catch (CheckConsistencyException ce) {
+ throw new QueryProcessException(ce.getMessage());
+ }
+ }
}
+ applyPhysicalPlan(plan, dataGroupMember);
}
private void applyInsert(InsertPlan plan)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 44172df..e5f056d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -52,12 +52,7 @@ import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.BatchPlan;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.SetSchemaTemplatePlan;
+import org.apache.iotdb.db.qp.physical.crud.*;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -679,6 +674,22 @@ public class CMManager extends MManager {
return allSuccess;
}
+ public boolean createTimeseries(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+ throws CheckConsistencyException, IllegalPathException {
+ boolean allSuccess = true;
+ for (InsertRowPlan insertRowPlan : insertRowsOfOneDevicePlan.getRowPlans()) {
+ boolean success = createTimeseries(insertRowPlan);
+ allSuccess = allSuccess && success;
+ if (!success) {
+ logger.error(
+ "create timeseries for device={} failed, plan={}",
+ insertRowPlan.getPrefixPath(),
+ insertRowPlan);
+ }
+ }
+ return allSuccess;
+ }
+
/**
* Create timeseries automatically for an InsertPlan.
*
@@ -695,6 +706,10 @@ public class CMManager extends MManager {
return createTimeseries((InsertRowsPlan) insertPlan);
}
+ if (insertPlan instanceof InsertRowsOfOneDevicePlan) {
+ return createTimeseries((InsertRowsOfOneDevicePlan) insertPlan);
+ }
+
List<String> seriesList = new ArrayList<>();
PartialPath deviceId = insertPlan.getPrefixPath();
PartialPath storageGroupName;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 39b4d8d..535a86c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -79,18 +79,17 @@ import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
+import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
-import org.apache.iotdb.db.exception.metadata.UndefinedTemplateException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.BatchPlan;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.*;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
import org.apache.iotdb.db.qp.physical.sys.LogPlan;
import org.apache.iotdb.db.service.IoTDB;
@@ -162,6 +161,7 @@ public class DataGroupMember extends RaftMember {
private LocalQueryExecutor localQueryExecutor;
+ LogApplier dataLogApplier;
/**
* When a new partition table is installed, all data members will be checked if unchanged. If not,
* such members will be removed.
@@ -202,13 +202,14 @@ public class DataGroupMember extends RaftMember {
allNodes = nodes;
setQueryManager(new ClusterQueryManager());
slotManager = new SlotManager(ClusterConstant.SLOT_NUM, getMemberDir(), getName());
- LogApplier applier = new DataLogApplier(metaGroupMember, this);
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()) {
- applier = new AsyncDataLogApplier(applier, name);
+ dataLogApplier = new DataLogApplier(metaGroupMember, this);
+ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()
+ && ClusterDescriptor.getInstance().getConfig().getReplicationNum() != 1) {
+ dataLogApplier = new AsyncDataLogApplier(dataLogApplier, name);
}
logManager =
new FilePartitionedSnapshotLogManager(
- applier, metaGroupMember.getPartitionTable(), allNodes.get(0), thisNode, this);
+ dataLogApplier, metaGroupMember.getPartitionTable(), allNodes.get(0), thisNode, this);
initPeerMap();
term.set(logManager.getHardState().getCurrentTerm());
voteFor = logManager.getHardState().getVoteFor();
@@ -696,40 +697,35 @@ public class DataGroupMember extends RaftMember {
}
handleChangeMembershipLogWithoutRaft(log);
} else {
- getLocalExecutor().processNonQuery(plan);
+ ((DataLogApplier) dataLogApplier).applyPhysicalPlan(plan);
}
return StatusUtils.OK;
} catch (Exception e) {
Throwable cause = IOUtils.getRootCause(e);
- if (cause instanceof StorageGroupNotSetException
- || cause instanceof UndefinedTemplateException) {
- try {
- metaGroupMember.syncLeaderWithConsistencyCheck(true);
- if (plan instanceof InsertPlan && ((InsertPlan) plan).getFailedMeasurements() != null) {
- ((InsertPlan) plan).recoverFromFailure();
- }
- getLocalExecutor().processNonQuery(plan);
- return StatusUtils.OK;
- } catch (CheckConsistencyException ce) {
- return StatusUtils.getStatus(StatusUtils.CONSISTENCY_FAILURE, ce.getMessage());
- } catch (Exception ne) {
- return handleLogExecutionException(plan, IOUtils.getRootCause(ne));
- }
- }
- if (cause instanceof PathNotExistException) {
- boolean hasCreated = false;
- try {
- if (plan instanceof InsertPlan
- && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
+ boolean hasCreated = false;
+ try {
+ if (plan instanceof InsertPlan
+ && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
+ if (plan instanceof InsertRowsPlan || plan instanceof InsertMultiTabletPlan) {
+ if (e instanceof BatchProcessException) {
+ for (TSStatus status : ((BatchProcessException) e).getFailingStatus()) {
+ if (status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
+ hasCreated = createTimeseriesForFailedInsertion(((InsertPlan) plan));
+ ((BatchPlan) plan).getResults().clear();
+ break;
+ }
+ }
+ }
+ } else if (cause instanceof PathNotExistException) {
hasCreated = createTimeseriesForFailedInsertion(((InsertPlan) plan));
}
- } catch (MetadataException | CheckConsistencyException ex) {
- logger.error("{}: Cannot auto-create timeseries for {}", name, plan, e);
- return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, ex.getMessage());
- }
- if (hasCreated) {
- return executeNonQueryPlan(plan);
}
+ } catch (MetadataException | CheckConsistencyException ex) {
+ logger.error("{}: Cannot auto-create timeseries for {}", name, plan, e);
+ return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, ex.getMessage());
+ }
+ if (hasCreated) {
+ return executeNonQueryPlan(plan);
}
return handleLogExecutionException(plan, cause);
}
@@ -776,14 +772,28 @@ public class DataGroupMember extends RaftMember {
boolean hasCreated = false;
try {
if (plan instanceof InsertPlan
- && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
&& ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
- hasCreated = createTimeseriesForFailedInsertion(((InsertPlan) plan));
+ if (plan instanceof InsertRowsPlan || plan instanceof InsertMultiTabletPlan) {
+ if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ for (TSStatus tmpStatus : status.getSubStatus()) {
+ if (tmpStatus.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
+ hasCreated = createTimeseriesForFailedInsertion(((InsertPlan) plan));
+ ((BatchPlan) plan).getResults().clear();
+ break;
+ }
+ }
+ }
+ } else {
+ if (status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()) {
+ hasCreated = createTimeseriesForFailedInsertion(((InsertPlan) plan));
+ }
+ }
}
} catch (MetadataException | CheckConsistencyException e) {
logger.error("{}: Cannot auto-create timeseries for {}", name, plan, e);
return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
+
if (hasCreated) {
status = processPlanLocally(plan);
}
@@ -816,6 +826,22 @@ public class DataGroupMember extends RaftMember {
}
}
+ if (plan instanceof InsertRowsPlan) {
+ for (InsertRowPlan insertPlan : ((InsertRowsPlan) plan).getInsertRowPlanList()) {
+ if (insertPlan.getFailedMeasurements() != null) {
+ insertPlan.getPlanFromFailed();
+ }
+ }
+ }
+
+ if (plan instanceof InsertRowsOfOneDevicePlan) {
+ for (InsertRowPlan insertPlan : ((InsertRowsOfOneDevicePlan) plan).getRowPlans()) {
+ if (insertPlan.getFailedMeasurements() != null) {
+ insertPlan.getPlanFromFailed();
+ }
+ }
+ }
+
if (plan.getFailedMeasurements() != null) {
plan.getPlanFromFailed();
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index d362fa1..0831b51 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -101,7 +101,7 @@ public class DataLogApplierTest extends IoTDBTest {
private static final Logger logger = LoggerFactory.getLogger(DataLogApplierTest.class);
private boolean partialWriteEnabled;
-
+ private boolean isPartitionEnabled;
private TestMetaGroupMember testMetaGroupMember =
new TestMetaGroupMember() {
@Override
@@ -177,6 +177,8 @@ public class DataLogApplierTest extends IoTDBTest {
NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaGroupMember);
partialWriteEnabled = IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert();
IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(false);
+ isPartitionEnabled = IoTDBDescriptor.getInstance().getConfig().isEnablePartition();
+ IoTDBDescriptor.getInstance().getConfig().setEnablePartition(true);
testMetaGroupMember.setClientProvider(
new DataClientProvider(new Factory()) {
@Override
@@ -206,6 +208,10 @@ public class DataLogApplierTest extends IoTDBTest {
for (int i = 0; i < 10; i++) {
timeseriesSchemas.add(TestUtils.getTestTimeSeriesSchema(4, i));
}
+ } else if (path.startsWith(TestUtils.getTestSg(1))
+ || path.startsWith(TestUtils.getTestSg(2))
+ || path.startsWith(TestUtils.getTestSg(3))) {
+ // do nothing
} else if (!path.startsWith(TestUtils.getTestSg(5))) {
resultHandler.onError(new StorageGroupNotSetException(path));
return;
@@ -257,6 +263,7 @@ public class DataLogApplierTest extends IoTDBTest {
super.tearDown();
NodeStatusManager.getINSTANCE().setMetaGroupMember(null);
IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(partialWriteEnabled);
+ IoTDBDescriptor.getInstance().getConfig().setEnablePartition(isPartitionEnabled);
}
@Test
diff --git a/docs/UserGuide/Cluster/Cluster-Setup-Example.md b/docs/UserGuide/Cluster/Cluster-Setup-Example.md
index 7abf211..81b9ee3 100644
--- a/docs/UserGuide/Cluster/Cluster-Setup-Example.md
+++ b/docs/UserGuide/Cluster/Cluster-Setup-Example.md
@@ -200,7 +200,7 @@ default\_replica\_num = 3
internal\_ip = *A\_private\_Ip* (or *B\_private\_Ip*, *C\_private\_Ip*)
-***iotdb-enginer.properties***
+***iotdb-engine.properties***
rpc\_address = *A\_public\_Ip* (or *B\_private\_Ip*, *C\_public\_Ip*)
diff --git a/docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md b/docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md
index 92adb4e..78889a7 100644
--- a/docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md
+++ b/docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md
@@ -202,7 +202,7 @@ default\_replica\_num = 3
internal\_ip = *A\_private\_Ip* (or *B\_private\_Ip*, *C\_private\_Ip*)
-***iotdb-enginer.properties***
+***iotdb-engine.properties***
rpc\_address = *A\_public\_Ip* (or *B\_private\_Ip*, *C\_public\_Ip*)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index a7bba20..3b1b12e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1459,7 +1459,16 @@ public class PlanExecutor implements IPlanExecutor {
|| insertMultiTabletPlan.isExecuted(i)) {
continue;
}
- insertTablet(insertMultiTabletPlan.getInsertTabletPlanList().get(i));
+ try {
+ insertTablet(insertMultiTabletPlan.getInsertTabletPlanList().get(i));
+ } catch (QueryProcessException e) {
+ insertMultiTabletPlan
+ .getResults()
+ .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+ }
+ }
+ if (!insertMultiTabletPlan.getResults().isEmpty()) {
+ throw new BatchProcessException(insertMultiTabletPlan.getFailingStatus());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java
index 318b628..493af85 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.db.qp.physical;
+import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import java.util.List;
import java.util.Map;
/** BatchPlan contains multiple sub-plans. */
@@ -61,4 +63,11 @@ public interface BatchPlan {
* @return execution status for each path
*/
Map<Integer, TSStatus> getResults();
+
+ /**
+ * Return prefix paths of all sub-plans
+ *
+ * @return prefix paths of all sub-plans
+ */
+ List<PartialPath> getPrefixPaths();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
index 59d5ae2..e593cc9 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
@@ -23,16 +23,13 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.BatchPlan;
+import org.apache.iotdb.db.utils.StatusUtils;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.TreeMap;
+import java.util.*;
/**
* Mainly used in the distributed version, when multiple InsertTabletPlans belong to a raft
@@ -126,6 +123,14 @@ public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
return result;
}
+ public List<PartialPath> getPrefixPaths() {
+ Set<PartialPath> result = new HashSet<>();
+ for (InsertTabletPlan insertTabletPlan : insertTabletPlanList) {
+ result.add(insertTabletPlan.getPrefixPath());
+ }
+ return new ArrayList<>(result);
+ }
+
@Override
public long getMinTime() {
long minTime = Long.MAX_VALUE;
@@ -226,6 +231,10 @@ public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
return insertTabletPlanList;
}
+ public TSStatus[] getFailingStatus() {
+ return StatusUtils.getFailingStatus(results, insertTabletPlanList.size());
+ }
+
public void setResults(Map<Integer, TSStatus> results) {
this.results = results;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 40ef6ca..c8d4d73 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -324,7 +324,9 @@ public class InsertRowPlan extends InsertPlan {
}
failedValues.add(values[index]);
values[index] = null;
- dataTypes[index] = null;
+ if (isNeedInferType) {
+ dataTypes[index] = null;
+ }
}
@Override
@@ -412,7 +414,7 @@ public class InsertRowPlan extends InsertPlan {
// and is forwarded to other nodes
if (dataTypes == null || dataTypes[i] == null) {
ReadWriteIOUtils.write(TYPE_RAW_STRING, outputStream);
- ReadWriteIOUtils.write((String) values[i], outputStream);
+ ReadWriteIOUtils.write(values[i].toString(), outputStream);
} else {
ReadWriteIOUtils.write(dataTypes[i], outputStream);
switch (dataTypes[i]) {
@@ -451,7 +453,7 @@ public class InsertRowPlan extends InsertPlan {
// and is forwarded to other nodes
if (dataTypes == null || dataTypes[i] == null) {
ReadWriteIOUtils.write(TYPE_RAW_STRING, buffer);
- ReadWriteIOUtils.write((String) values[i], buffer);
+ ReadWriteIOUtils.write(values[i].toString(), buffer);
} else {
ReadWriteIOUtils.write(dataTypes[i], buffer);
switch (dataTypes[i]) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index 64a8c11..d69945c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -28,13 +28,7 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
@@ -235,6 +229,11 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
}
@Override
+ public List<PartialPath> getPrefixPaths() {
+ return Collections.singletonList(this.prefixPath);
+ }
+
+ @Override
public int getBatchSize() {
return rowPlans.length;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
index 03d3c49..f67e742 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
@@ -29,11 +29,7 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
public class InsertRowsPlan extends InsertPlan implements BatchPlan {
@@ -84,6 +80,15 @@ public class InsertRowsPlan extends InsertPlan implements BatchPlan {
}
@Override
+ public List<PartialPath> getPrefixPaths() {
+ Set<PartialPath> result = new HashSet<>();
+ for (InsertRowPlan insertRowPlan : insertRowPlanList) {
+ result.add(insertRowPlan.getPrefixPath());
+ }
+ return new ArrayList<>(result);
+ }
+
+ @Override
public void checkIntegrity() throws QueryProcessException {
if (insertRowPlanList.isEmpty()) {
throw new QueryProcessException("sub plan are empty.");
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
index 1720945..b0b9e8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
@@ -34,12 +34,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.TreeMap;
+import java.util.*;
/**
* create multiple timeSeries, could be split to several sub Plans to execute in different DataGroup
@@ -144,6 +139,11 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan implements BatchPlan
return results;
}
+ @Override
+ public List<PartialPath> getPrefixPaths() {
+ return Collections.emptyList();
+ }
+
public TSStatus[] getFailingStatus() {
return StatusUtils.getFailingStatus(results, paths.size());
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 2336142..e6dd440 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -1912,6 +1912,14 @@ public class Session {
this.enableQueryRedirection = enableQueryRedirection;
}
+ /**
+ * Reports the current value of the enableCacheLeader flag held by this session.
+ *
+ * @return the stored flag value
+ */
+ public boolean isEnableCacheLeader() {
+ return enableCacheLeader;
+ }
+
+ /**
+ * Overwrites the enableCacheLeader flag held by this session.
+ *
+ * <p>NOTE(review): only the field assignment is visible here; whether the new value affects
+ * connections that are already open should be confirmed against the rest of Session.
+ *
+ * @param enableCacheLeader the new flag value
+ */
+ public void setEnableCacheLeader(boolean enableCacheLeader) {
+ this.enableCacheLeader = enableCacheLeader;
+ }
+
public static class Builder {
private String host = Config.DEFAULT_HOST;
private int rpcPort = Config.DEFAULT_PORT;
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
index cd48873..7c9f08f 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
@@ -34,6 +34,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.Assert;
import org.junit.Test;
@@ -42,6 +45,7 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@@ -610,4 +614,220 @@ public abstract class Cases {
writeStatement.execute(setWritable);
}
+
+  /**
+   * End-to-end check that writes succeed both against schema that was registered up front and
+   * against schema that does not exist yet when the write arrives.
+   *
+   * <p>Step 1 registers root.sg0..root.sg4 with series s0..s9 on devices d1..d4 and writes three
+   * rows per device through insertRecord (d1), insertRecords (d2), insertTablet (d3) and
+   * insertTablets (d4). Step 2 issues the same four kinds of write for root.sg5..root.sg9, which
+   * were never registered, and additionally calls insertRecordsOfOneDevice for root.sg0.d5 and
+   * root.sg20.d1. Every read statement must then return the written rows. Finally,
+   * createTimeseries and createMultiTimeseries are issued for series under root.sg10..root.sg19
+   * and verified with "show timeseries".
+   *
+   * @throws IoTDBConnectionException if a session call fails at the connection level
+   * @throws StatementExecutionException if a session call is rejected
+   * @throws SQLException if a JDBC read or a result-set close fails
+   */
+  @Test
+  public void testAutoCreateSchemaInClusterMode()
+      throws IoTDBConnectionException, StatementExecutionException, SQLException {
+    List<String> measurementList = new ArrayList<>();
+    measurementList.add("s1");
+    measurementList.add("s2");
+    measurementList.add("s3");
+
+    List<TSDataType> typeList = new ArrayList<>();
+    typeList.add(TSDataType.INT64);
+    typeList.add(TSDataType.INT64);
+    typeList.add(TSDataType.INT64);
+
+    List<Object> valueList = new ArrayList<>();
+    valueList.add(1L);
+    valueList.add(2L);
+    valueList.add(3L);
+
+    // Register storage groups sg0..sg4, each with series s0..s9 on devices d1..d4.
+    for (int i = 0; i < 5; i++) {
+      String sg = "root.sg" + i;
+      session.setStorageGroup(sg);
+      for (int j = 0; j < 10; j++) {
+        session.createTimeseries(
+            String.format("%s.d1.s%s", sg, j),
+            TSDataType.INT64,
+            TSEncoding.RLE,
+            CompressionType.SNAPPY);
+        session.createTimeseries(
+            String.format("%s.d2.s%s", sg, j),
+            TSDataType.INT64,
+            TSEncoding.RLE,
+            CompressionType.SNAPPY);
+        session.createTimeseries(
+            String.format("%s.d3.s%s", sg, j),
+            TSDataType.INT64,
+            TSEncoding.RLE,
+            CompressionType.SNAPPY);
+        session.createTimeseries(
+            String.format("%s.d4.s%s", sg, j),
+            TSDataType.INT64,
+            TSEncoding.RLE,
+            CompressionType.SNAPPY);
+      }
+    }
+
+    // step 1: insert into existing time series.
+    for (int i = 0; i < 5; i++) {
+      for (long t = 0; t < 3; t++) {
+        session.insertRecord(
+            String.format("root.sg%s.d1", i), t, measurementList, typeList, 1L, 2L, 3L);
+      }
+    }
+
+    // One entry per storage group; these three lists are reused by the later insertRecords and
+    // insertRecordsOfOneDevice calls, which also operate on five rows at a time.
+    List<List<String>> measurementsList = new ArrayList<>();
+    List<List<Object>> valuesList = new ArrayList<>();
+    List<List<TSDataType>> typesList = new ArrayList<>();
+    List<String> deviceList = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      String devicePath = String.format("root.sg%s.d2", i);
+      deviceList.add(devicePath);
+      typesList.add(typeList);
+      measurementsList.add(measurementList);
+      valuesList.add(valueList);
+    }
+
+    for (long t = 0; t < 3; t++) {
+      List<Long> timeList = new ArrayList<>();
+      for (int i = 0; i < 5; i++) {
+        timeList.add(t);
+      }
+      session.insertRecords(deviceList, timeList, measurementsList, typesList, valuesList);
+    }
+
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
+
+    Map<String, Tablet> tabletMap = new HashMap<>();
+    for (int i = 0; i < 5; i++) {
+      Tablet tablet = new Tablet(String.format("root.sg%s.d3", i), schemaList, 10);
+      for (long row = 0; row < 3; row++) {
+        int rowIndex = tablet.rowSize++;
+        tablet.addTimestamp(rowIndex, row);
+        tablet.addValue("s1", rowIndex, 1L);
+        tablet.addValue("s2", rowIndex, 2L);
+        tablet.addValue("s3", rowIndex, 3L);
+      }
+      session.insertTablet(tablet);
+      // The same tablet is re-targeted at d4 and written again via insertTablets below.
+      tablet.setPrefixPath(String.format("root.sg%s.d4", i));
+      tabletMap.put(String.format("root.sg%s.d4", i), tablet);
+    }
+
+    session.insertTablets(tabletMap);
+
+    // step 2: test auto create sg and time series schema
+    for (int i = 5; i < 10; i++) {
+      for (long t = 0; t < 3; t++) {
+        session.insertRecord(
+            String.format("root.sg%s.d1", i), t, measurementList, typeList, 1L, 2L, 3L);
+      }
+    }
+
+    deviceList.clear();
+    for (int i = 5; i < 10; i++) {
+      String devicePath = String.format("root.sg%s.d2", i);
+      deviceList.add(devicePath);
+    }
+
+    for (long t = 0; t < 3; t++) {
+      List<Long> timeList = new ArrayList<>();
+      for (int i = 0; i < 5; i++) {
+        timeList.add(t);
+      }
+      session.insertRecords(deviceList, timeList, measurementsList, typesList, valuesList);
+    }
+
+    tabletMap.clear();
+    for (int i = 5; i < 10; i++) {
+      Tablet tablet = new Tablet(String.format("root.sg%s.d3", i), schemaList, 10);
+      for (long row = 0; row < 3; row++) {
+        int rowIndex = tablet.rowSize++;
+        tablet.addTimestamp(rowIndex, row);
+        tablet.addValue("s1", rowIndex, 1L);
+        tablet.addValue("s2", rowIndex, 2L);
+        tablet.addValue("s3", rowIndex, 3L);
+      }
+      session.insertTablet(tablet);
+      tablet.setPrefixPath(String.format("root.sg%s.d4", i));
+      tabletMap.put(String.format("root.sg%s.d4", i), tablet);
+    }
+
+    session.insertTablets(tabletMap);
+
+    // Row i of the one-device batches writes measurements s<i>, s<i+5>, s<i+10> at timestamp i.
+    measurementsList.clear();
+    List<Long> timeList = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      timeList.add((long) i);
+      List<String> measurements = new ArrayList<>();
+      measurements.add(String.format("s%d", i));
+      measurements.add(String.format("s%d", i + 5));
+      measurements.add(String.format("s%d", i + 10));
+      measurementsList.add(measurements);
+    }
+
+    session.insertRecordsOfOneDevice(
+        "root.sg0.d5", timeList, measurementsList, typesList, valuesList);
+    session.insertRecordsOfOneDevice(
+        "root.sg20.d1", timeList, measurementsList, typesList, valuesList);
+
+    for (Statement readStatement : readStatements) {
+      for (int i = 0; i < 10; i++) {
+        for (int d = 1; d <= 4; d++) {
+          // Each result set is closed explicitly once it has been checked.
+          try (ResultSet resultSet =
+              readStatement.executeQuery(
+                  String.format("SELECT s1,s2,s3 from root.sg%s.d%s", i, d))) {
+            for (long t = 0; t < 3; t++) {
+              Assert.assertTrue(resultSet.next());
+              // JUnit's argument order is (expected, actual).
+              Assert.assertEquals(t, resultSet.getLong(1));
+              Assert.assertEquals("1", resultSet.getString(2));
+              Assert.assertEquals("2", resultSet.getString(3));
+              Assert.assertEquals("3", resultSet.getString(4));
+            }
+          }
+        }
+      }
+
+      for (int i = 0; i < 5; i++) {
+        try (ResultSet resultSet =
+            readStatement.executeQuery(
+                String.format("select s%d,s%d,s%d from root.sg0.d5", i, i + 5, i + 10))) {
+          Assert.assertTrue(resultSet.next());
+          Assert.assertEquals(i, resultSet.getLong(1));
+          Assert.assertEquals("1", resultSet.getString(2));
+          Assert.assertEquals("2", resultSet.getString(3));
+          Assert.assertEquals("3", resultSet.getString(4));
+        }
+
+        try (ResultSet resultSet =
+            readStatement.executeQuery(
+                String.format("select s%d,s%d,s%d from root.sg20.d1", i, i + 5, i + 10))) {
+          Assert.assertTrue(resultSet.next());
+          Assert.assertEquals(i, resultSet.getLong(1));
+          Assert.assertEquals("1", resultSet.getString(2));
+          Assert.assertEquals("2", resultSet.getString(3));
+          Assert.assertEquals("3", resultSet.getString(4));
+        }
+      }
+    }
+
+    // test create time series
+    for (int i = 0; i < 5; i++) {
+      session.createTimeseries(
+          String.format("root.sg1%s.d1.s1", i),
+          TSDataType.INT64,
+          TSEncoding.RLE,
+          CompressionType.SNAPPY);
+    }
+
+    List<String> path = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    List<TSEncoding> encodings = new ArrayList<>();
+    List<CompressionType> compressionTypes = new ArrayList<>();
+    for (int i = 5; i < 10; i++) {
+      path.add(String.format("root.sg1%s.d1.s1", i));
+      dataTypes.add(TSDataType.INT64);
+      encodings.add(TSEncoding.RLE);
+      compressionTypes.add(CompressionType.SNAPPY);
+    }
+    session.createMultiTimeseries(
+        path, dataTypes, encodings, compressionTypes, null, null, null, null);
+    for (Statement readStatement : readStatements) {
+      for (int i = 0; i < 10; i++) {
+        try (ResultSet resultSet =
+            readStatement.executeQuery(String.format("show timeseries root.sg1%s.d1.s1", i))) {
+          Assert.assertTrue(resultSet.next());
+        }
+      }
+    }
+  }
}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
index 003454f..f14dea9 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
@@ -83,7 +83,14 @@ public abstract class ClusterIT extends Cases {
"jdbc:iotdb://" + readIps[i] + ":" + readPorts[i], "root", "root");
readStatements[i] = readConnections[i].createStatement();
}
- session = new Session(getWriteRpcIp(), getWriteRpcPort());
+ session =
+ new Session.Builder()
+ .host(getWriteRpcIp())
+ .port(getWriteRpcPort())
+ .username("root")
+ .password("root")
+ .enableCacheLeader(false)
+ .build();
session.open();
TimeUnit.MILLISECONDS.sleep(3000);
}