You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/11/22 12:36:56 UTC
[iotdb] branch master updated: [IOTDB-4968] Auto register sg error in sync receiver and OOM (#8034)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 55bc50e53e [IOTDB-4968] Auto register sg error in sync receiver and OOM (#8034)
55bc50e53e is described below
commit 55bc50e53e210463cd3180d8a7501f780b4545d6
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Tue Nov 22 20:36:49 2022 +0800
[IOTDB-4968] Auto register sg error in sync receiver and OOM (#8034)
---
.../Edge-Cloud-Collaboration/Sync-Tool.md | 3 ++
docs/UserGuide/Write-Data/TsFile-Tool.md | 5 +-
.../Edge-Cloud-Collaboration/Sync-Tool.md | 3 ++
docs/zh/UserGuide/Write-Data/TsFile-Tool.md | 6 ++-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 7 +++
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 5 +-
.../plan/scheduler/load/LoadTsFileScheduler.java | 22 +++++---
.../plan/statement/crud/LoadTsFileStatement.java | 12 ++---
.../service/thrift/impl/ClientRPCServiceImpl.java | 2 +-
.../db/service/thrift/impl/TSServiceImpl.java | 2 +-
.../java/org/apache/iotdb/db/sync/SyncService.java | 9 +++-
.../iotdb/db/sync/receiver/load/TsFileLoader.java | 2 +-
.../db/sync/transport/server/ReceiverManager.java | 61 ++++++++++++++++++++--
13 files changed, 111 insertions(+), 28 deletions(-)
diff --git a/docs/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md b/docs/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
index 230717a746..516256835a 100644
--- a/docs/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
+++ b/docs/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
@@ -369,3 +369,6 @@ It costs 0.134s
- Execute `DROP PIPE p` get message `Fail to DROP_PIPE because Fail to drop PIPE [p] because Connection refused on DataNode: {id=2, internalEndPoint=TEndPoint(ip:127.0.0.1, port:9005)}. Please execute [DROP PIPE p] later to retry.`
- Cause by: There are some DataNodes with the status Running cannot be connected. Pipe has been deleted on some nodes and the status has been set to ***DROP***.
- Solution: Execute `SHOW DATANODES`, and check for unreachable DataNode networks, or wait for their status to change to Unknown and re-execute the statement.
+- Sync.log prompts `org.apache.iotdb.commons.exception.IoTDBException: root.** already been created as database`
+ - Cause by: The synchronization tool attempts to automatically create the sender's database on the receiver, where it already exists. This is a normal phenomenon.
+ - Solution: No intervention is required.
\ No newline at end of file
diff --git a/docs/UserGuide/Write-Data/TsFile-Tool.md b/docs/UserGuide/Write-Data/TsFile-Tool.md
index cd9197f8a0..dd07145856 100644
--- a/docs/UserGuide/Write-Data/TsFile-Tool.md
+++ b/docs/UserGuide/Write-Data/TsFile-Tool.md
@@ -77,4 +77,7 @@ In addition, if you do not use the `-s` and `-q` parameters, after the export sc
> tools/export-tsfile.bat -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -s ./sql.txt
# Or
> tools/export-tsfile.bat -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -s ./sql.txt -f myTsFile
-```
\ No newline at end of file
+```
+
+### Q&A
+- It is recommended not to execute write commands while loading data, because doing both at the same time may cause the JVM to run out of memory.
\ No newline at end of file
diff --git a/docs/zh/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md b/docs/zh/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
index ed77a3f0eb..4431760979 100644
--- a/docs/zh/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
+++ b/docs/zh/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
@@ -356,3 +356,6 @@ It costs 0.134s
- 执行 `DROP PIPE p` 提示 `Fail to DROP_PIPE because Fail to drop PIPE [p] because Connection refused on DataNode: {id=2, internalEndPoint=TEndPoint(ip:127.0.0.1, port:9005)}. Please execute [DROP PIPE p] later to retry.`
- 原因:存在状态为 Running 的 DataNode 无法连通,Pipe 已在部分节点上被删除,状态被置为 ***DROP***。
- 解决方案:执行 `SHOW DATANODES` 语句,检查无法连通的 DataNode 网络,或等待其状态变为 Unknown 后重新执行语句。
+- 运行时日志提示 `org.apache.iotdb.commons.exception.IoTDBException: root.** already been created as database`
+ - 原因:同步工具试图在接收端自动创建发送端的Database,属于正常现象
+ - 解决方案:无需干预
diff --git a/docs/zh/UserGuide/Write-Data/TsFile-Tool.md b/docs/zh/UserGuide/Write-Data/TsFile-Tool.md
index 23852a488e..ed464a390c 100644
--- a/docs/zh/UserGuide/Write-Data/TsFile-Tool.md
+++ b/docs/zh/UserGuide/Write-Data/TsFile-Tool.md
@@ -78,4 +78,8 @@ TsFile 工具可帮您 通过执行指定sql、命令行sql、sql文件的方式
> tools/export-tsfile.bat -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -s ./sql.txt
# Or
> tools/export-tsfile.bat -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -s ./sql.txt -f myTsFile
-```
\ No newline at end of file
+```
+
+### Q&A
+
+- 建议在导入数据时不要同时执行写入数据命令,这将有可能导致JVM内存不足的情况。
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index aff501e62a..6f84eb6608 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -155,6 +155,9 @@ public class IoTDBConfig {
/** The proportion of write memory for compaction */
private double compactionProportion = 0.2;
+ /** The proportion of write memory for loading TsFile */
+ private double loadTsFileProportion = 0.125;
+
/**
* If memory cost of data region increased more than proportion of {@linkplain
* IoTDBConfig#getAllocateMemoryForStorageEngine()}*{@linkplain
@@ -3258,6 +3261,10 @@ public class IoTDBConfig {
return compactionProportion;
}
+ public double getLoadTsFileProportion() {
+ return loadTsFileProportion;
+ }
+
public void setCompactionProportion(double compactionProportion) {
this.compactionProportion = compactionProportion;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 19169af3b7..11e97a4561 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1570,7 +1570,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
if (loadTsFileStatement.isVerifySchema()) {
verifyLoadingMeasurements(device2Schemas);
}
- if (loadTsFileStatement.isAutoCreateSchema()) {
+ if (loadTsFileStatement.isAutoCreateDatabase()) {
autoCreateSg(loadTsFileStatement.getSgLevel(), device2Schemas);
}
ISchemaTree schemaTree =
@@ -1638,7 +1638,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
Map<String, List<TimeseriesMetadata>> device2Metadata = reader.getAllTimeseriesMetadata(true);
- if (statement.isAutoCreateSchema() || statement.isVerifySchema()) {
+ if (IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()
+ || statement.isVerifySchema()) {
// construct schema
for (Map.Entry<String, List<TimeseriesMetadata>> entry : device2Metadata.entrySet()) {
String device = entry.getKey();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
index 2d6be8ad60..a255e92eef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
@@ -77,7 +77,9 @@ public class LoadTsFileScheduler implements IScheduler {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 5184000L; // NOTE(review): 5184000 s is 60 days (60 * 86400), not one day -- confirm the intended limit
private static final long MAX_MEMORY_SIZE =
- Math.min(config.getThriftMaxFrameSize() / 2, config.getAllocateMemoryForStorageEngine() / 8);
+ Math.min(
+ config.getThriftMaxFrameSize() / 2,
+ (long) (config.getAllocateMemoryForStorageEngine() * config.getLoadTsFileProportion()));
private final MPPQueryContext queryContext;
private final QueryStateMachine stateMachine;
@@ -315,7 +317,15 @@ public class LoadTsFileScheduler implements IScheduler {
}
private boolean addOrSendChunkData(ChunkData chunkData) {
- dataSize += chunkData.getDataSize();
+ TRegionReplicaSet replicaSet =
+ singleTsFileNode
+ .getDataPartition()
+ .getDataRegionReplicaSetForWriting(
+ chunkData.getDevice(), chunkData.getTimePartitionSlot());
+ dataSize +=
+ (1 + replicaSet.getDataNodeLocationsSize())
+ * chunkData.getDataSize(); // should multiply datanode factor
+
if (dataSize > MAX_MEMORY_SIZE) {
List<TRegionReplicaSet> sortedReplicaSets =
replicaSet2Piece.keySet().stream()
@@ -332,7 +342,7 @@ public class LoadTsFileScheduler implements IScheduler {
return false;
}
- dataSize -= pieceNode.getDataSize();
+ dataSize -= (1 + sortedReplicaSet.getDataNodeLocationsSize()) * pieceNode.getDataSize();
replicaSet2Piece.put(
sortedReplicaSet,
new LoadTsFilePieceNode(
@@ -343,11 +353,7 @@ public class LoadTsFileScheduler implements IScheduler {
}
}
}
- TRegionReplicaSet replicaSet =
- singleTsFileNode
- .getDataPartition()
- .getDataRegionReplicaSetForWriting(
- chunkData.getDevice(), chunkData.getTimePartitionSlot());
+
replicaSet2Piece
.computeIfAbsent(
replicaSet,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java
index e0d492dd03..c547de12de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java
@@ -39,7 +39,7 @@ public class LoadTsFileStatement extends Statement {
private int sgLevel;
private boolean verifySchema;
private boolean deleteAfterLoad;
- private boolean autoCreateSchema;
+ private boolean autoCreateDatabase;
private List<File> tsFiles;
private List<TsFileResource> resources;
@@ -49,7 +49,7 @@ public class LoadTsFileStatement extends Statement {
this.sgLevel = IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
this.verifySchema = true;
this.deleteAfterLoad = true;
- this.autoCreateSchema = IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
+ this.autoCreateDatabase = IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
this.tsFiles = new ArrayList<>();
this.resources = new ArrayList<>();
this.statementType = StatementType.MULTI_BATCH_INSERT;
@@ -103,8 +103,8 @@ public class LoadTsFileStatement extends Statement {
this.verifySchema = verifySchema;
}
- public void setAutoCreateSchema(boolean autoCreateSchema) {
- this.autoCreateSchema = autoCreateSchema;
+ public void setAutoCreateDatabase(boolean autoCreateDatabase) {
+ this.autoCreateDatabase = autoCreateDatabase;
}
public boolean isVerifySchema() {
@@ -115,8 +115,8 @@ public class LoadTsFileStatement extends Statement {
return deleteAfterLoad;
}
- public boolean isAutoCreateSchema() {
- return autoCreateSchema;
+ public boolean isAutoCreateDatabase() {
+ return autoCreateDatabase;
}
public int getSgLevel() {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 43515a61f2..e586de0d18 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -1693,7 +1693,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus handshake(TSyncIdentityInfo info) throws TException {
// TODO(sync): Check permissions here
- return SyncService.getInstance().handshake(info);
+ return SyncService.getInstance().handshake(info, PARTITION_FETCHER, SCHEMA_FETCHER);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index bc1a81a8a1..cd384011ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -1375,7 +1375,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus handshake(TSyncIdentityInfo info) throws TException {
- return SyncService.getInstance().handshake(info);
+ return SyncService.getInstance().handshake(info, null, null);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index 692818537a..2b6cb050d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -37,6 +37,8 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
import org.apache.iotdb.db.sync.common.ClusterSyncInfoFetcher;
@@ -106,8 +108,11 @@ public class SyncService implements IService {
// region Interfaces and Implementation of Transport Layer
- public TSStatus handshake(TSyncIdentityInfo identityInfo) {
- return receiverManager.handshake(identityInfo);
+ public TSStatus handshake(
+ TSyncIdentityInfo identityInfo,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher) {
+ return receiverManager.handshake(identityInfo, partitionFetcher, schemaFetcher);
}
public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
index 8a65c00ea8..e86c10ec46 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
@@ -54,7 +54,7 @@ public class TsFileLoader implements ILoader {
statement.setDeleteAfterLoad(true);
statement.setSgLevel(parseSgLevel());
statement.setVerifySchema(true);
- statement.setAutoCreateSchema(true);
+ statement.setAutoCreateDatabase(false);
long queryId = SessionManager.getInstance().requestQueryId();
ExecutionResult result =
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
index cf0542aaf2..7d8ee67d90 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
@@ -22,10 +22,17 @@ package org.apache.iotdb.db.sync.transport.server;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.sync.utils.SyncConstant;
import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.rpc.RpcUtils;
@@ -53,9 +60,9 @@ import java.util.concurrent.atomic.AtomicLong;
* only be accessed by the {@linkplain org.apache.iotdb.db.sync.SyncService}
*/
public class ReceiverManager {
- private static Logger logger = LoggerFactory.getLogger(ReceiverManager.class);
+ private static final Logger logger = LoggerFactory.getLogger(ReceiverManager.class);
- private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
// When the client abnormally exits, we can still know who to disconnect
private final ThreadLocal<Long> currentConnectionId;
@@ -63,6 +70,7 @@ public class ReceiverManager {
private final Map<Long, TSyncIdentityInfo> connectionIdToIdentityInfoMap;
// Record the remote message for every rpc connection
private final Map<Long, Map<String, Long>> connectionIdToStartIndexRecord;
+ private final Map<String, String> registeredDatabase;
// The sync connectionId is unique in one IoTDB instance.
private final AtomicLong connectionIdGenerator;
@@ -71,6 +79,7 @@ public class ReceiverManager {
currentConnectionId = new ThreadLocal<>();
connectionIdToIdentityInfoMap = new ConcurrentHashMap<>();
connectionIdToStartIndexRecord = new ConcurrentHashMap<>();
+ registeredDatabase = new ConcurrentHashMap<>();
connectionIdGenerator = new AtomicLong();
}
@@ -139,7 +148,10 @@ public class ReceiverManager {
* @return {@link TSStatusCode#PIPESERVER_ERROR} if fail to connect; {@link
* TSStatusCode#SUCCESS_STATUS} if success to connect.
*/
- public TSStatus handshake(TSyncIdentityInfo identityInfo) {
+ public TSStatus handshake(
+ TSyncIdentityInfo identityInfo,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher) {
logger.info("Invoke handshake method from client ip = {}", identityInfo.address);
// check ip address
if (!verifyIPSegment(config.getIpWhiteList(), identityInfo.address)) {
@@ -160,6 +172,11 @@ public class ReceiverManager {
new File(SyncPathUtil.getFileDataDirPath(identityInfo)).mkdirs();
}
createConnection(identityInfo);
+ if (!registerDatabase(identityInfo.getDatabase(), partitionFetcher, schemaFetcher)) {
+ return RpcUtils.getStatus(
+ TSStatusCode.PIPESERVER_ERROR,
+ String.format("Auto register database %s error.", identityInfo.getDatabase()));
+ }
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
}
@@ -209,7 +226,7 @@ public class ReceiverManager {
* @return {@link TSStatusCode#PIPESERVER_ERROR} if fail to receive or load; {@link
* TSStatusCode#SUCCESS_STATUS} if load successfully.
* @throws TException The connection between the sender and the receiver has not been established
- * by {@link ReceiverManager#handshake(TSyncIdentityInfo)}
+ * by {@link ReceiverManager#handshake(TSyncIdentityInfo, IPartitionFetcher, ISchemaFetcher)}
*/
public TSStatus transportPipeData(ByteBuffer buff) throws TException {
// step1. check connection
@@ -264,7 +281,7 @@ public class ReceiverManager {
* TSStatusCode#SYNC_FILE_REDIRECTION_ERROR} if startIndex needs to rollback because
* mismatched; {@link TSStatusCode#SYNC_FILE_ERROR} if fail to receive file.
* @throws TException The connection between the sender and the receiver has not been established
- * by {@link ReceiverManager#handshake(TSyncIdentityInfo)}
+ * by {@link ReceiverManager#handshake(TSyncIdentityInfo, IPartitionFetcher, ISchemaFetcher)}
*/
public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff)
throws TException {
@@ -389,6 +406,40 @@ public class ReceiverManager {
connectionIdToIdentityInfoMap.put(connectionId, identityInfo);
}
+ private boolean registerDatabase( // Executes a SetStorageGroupStatement for `database` through the Coordinator; returns false if the path is illegal or the result status is neither success nor already-exists.
+ String database, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
+ if (registeredDatabase.containsKey(database)) { // already recorded by an earlier successful call: skip the create statement
+ return true;
+ }
+ try {
+ SetStorageGroupStatement statement = new SetStorageGroupStatement();
+ statement.setStorageGroupPath(new PartialPath(database));
+ long queryId = SessionManager.getInstance().requestQueryId();
+ ExecutionResult result =
+ Coordinator.getInstance()
+ .execute(
+ statement,
+ queryId,
+ null,
+ "",
+ partitionFetcher, // NOTE(review): may be null -- TSServiceImpl.handshake passes null for both fetchers; confirm Coordinator.execute tolerates that
+ schemaFetcher,
+ IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result.status.code != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { // DATABASE_ALREADY_EXISTS is treated the same as success
+ logger.error(String.format("Create Database error, statement: %s.", statement));
+ logger.error(String.format("Create database result status : %s.", result.status));
+ return false;
+ }
+ } catch (IllegalPathException e) { // presumably thrown by new PartialPath(database) when the name is not a legal path -- verify
+ logger.error(String.format("Parse database PartialPath %s error", database), e);
+ return false;
+ }
+
+ registeredDatabase.put(database, ""); // the map is used as a set: only the key matters, the empty-string value is never read in this method
+ return true;
+ }
+
/**
* release resources or cleanup when a client (a sender) is disconnected (normally or abnormally).
*/