You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/13 13:47:51 UTC
[iotdb] branch master updated: [IOTDB-2805] Create data region (#5509)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 40585a34d4 [IOTDB-2805] Create data region (#5509)
40585a34d4 is described below
commit 40585a34d480bf146e011944668d0e41e33d8ef5
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Wed Apr 13 21:47:47 2022 +0800
[IOTDB-2805] Create data region (#5509)
---
.../thrift/impl/DataNodeManagementServiceImpl.java | 51 +++++++++++++++++-----
thrift/src/main/thrift/management.thrift | 14 ------
2 files changed, 40 insertions(+), 25 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java
index 7e83ddcb81..fdd0a863ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java
@@ -24,17 +24,19 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.CreateDataPartitionReq;
import org.apache.iotdb.service.rpc.thrift.CreateDataRegionReq;
import org.apache.iotdb.service.rpc.thrift.CreateSchemaRegionReq;
import org.apache.iotdb.service.rpc.thrift.ManagementIService;
@@ -52,8 +54,9 @@ import java.util.List;
public class DataNodeManagementServiceImpl implements ManagementIService.Iface {
private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeManagementServiceImpl.class);
- private SchemaEngine schemaEngine = SchemaEngine.getInstance();
- private IConsensus consensusImpl = ConsensusImpl.getInstance();
+ private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
+ private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
+ private final IConsensus consensusImpl = ConsensusImpl.getInstance();
@Override
public TSStatus createSchemaRegion(CreateSchemaRegionReq req) throws TException {
@@ -101,12 +104,40 @@ public class DataNodeManagementServiceImpl implements ManagementIService.Iface {
@Override
public TSStatus createDataRegion(CreateDataRegionReq req) throws TException {
- return null;
- }
-
- @Override
- public TSStatus createDataPartition(CreateDataPartitionReq req) throws TException {
- return null;
+ TSStatus tsStatus;
+ try {
+ TRegionReplicaSet regionReplicaSet = req.getRegionReplicaSet();
+ DataRegionId dataRegionId =
+ (DataRegionId)
+ ConsensusGroupId.Factory.create(ByteBuffer.wrap(regionReplicaSet.getRegionId()));
+ storageEngine.createDataRegion(dataRegionId, req.storageGroup, req.ttl);
+ ConsensusGroupId consensusGroupId =
+ ConsensusGroupId.Factory.create(ByteBuffer.wrap(regionReplicaSet.getRegionId()));
+ List<Peer> peers = new ArrayList<>();
+ for (EndPoint endPoint : regionReplicaSet.getEndpoint()) {
+ Endpoint endpoint = new Endpoint(endPoint.getIp(), endPoint.getPort());
+ peers.add(new Peer(consensusGroupId, endpoint));
+ }
+ ConsensusGenericResponse consensusGenericResponse =
+ consensusImpl.addConsensusGroup(consensusGroupId, peers);
+ if (consensusGenericResponse.isSuccess()) {
+ tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } else {
+ tsStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ tsStatus.setMessage(consensusGenericResponse.getException().getMessage());
+ }
+ } catch (DataRegionException e1) {
+ LOGGER.error(
+ "Create Data Region {} failed because {}", req.getStorageGroup(), e1.getMessage());
+ tsStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ tsStatus.setMessage(
+ String.format("Create Data Region failed because of %s", e1.getMessage()));
+ } catch (IOException e2) {
+ LOGGER.error("Can't deserialize regionId", e2);
+ tsStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ tsStatus.setMessage(String.format("Can't deserialize regionId %s", e2));
+ }
+ return tsStatus;
}
@Override
@@ -120,6 +151,4 @@ public class DataNodeManagementServiceImpl implements ManagementIService.Iface {
}
public void handleClientExit() {}
-
- // TODO: add Mpp interface
}
diff --git a/thrift/src/main/thrift/management.thrift b/thrift/src/main/thrift/management.thrift
index 30aa28a072..eaaad8d18c 100644
--- a/thrift/src/main/thrift/management.thrift
+++ b/thrift/src/main/thrift/management.thrift
@@ -34,12 +34,6 @@ struct CreateDataRegionReq {
3: optional long ttl
}
-struct CreateDataPartitionReq{
- 1: required list<int> dataNodeID
- 2: required int dataRegionID
- 3: required long timeInterval
-}
-
struct MigrateSchemaRegionReq{
1: required int sourceDataNodeID
2: required int targetDataNodeID
@@ -66,14 +60,6 @@ service ManagementIService {
**/
common.TSStatus createDataRegion(CreateDataRegionReq req)
- /**
- * Config node will create a new data partition on a existing data region
- *
- * @param data nodes of the data region, data region id, and a new time interval
- * of data partition
- **/
- common.TSStatus createDataPartition(CreateDataPartitionReq req)
-
/**
* Config node will migrate a schema region from one data node to another
*