You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/03/22 11:37:08 UTC

[GitHub] [iotdb] OneSizeFitsQuorum commented on a change in pull request #5308: [IOTDB-2686] Region allocation policy

OneSizeFitsQuorum commented on a change in pull request #5308:
URL: https://github.com/apache/iotdb/pull/5308#discussion_r832061531



##########
File path: confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/BaseStateMachine.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.consensus.statemachine;
+
+import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public abstract class BaseStateMachine implements IStateMachine {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(BaseStateMachine.class);
+
+  @Override
+  public void start() {

Review comment:
       As it's an abstract class, we don't need to override these two methods here.

##########
File path: confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodesInfoDataSet.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.consensus.response;
+
+import org.apache.iotdb.confignode.partition.DataNodeInfo;
+import org.apache.iotdb.consensus.common.DataSet;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DataNodesInfoDataSet implements DataSet {
+
+  private final Map<Integer, DataNodeInfo> infoMap;
+
+  public DataNodesInfoDataSet() {
+    this.infoMap = new HashMap<>();
+  }
+
+  public void addDataNodeInfo(int dataNodeID, DataNodeInfo info) {
+    this.infoMap.put(dataNodeID, info);
+  }
+
+  public Map<Integer, DataNodeInfo> getInfoMap() {
+    return this.infoMap;
+  }
+
+  public DataNodeInfo getInfoWithMaximumID() {
+    int maxKey = Integer.MIN_VALUE;

Review comment:
       Why not use a TreeMap?

##########
File path: confignode/src/assembly/resources/conf/iotdb-confignode.properties
##########
@@ -101,4 +101,20 @@ config_node_address_lists=host0:22278,host1:22278,host2:22278
 # data_dirs=data\\data
 # For Linux platform
 # If its prefix is "/", then the path is absolute. Otherwise, it is relative.
-# data_dirs=data/data
\ No newline at end of file
+# data_dirs=data/data
+
+####################
+### Region Configuration
+####################
+
+# The number of replicas of each region
+# Datatype: int
+# region_replica_count=3
+
+# The number of SchemaRegions of each StorageGroup
+# Datatype: int
+# schema_region_count=1
+
+# The number of DataRegions of each StorageGroup
+# Datatype: int
+# data_region_count=1

Review comment:
       Why 1? Shall we use 0 to indicate cores / region_replica_count?

##########
File path: confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
##########
@@ -83,9 +86,38 @@ public int getDeviceGroupID(String device) {
     return hashExecutor.getDeviceGroupID(device);
   }
 
-  // TODO: Interfaces for metadata operations
+  /** Build ConfigNodeGroup ConsensusLayer */
+  private void setConsensusLayer(ConfigNodeConf config) {
+    // TODO: Support other consensus protocol
+    this.consensusImpl =
+        new StandAloneConsensus(
+            id -> {

Review comment:
       You can just use `new StandAloneConsensus(id -> new StandAloneStateMachine())`

##########
File path: confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
##########
@@ -83,9 +86,38 @@ public int getDeviceGroupID(String device) {
     return hashExecutor.getDeviceGroupID(device);
   }
 
-  // TODO: Interfaces for metadata operations
+  /** Build ConfigNodeGroup ConsensusLayer */
+  private void setConsensusLayer(ConfigNodeConf config) {
+    // TODO: Support other consensus protocol
+    this.consensusImpl =
+        new StandAloneConsensus(
+            id -> {
+              if (id.getType() == GroupType.PartitionRegion) {
+                return new StandAloneStateMachine();
+              } else {
+                return new EmptyStateMachine();
+              }
+            });
+    this.consensusImpl.start();
+
+    this.consensusGroupId = new ConsensusGroupId(GroupType.PartitionRegion, 0);
+    this.consensusImpl.addConsensusGroup(
+        this.consensusGroupId,
+        Collections.singletonList(
+            new Peer(

Review comment:
       We may need to support constructing the PartitionRegionConsensusGroup from conf here?

##########
File path: confignode/src/main/java/org/apache/iotdb/confignode/partition/PartitionTable.java
##########
@@ -18,42 +18,169 @@
  */
 package org.apache.iotdb.confignode.partition;
 
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
+import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
- * PartitionTable stores schema partition table, data partition table, and real-time write load
- * allocation rules
+ * PartitionTable stores schema partition table, data partition table, DataNode information,
+ * StorageGroup schema and real-time write load allocation rules. The PartitionTable is thread-safe.
  */
 public class PartitionTable {
 
-  // Map<StorageGroup, Map<DeviceGroupID, SchemaRegionID>>
-  private final Map<String, Map<Integer, Integer>> schemaPartitionTable;
-  // Map<SchemaRegionID, List<DataNodeID>>
-  private final Map<Integer, List<Integer>> schemaRegionDataNodesMap;
+  private static final int regionReplicaCount =
+      ConfigNodeDescriptor.getInstance().getConf().getRegionReplicaCount();
+  private static final int schemaRegionCount =
+      ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionCount();
+  private static final int dataRegionCount =
+      ConfigNodeDescriptor.getInstance().getConf().getDataRegionCount();
+
+  private final ReentrantReadWriteLock storageGroupLock;
+  // TODO: Serialize and Deserialize
+  private final Map<String, StorageGroupSchema> storageGroupsMap;
+
+  private final ReentrantReadWriteLock dataNodeLock;
+  // TODO: Serialize and Deserialize
+  private int nextDataNode = 0;
+  // TODO: Serialize and Deserialize
+  private int nextSchemaRegionGroup = 0;
+  // TODO: Serialize and Deserialize
+  private int nextDataRegionGroup = 0;
+  // TODO: Serialize and Deserialize
+  private final Map<Integer, DataNodeInfo> dataNodesMap; // Map<DataNodeID, DataNodeInfo>
 
-  // Map<StorageGroup, Map<DeviceGroupID, Map<TimeInterval, List<DataRegionID>>>>
-  private final Map<String, Map<Integer, Map<Long, List<Integer>>>> dataPartitionTable;
-  // Map<DataRegionID, List<DataNodeID>>
-  private final Map<Integer, List<Integer>> dataRegionDataNodesMap;
+  private final ReentrantReadWriteLock schemaLock;
+  // TODO: Serialize and Deserialize
+  private final SchemaPartitionInfo schemaPartition;
 
-  // Map<StorageGroup, Map<DeviceGroupID, DataPartitionRule>>
-  private final Map<String, Map<Integer, DataPartitionRule>> dataPartitionRuleTable;
+  private final ReentrantReadWriteLock dataLock;
+  // TODO: Serialize and Deserialize
+  private final DataPartitionInfo dataPartition;
 
   public PartitionTable() {
-    this.schemaPartitionTable = new HashMap<>();
-    this.schemaRegionDataNodesMap = new HashMap<>();
+    this.storageGroupLock = new ReentrantReadWriteLock();
+    this.storageGroupsMap = new HashMap<>();
+
+    this.dataNodeLock = new ReentrantReadWriteLock();
+    this.dataNodesMap = new HashMap<>();
+
+    this.schemaLock = new ReentrantReadWriteLock();
+    this.schemaPartition = new SchemaPartitionInfo();
 
-    this.dataPartitionTable = new HashMap<>();
-    this.dataRegionDataNodesMap = new HashMap<>();
+    this.dataLock = new ReentrantReadWriteLock();
+    this.dataPartition = new DataPartitionInfo();
+  }
+
+  public TSStatus registerDataNode(RegisterDataNodePlan plan) {
+    TSStatus result;
+    DataNodeInfo info = plan.getInfo();
+    dataNodeLock.writeLock().lock();
+
+    if (dataNodesMap.containsValue(info)) {
+      result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      result.setMessage(
+          String.format(
+              "DataNode %s is already registered.", plan.getInfo().getEndPoint().toString()));
+    } else {
+      info.setDataNodeID(nextDataNode);
+      dataNodesMap.put(info.getDataNodeID(), info);
+      result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      result.setMessage(String.valueOf(nextDataNode));
+      nextDataNode += 1;
+    }
 
-    this.dataPartitionRuleTable = new HashMap<>();
+    dataNodeLock.writeLock().unlock();
+    return result;
   }
 
-  // TODO: Interfaces for metadata operations
+  public Map<Integer, DataNodeInfo> getDataNodeInfo(QueryDataNodeInfoPlan plan) {
+    Map<Integer, DataNodeInfo> result = new HashMap<>();
+    dataNodeLock.readLock().lock();
 
-  // TODO: Interfaces for data operations
+    if (plan.getDataNodeID() == -1) {
+      result.putAll(dataNodesMap);
+    } else {
+      if (dataNodesMap.containsKey(plan.getDataNodeID())) {
+        result.put(plan.getDataNodeID(), dataNodesMap.get(plan.getDataNodeID()));
+      } else {
+        result = null;
+      }
+    }
+
+    dataNodeLock.readLock().unlock();
+    return result;
+  }
 
-  // TODO: Interfaces for data partition rules
+  public TSStatus setStorageGroup(SetStorageGroupPlan plan) {
+    TSStatus result;
+    storageGroupLock.writeLock().lock();
+
+    if (dataNodesMap.size() < regionReplicaCount) {
+      result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      result.setMessage("DataNode is not enough, please register more.");
+    } else {
+      if (storageGroupsMap.containsKey(plan.getSchema().getName())) {
+        result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+        result.setMessage(
+            String.format("StorageGroup %s is already set.", plan.getSchema().getName()));
+      } else {
+        StorageGroupSchema schema = new StorageGroupSchema(plan.getSchema().getName());
+        regionAllocation(schema);
+        storageGroupsMap.put(schema.getName(), schema);
+        result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      }
+    }
+
+    storageGroupLock.writeLock().unlock();
+    return result;
+  }
+
+  private void regionAllocation(StorageGroupSchema schema) {
+    // TODO: 2PL may cause deadlock, remember to optimize
+    dataNodeLock.writeLock().lock();

Review comment:
       Why use so many locks? The PartitionTable can only be written by a single thread and read by multiple threads, so a single read/write lock seems sufficient.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org