You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/11/22 12:36:56 UTC

[iotdb] branch master updated: [IOTDB-4968] Auto register sg error in sync receiver and OOM (#8034)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 55bc50e53e [IOTDB-4968] Auto register sg error in sync receiver and OOM (#8034)
55bc50e53e is described below

commit 55bc50e53e210463cd3180d8a7501f780b4545d6
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Tue Nov 22 20:36:49 2022 +0800

    [IOTDB-4968] Auto register sg error in sync receiver and OOM (#8034)
---
 .../Edge-Cloud-Collaboration/Sync-Tool.md          |  3 ++
 docs/UserGuide/Write-Data/TsFile-Tool.md           |  5 +-
 .../Edge-Cloud-Collaboration/Sync-Tool.md          |  3 ++
 docs/zh/UserGuide/Write-Data/TsFile-Tool.md        |  6 ++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  7 +++
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  5 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 22 +++++---
 .../plan/statement/crud/LoadTsFileStatement.java   | 12 ++---
 .../service/thrift/impl/ClientRPCServiceImpl.java  |  2 +-
 .../db/service/thrift/impl/TSServiceImpl.java      |  2 +-
 .../java/org/apache/iotdb/db/sync/SyncService.java |  9 +++-
 .../iotdb/db/sync/receiver/load/TsFileLoader.java  |  2 +-
 .../db/sync/transport/server/ReceiverManager.java  | 61 ++++++++++++++++++++--
 13 files changed, 111 insertions(+), 28 deletions(-)

diff --git a/docs/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md b/docs/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
index 230717a746..516256835a 100644
--- a/docs/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
+++ b/docs/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
@@ -369,3 +369,6 @@ It costs 0.134s
 - Execute `DROP PIPE p`  get message  `Fail to DROP_PIPE because Fail to drop PIPE [p] because Connection refused on DataNode: {id=2, internalEndPoint=TEndPoint(ip:127.0.0.1, port:9005)}. Please execute [DROP PIPE p] later to retry.`
   - Cause by: There are some DataNodes with the status Running cannot be connected. Pipe has been deleted on some nodes and the status has been set to ***DROP***.
   - Solution: Execute `SHOW DATANODES`, and check for unreachable DataNode networks, or wait for their status to change to Unknown and re-execute the statement.
+- Sync.log prompts `org.apache.iotdb.commons.exception.IoTDBException: root.** already been created as database`
+  - Cause by: The synchronization tool attempts to automatically create a database at the sender at the receiver. This is a normal phenomenon.
+  - Solution: No intervention is required.
\ No newline at end of file
diff --git a/docs/UserGuide/Write-Data/TsFile-Tool.md b/docs/UserGuide/Write-Data/TsFile-Tool.md
index cd9197f8a0..dd07145856 100644
--- a/docs/UserGuide/Write-Data/TsFile-Tool.md
+++ b/docs/UserGuide/Write-Data/TsFile-Tool.md
@@ -77,4 +77,7 @@ In addition, if you do not use the `-s` and `-q` parameters, after the export sc
 > tools/export-tsfile.bat -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -s ./sql.txt
 # Or
 > tools/export-tsfile.bat -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -s ./sql.txt -f myTsFile
-```
\ No newline at end of file
+```
+
+### example
+- It is recommended not to execute the write data command at the same time when loading data, which may lead to insufficient memory in the JVM.
\ No newline at end of file
diff --git a/docs/zh/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md b/docs/zh/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
index ed77a3f0eb..4431760979 100644
--- a/docs/zh/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
+++ b/docs/zh/UserGuide/Edge-Cloud-Collaboration/Sync-Tool.md
@@ -356,3 +356,6 @@ It costs 0.134s
 - 执行 `DROP PIPE p` 提示 `Fail to DROP_PIPE because Fail to drop PIPE [p] because Connection refused on DataNode: {id=2, internalEndPoint=TEndPoint(ip:127.0.0.1, port:9005)}. Please execute [DROP PIPE p] later to retry.`
   - 原因:存在状态为 Running 的 DataNode 无法连通,Pipe 已在部分节点上被删除,状态被置为 ***DROP***。
   - 解决方案:执行 `SHOW DATANODES` 语句,检查无法连通的 DataNode 网络,或等待其状态变为 Unknown 后重新执行语句。
+- 运行时日志提示 `org.apache.iotdb.commons.exception.IoTDBException: root.** already been created as database`
+  - 原因:同步工具试图在接收端自动创建发送端的Database,属于正常现象
+  - 解决方案:无需干预
diff --git a/docs/zh/UserGuide/Write-Data/TsFile-Tool.md b/docs/zh/UserGuide/Write-Data/TsFile-Tool.md
index 23852a488e..ed464a390c 100644
--- a/docs/zh/UserGuide/Write-Data/TsFile-Tool.md
+++ b/docs/zh/UserGuide/Write-Data/TsFile-Tool.md
@@ -78,4 +78,8 @@ TsFile 工具可帮您 通过执行指定sql、命令行sql、sql文件的方式
 > tools/export-tsfile.bat -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -s ./sql.txt
 # Or
 > tools/export-tsfile.bat -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -s ./sql.txt -f myTsFile
-```
\ No newline at end of file
+```
+
+### Q&A
+
+- 建议在导入数据时不要同时执行写入数据命令,这将有可能导致JVM内存不足的情况。
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index aff501e62a..6f84eb6608 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -155,6 +155,9 @@ public class IoTDBConfig {
   /** The proportion of write memory for compaction */
   private double compactionProportion = 0.2;
 
+  /** The proportion of write memory for loading TsFile */
+  private double loadTsFileProportion = 0.125;
+
   /**
    * If memory cost of data region increased more than proportion of {@linkplain
    * IoTDBConfig#getAllocateMemoryForStorageEngine()}*{@linkplain
@@ -3258,6 +3261,10 @@ public class IoTDBConfig {
     return compactionProportion;
   }
 
+  public double getLoadTsFileProportion() {
+    return loadTsFileProportion;
+  }
+
   public void setCompactionProportion(double compactionProportion) {
     this.compactionProportion = compactionProportion;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 19169af3b7..11e97a4561 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1570,7 +1570,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       if (loadTsFileStatement.isVerifySchema()) {
         verifyLoadingMeasurements(device2Schemas);
       }
-      if (loadTsFileStatement.isAutoCreateSchema()) {
+      if (loadTsFileStatement.isAutoCreateDatabase()) {
         autoCreateSg(loadTsFileStatement.getSgLevel(), device2Schemas);
       }
       ISchemaTree schemaTree =
@@ -1638,7 +1638,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
       Map<String, List<TimeseriesMetadata>> device2Metadata = reader.getAllTimeseriesMetadata(true);
 
-      if (statement.isAutoCreateSchema() || statement.isVerifySchema()) {
+      if (IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()
+          || statement.isVerifySchema()) {
         // construct schema
         for (Map.Entry<String, List<TimeseriesMetadata>> entry : device2Metadata.entrySet()) {
           String device = entry.getKey();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
index 2d6be8ad60..a255e92eef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
@@ -77,7 +77,9 @@ public class LoadTsFileScheduler implements IScheduler {
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 5184000L; // one day
   private static final long MAX_MEMORY_SIZE =
-      Math.min(config.getThriftMaxFrameSize() / 2, config.getAllocateMemoryForStorageEngine() / 8);
+      Math.min(
+          config.getThriftMaxFrameSize() / 2,
+          (long) (config.getAllocateMemoryForStorageEngine() * config.getLoadTsFileProportion()));
 
   private final MPPQueryContext queryContext;
   private final QueryStateMachine stateMachine;
@@ -315,7 +317,15 @@ public class LoadTsFileScheduler implements IScheduler {
     }
 
     private boolean addOrSendChunkData(ChunkData chunkData) {
-      dataSize += chunkData.getDataSize();
+      TRegionReplicaSet replicaSet =
+          singleTsFileNode
+              .getDataPartition()
+              .getDataRegionReplicaSetForWriting(
+                  chunkData.getDevice(), chunkData.getTimePartitionSlot());
+      dataSize +=
+          (1 + replicaSet.getDataNodeLocationsSize())
+              * chunkData.getDataSize(); // should multiply datanode factor
+
       if (dataSize > MAX_MEMORY_SIZE) {
         List<TRegionReplicaSet> sortedReplicaSets =
             replicaSet2Piece.keySet().stream()
@@ -332,7 +342,7 @@ public class LoadTsFileScheduler implements IScheduler {
             return false;
           }
 
-          dataSize -= pieceNode.getDataSize();
+          dataSize -= (1 + sortedReplicaSet.getDataNodeLocationsSize()) * pieceNode.getDataSize();
           replicaSet2Piece.put(
               sortedReplicaSet,
               new LoadTsFilePieceNode(
@@ -343,11 +353,7 @@ public class LoadTsFileScheduler implements IScheduler {
           }
         }
       }
-      TRegionReplicaSet replicaSet =
-          singleTsFileNode
-              .getDataPartition()
-              .getDataRegionReplicaSetForWriting(
-                  chunkData.getDevice(), chunkData.getTimePartitionSlot());
+
       replicaSet2Piece
           .computeIfAbsent(
               replicaSet,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java
index e0d492dd03..c547de12de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java
@@ -39,7 +39,7 @@ public class LoadTsFileStatement extends Statement {
   private int sgLevel;
   private boolean verifySchema;
   private boolean deleteAfterLoad;
-  private boolean autoCreateSchema;
+  private boolean autoCreateDatabase;
 
   private List<File> tsFiles;
   private List<TsFileResource> resources;
@@ -49,7 +49,7 @@ public class LoadTsFileStatement extends Statement {
     this.sgLevel = IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
     this.verifySchema = true;
     this.deleteAfterLoad = true;
-    this.autoCreateSchema = IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
+    this.autoCreateDatabase = IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
     this.tsFiles = new ArrayList<>();
     this.resources = new ArrayList<>();
     this.statementType = StatementType.MULTI_BATCH_INSERT;
@@ -103,8 +103,8 @@ public class LoadTsFileStatement extends Statement {
     this.verifySchema = verifySchema;
   }
 
-  public void setAutoCreateSchema(boolean autoCreateSchema) {
-    this.autoCreateSchema = autoCreateSchema;
+  public void setAutoCreateDatabase(boolean autoCreateDatabase) {
+    this.autoCreateDatabase = autoCreateDatabase;
   }
 
   public boolean isVerifySchema() {
@@ -115,8 +115,8 @@ public class LoadTsFileStatement extends Statement {
     return deleteAfterLoad;
   }
 
-  public boolean isAutoCreateSchema() {
-    return autoCreateSchema;
+  public boolean isAutoCreateDatabase() {
+    return autoCreateDatabase;
   }
 
   public int getSgLevel() {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 43515a61f2..e586de0d18 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -1693,7 +1693,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   @Override
   public TSStatus handshake(TSyncIdentityInfo info) throws TException {
     // TODO(sync): Check permissions here
-    return SyncService.getInstance().handshake(info);
+    return SyncService.getInstance().handshake(info, PARTITION_FETCHER, SCHEMA_FETCHER);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index bc1a81a8a1..cd384011ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -1375,7 +1375,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public TSStatus handshake(TSyncIdentityInfo info) throws TException {
-    return SyncService.getInstance().handshake(info);
+    return SyncService.getInstance().handshake(info, null, null);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index 692818537a..2b6cb050d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -37,6 +37,8 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.qp.utils.DateTimeUtils;
 import org.apache.iotdb.db.sync.common.ClusterSyncInfoFetcher;
@@ -106,8 +108,11 @@ public class SyncService implements IService {
 
   // region Interfaces and Implementation of Transport Layer
 
-  public TSStatus handshake(TSyncIdentityInfo identityInfo) {
-    return receiverManager.handshake(identityInfo);
+  public TSStatus handshake(
+      TSyncIdentityInfo identityInfo,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher) {
+    return receiverManager.handshake(identityInfo, partitionFetcher, schemaFetcher);
   }
 
   public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
index 8a65c00ea8..e86c10ec46 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
@@ -54,7 +54,7 @@ public class TsFileLoader implements ILoader {
       statement.setDeleteAfterLoad(true);
       statement.setSgLevel(parseSgLevel());
       statement.setVerifySchema(true);
-      statement.setAutoCreateSchema(true);
+      statement.setAutoCreateDatabase(false);
 
       long queryId = SessionManager.getInstance().requestQueryId();
       ExecutionResult result =
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
index cf0542aaf2..7d8ee67d90 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
@@ -22,10 +22,17 @@ package org.apache.iotdb.db.sync.transport.server;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.sync.utils.SyncConstant;
 import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
+import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
 import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -53,9 +60,9 @@ import java.util.concurrent.atomic.AtomicLong;
  * only be accessed by the {@linkplain org.apache.iotdb.db.sync.SyncService}
  */
 public class ReceiverManager {
-  private static Logger logger = LoggerFactory.getLogger(ReceiverManager.class);
+  private static final Logger logger = LoggerFactory.getLogger(ReceiverManager.class);
 
-  private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   // When the client abnormally exits, we can still know who to disconnect
   private final ThreadLocal<Long> currentConnectionId;
@@ -63,6 +70,7 @@ public class ReceiverManager {
   private final Map<Long, TSyncIdentityInfo> connectionIdToIdentityInfoMap;
   // Record the remote message for every rpc connection
   private final Map<Long, Map<String, Long>> connectionIdToStartIndexRecord;
+  private final Map<String, String> registeredDatabase;
 
   // The sync connectionId is unique in one IoTDB instance.
   private final AtomicLong connectionIdGenerator;
@@ -71,6 +79,7 @@ public class ReceiverManager {
     currentConnectionId = new ThreadLocal<>();
     connectionIdToIdentityInfoMap = new ConcurrentHashMap<>();
     connectionIdToStartIndexRecord = new ConcurrentHashMap<>();
+    registeredDatabase = new ConcurrentHashMap<>();
     connectionIdGenerator = new AtomicLong();
   }
 
@@ -139,7 +148,10 @@ public class ReceiverManager {
    * @return {@link TSStatusCode#PIPESERVER_ERROR} if fail to connect; {@link
    *     TSStatusCode#SUCCESS_STATUS} if success to connect.
    */
-  public TSStatus handshake(TSyncIdentityInfo identityInfo) {
+  public TSStatus handshake(
+      TSyncIdentityInfo identityInfo,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher) {
     logger.info("Invoke handshake method from client ip = {}", identityInfo.address);
     // check ip address
     if (!verifyIPSegment(config.getIpWhiteList(), identityInfo.address)) {
@@ -160,6 +172,11 @@ public class ReceiverManager {
       new File(SyncPathUtil.getFileDataDirPath(identityInfo)).mkdirs();
     }
     createConnection(identityInfo);
+    if (!registerDatabase(identityInfo.getDatabase(), partitionFetcher, schemaFetcher)) {
+      return RpcUtils.getStatus(
+          TSStatusCode.PIPESERVER_ERROR,
+          String.format("Auto register database %s error.", identityInfo.getDatabase()));
+    }
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
   }
 
@@ -209,7 +226,7 @@ public class ReceiverManager {
    * @return {@link TSStatusCode#PIPESERVER_ERROR} if fail to receive or load; {@link
    *     TSStatusCode#SUCCESS_STATUS} if load successfully.
    * @throws TException The connection between the sender and the receiver has not been established
-   *     by {@link ReceiverManager#handshake(TSyncIdentityInfo)}
+   *     by {@link ReceiverManager#handshake(TSyncIdentityInfo, IPartitionFetcher, ISchemaFetcher)}
    */
   public TSStatus transportPipeData(ByteBuffer buff) throws TException {
     // step1. check connection
@@ -264,7 +281,7 @@ public class ReceiverManager {
    *     TSStatusCode#SYNC_FILE_REDIRECTION_ERROR} if startIndex needs to rollback because
    *     mismatched; {@link TSStatusCode#SYNC_FILE_ERROR} if fail to receive file.
    * @throws TException The connection between the sender and the receiver has not been established
-   *     by {@link ReceiverManager#handshake(TSyncIdentityInfo)}
+   *     by {@link ReceiverManager#handshake(TSyncIdentityInfo, IPartitionFetcher, ISchemaFetcher)}
    */
   public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff)
       throws TException {
@@ -389,6 +406,40 @@ public class ReceiverManager {
     connectionIdToIdentityInfoMap.put(connectionId, identityInfo);
   }
 
+  private boolean registerDatabase(
+      String database, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
+    if (registeredDatabase.containsKey(database)) {
+      return true;
+    }
+    try {
+      SetStorageGroupStatement statement = new SetStorageGroupStatement();
+      statement.setStorageGroupPath(new PartialPath(database));
+      long queryId = SessionManager.getInstance().requestQueryId();
+      ExecutionResult result =
+          Coordinator.getInstance()
+              .execute(
+                  statement,
+                  queryId,
+                  null,
+                  "",
+                  partitionFetcher,
+                  schemaFetcher,
+                  IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+        logger.error(String.format("Create Database error, statement: %s.", statement));
+        logger.error(String.format("Create database result status : %s.", result.status));
+        return false;
+      }
+    } catch (IllegalPathException e) {
+      logger.error(String.format("Parse database PartialPath %s error", database), e);
+      return false;
+    }
+
+    registeredDatabase.put(database, "");
+    return true;
+  }
+
   /**
    * release resources or cleanup when a client (a sender) is disconnected (normally or abnormally).
    */