You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/13 13:47:51 UTC

[iotdb] branch master updated: [IOTDB-2805] Create data region (#5509)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 40585a34d4 [IOTDB-2805] Create data region (#5509)
40585a34d4 is described below

commit 40585a34d480bf146e011944668d0e41e33d8ef5
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Wed Apr 13 21:47:47 2022 +0800

    [IOTDB-2805] Create data region (#5509)
---
 .../thrift/impl/DataNodeManagementServiceImpl.java | 51 +++++++++++++++++-----
 thrift/src/main/thrift/management.thrift           | 14 ------
 2 files changed, 40 insertions(+), 25 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java
index 7e83ddcb81..fdd0a863ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java
@@ -24,17 +24,19 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.CreateDataPartitionReq;
 import org.apache.iotdb.service.rpc.thrift.CreateDataRegionReq;
 import org.apache.iotdb.service.rpc.thrift.CreateSchemaRegionReq;
 import org.apache.iotdb.service.rpc.thrift.ManagementIService;
@@ -52,8 +54,9 @@ import java.util.List;
 
 public class DataNodeManagementServiceImpl implements ManagementIService.Iface {
   private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeManagementServiceImpl.class);
-  private SchemaEngine schemaEngine = SchemaEngine.getInstance();
-  private IConsensus consensusImpl = ConsensusImpl.getInstance();
+  private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
+  private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
+  private final IConsensus consensusImpl = ConsensusImpl.getInstance();
 
   @Override
   public TSStatus createSchemaRegion(CreateSchemaRegionReq req) throws TException {
@@ -101,12 +104,40 @@ public class DataNodeManagementServiceImpl implements ManagementIService.Iface {
 
   @Override
   public TSStatus createDataRegion(CreateDataRegionReq req) throws TException {
-    return null;
-  }
-
-  @Override
-  public TSStatus createDataPartition(CreateDataPartitionReq req) throws TException {
-    return null;
+    TSStatus tsStatus;
+    try {
+      TRegionReplicaSet regionReplicaSet = req.getRegionReplicaSet();
+      DataRegionId dataRegionId =
+          (DataRegionId)
+              ConsensusGroupId.Factory.create(ByteBuffer.wrap(regionReplicaSet.getRegionId()));
+      storageEngine.createDataRegion(dataRegionId, req.storageGroup, req.ttl);
+      ConsensusGroupId consensusGroupId =
+          ConsensusGroupId.Factory.create(ByteBuffer.wrap(regionReplicaSet.getRegionId()));
+      List<Peer> peers = new ArrayList<>();
+      for (EndPoint endPoint : regionReplicaSet.getEndpoint()) {
+        Endpoint endpoint = new Endpoint(endPoint.getIp(), endPoint.getPort());
+        peers.add(new Peer(consensusGroupId, endpoint));
+      }
+      ConsensusGenericResponse consensusGenericResponse =
+          consensusImpl.addConsensusGroup(consensusGroupId, peers);
+      if (consensusGenericResponse.isSuccess()) {
+        tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      } else {
+        tsStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+        tsStatus.setMessage(consensusGenericResponse.getException().getMessage());
+      }
+    } catch (DataRegionException e1) {
+      LOGGER.error(
+          "Create Data Region {} failed because {}", req.getStorageGroup(), e1.getMessage());
+      tsStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      tsStatus.setMessage(
+          String.format("Create Data Region failed because of %s", e1.getMessage()));
+    } catch (IOException e2) {
+      LOGGER.error("Can't deserialize regionId", e2);
+      tsStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      tsStatus.setMessage(String.format("Can't deserialize regionId %s", e2));
+    }
+    return tsStatus;
   }
 
   @Override
@@ -120,6 +151,4 @@ public class DataNodeManagementServiceImpl implements ManagementIService.Iface {
   }
 
   public void handleClientExit() {}
-
-  // TODO: add Mpp interface
 }
diff --git a/thrift/src/main/thrift/management.thrift b/thrift/src/main/thrift/management.thrift
index 30aa28a072..eaaad8d18c 100644
--- a/thrift/src/main/thrift/management.thrift
+++ b/thrift/src/main/thrift/management.thrift
@@ -34,12 +34,6 @@ struct CreateDataRegionReq {
     3: optional long ttl
 }
 
-struct CreateDataPartitionReq{
-    1: required list<int> dataNodeID
-    2: required int dataRegionID
-    3: required long timeInterval
-}
-
 struct MigrateSchemaRegionReq{
     1: required int sourceDataNodeID
     2: required int targetDataNodeID
@@ -66,14 +60,6 @@ service ManagementIService {
     **/
     common.TSStatus createDataRegion(CreateDataRegionReq req)
 
-    /**
-      * Config node will create a new data partition on a existing data region
-      *
-      * @param data nodes of the data region, data region id, and a new time interval
-      * of data partition
-    **/
-    common.TSStatus createDataPartition(CreateDataPartitionReq req)
-
     /**
       * Config node will migrate a schema region from one data node to another
       *