Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/06/07 07:55:01 UTC

[GitHub] [iotdb] neuyilan commented on a change in pull request #3191: New features of cluster scalability and multi-raft

neuyilan commented on a change in pull request #3191:
URL: https://github.com/apache/iotdb/pull/3191#discussion_r646250496



##########
File path: cluster/src/assembly/resources/sbin/remove-node.sh
##########
@@ -0,0 +1,88 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+echo ---------------------
+echo "Starting to remove a node(Cluster Mode)"
+echo ---------------------
+
+if [ -z "${IOTDB_HOME}" ]; then
+  export IOTDB_HOME="`dirname "$0"`/.."
+fi
+
+IOTDB_CONF=${IOTDB_HOME}/conf
+
+is_conf_path=false
+for arg do
+    shift
+    if [ "$arg" == "-c" ]; then
+        is_conf_path=true
+        continue

Review comment:
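       Same as the earlier comment on the `-c` handling: the `-c` parameter is not supported in cluster mode, so maybe we should remove it here too.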

##########
File path: cluster/src/assembly/resources/sbin/remove-node.bat
##########
@@ -0,0 +1,117 @@
+@REM
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements.  See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership.  The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License.  You may obtain a copy of the License at
+@REM
+@REM     http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied.  See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM
+
+@echo off
+echo ````````````````````````
+echo Starting to remove a node (Cluster Mode)
+echo ````````````````````````
+
+PATH %PATH%;%JAVA_HOME%\bin\
+set "FULL_VERSION="
+set "MAJOR_VERSION="
+set "MINOR_VERSION="
+
+
+for /f tokens^=2-5^ delims^=.-_+^" %%j in ('java -fullversion 2^>^&1') do (
+	set "FULL_VERSION=%%j-%%k-%%l-%%m"
+	IF "%%j" == "1" (
+	    set "MAJOR_VERSION=%%k"
+	    set "MINOR_VERSION=%%l"
+	) else (
+	    set "MAJOR_VERSION=%%j"
+	    set "MINOR_VERSION=%%k"
+	)
+)
+
+set JAVA_VERSION=%MAJOR_VERSION%
+
+IF NOT %JAVA_VERSION% == 8 (
+	IF NOT %JAVA_VERSION% == 11 (
+		echo IoTDB only supports jdk8 or jdk11, please check your java version.
+		goto finally
+	)
+)
+
+if "%OS%" == "Windows_NT" setlocal
+
+pushd %~dp0..
+if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%cd%
+popd
+
+set IOTDB_CONF=%IOTDB_HOME%\conf
+set IOTDB_LOGS=%IOTDB_HOME%\logs
+
+@setlocal ENABLEDELAYEDEXPANSION ENABLEEXTENSIONS
+set CONF_PARAMS=-r
+set is_conf_path=false
+for %%i in (%*) do (
+	IF "%%i" == "-c" (
+		set is_conf_path=true
+	) ELSE IF "!is_conf_path!" == "true" (

Review comment:
       The `-c` parameter is not supported in cluster mode; maybe we should remove it here.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
##########
@@ -144,6 +151,29 @@ private PartitionGroup routePlan(ShowChildPathsPlan plan) {
     throw new UnsupportedPlanException(plan);
   }
 
+  public Map<PhysicalPlan, PartitionGroup> splitAndRouteChangeMembershipLog(Log log) {
+    Map<PhysicalPlan, PartitionGroup> result = new HashMap<>();
+    LogPlan plan = new LogPlan(log.serialize());
+    List<Node> oldRing = new ArrayList<>(partitionTable.getAllNodes());
+    if (log instanceof AddNodeLog) {
+      oldRing.remove(((AddNodeLog) log).getNewNode());
+    } else if (log instanceof RemoveNodeLog) {
+      if (!oldRing.contains(((RemoveNodeLog) log).getRemovedNode())) {
+        oldRing.add(((RemoveNodeLog) log).getRemovedNode());
+        oldRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
+      }
+    }
+    for (PartitionGroup partitionGroup : partitionTable.calculateGlobalGroups(oldRing)) {
+      // It doesn't need to notify the data group which will be removed from cluster.
+      if (log instanceof RemoveNodeLog
+          && partitionGroup.getHeader().equals(((RemoveNodeLog) log).getRemovedNode())) {
+        continue;

Review comment:
       `partitionGroup.getHeader()` returns a `RaftNode`, while `((RemoveNodeLog) log).getRemovedNode()` returns a `Node`, so the two objects can never be equal and this check never skips the removed node's data group. Maybe we should sort out the relationship among `ClusterNode`, `RaftNode` and `Node` in the cluster module.
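       A minimal sketch of the mismatch, using hypothetical, simplified stand-ins for `Node` and `RaftNode` (the real classes carry more fields). It assumes that a `RaftNode` wraps a `Node` plus a raft id and exposes the node through a `getNode()` getter; `buggyIsRemovedGroup` and `fixedIsRemovedGroup` are illustrative names only.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for the cluster module's Node.
final class Node {
  private final int nodeIdentifier;

  Node(int nodeIdentifier) {
    this.nodeIdentifier = nodeIdentifier;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof Node && ((Node) o).nodeIdentifier == nodeIdentifier;
  }

  @Override
  public int hashCode() {
    return Integer.hashCode(nodeIdentifier);
  }
}

// Hypothetical stand-in for RaftNode: assumed to wrap a Node plus a raft id.
final class RaftNode {
  private final Node node;
  private final int raftId;

  RaftNode(Node node, int raftId) {
    this.node = node;
    this.raftId = raftId;
  }

  Node getNode() {
    return node;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof RaftNode)) {
      return false; // a Node is never equal to a RaftNode
    }
    RaftNode other = (RaftNode) o;
    return raftId == other.raftId && node.equals(other.node);
  }

  @Override
  public int hashCode() {
    return Objects.hash(node, raftId);
  }
}

public class HeaderComparison {
  // Mirrors the check in the diff: a RaftNode compared with a Node, always false.
  static boolean buggyIsRemovedGroup(RaftNode header, Node removedNode) {
    return header.equals(removedNode);
  }

  // One possible fix: compare the physical node wrapped by the header.
  static boolean fixedIsRemovedGroup(RaftNode header, Node removedNode) {
    return header.getNode().equals(removedNode);
  }

  public static void main(String[] args) {
    Node removed = new Node(3);
    RaftNode header = new RaftNode(new Node(3), 0);
    System.out.println(buggyIsRemovedGroup(header, removed)); // false
    System.out.println(fixedIsRemovedGroup(header, removed)); // true
  }
}
```

       Comparing the wrapped `Node` is one possible fix, assuming every group headed by the removed node (one per raft id) should be skipped.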

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1759,17 +1951,17 @@ public void setAllNodes(List<Node> allNodes) {
    * @param request the toString() of this parameter should explain what the request is and it is
    *     only used in logs for tracing
    */
-  public DataGroupMember getLocalDataMember(Node header, Object request) {
+  public DataGroupMember getLocalDataMember(RaftNode header, Object request) {
     return dataClusterServer.getDataMember(header, null, request);
   }
 

Review comment:
       According to the method's Javadoc, the `toString()` of `request` should explain what the request is, and it is only used in logs for tracing. However, a caller passes the raft id instead, as follows; is the parameter being misused?
   `metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId());`
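       To illustrate why the compiler does not flag this, here is a minimal sketch with a hypothetical `describeLookup` method standing in for `getLocalDataMember` (a `String` replaces `RaftNode`): an `int` raft id is autoboxed to `Integer`, accepted as the `Object` request, and the trace message then contains only a bare number.

```java
public class TracingArgumentDemo {
  // Hypothetical stand-in for getLocalDataMember(RaftNode header, Object request):
  // per the Javadoc, request is only used for its toString() in trace logs.
  static String describeLookup(String header, Object request) {
    return "get data member of " + header + " for request: " + request;
  }

  public static void main(String[] args) {
    int raftId = 1;
    // Compiles without a warning: the int is autoboxed to Integer, which is an Object.
    System.out.println(describeLookup("node-1", raftId));
    // prints "get data member of node-1 for request: 1", which says nothing about the request
  }
}
```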

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
##########
@@ -304,6 +311,74 @@ private TSStatus forwardPlan(List<PartitionGroup> partitionGroups, PhysicalPlan
     return status;
   }
 
+  public void sendLogToAllDataGroups(Log log) throws ChangeMembershipException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Send log {} to all data groups: start", log);
+    }
+
+    Map<PhysicalPlan, PartitionGroup> planGroupMap = router.splitAndRouteChangeMembershipLog(log);
+    List<String> errorCodePartitionGroups = new CopyOnWriteArrayList<>();
+    CountDownLatch counter = new CountDownLatch(planGroupMap.size());
+    for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) {
+      metaGroupMember
+          .getAppendLogThreadPool()
+          .submit(() -> forwardChangeMembershipPlan(log, entry, errorCodePartitionGroups, counter));
+    }
+    try {
+      counter.await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new ChangeMembershipException(
+          String.format("Can not wait all data groups to apply %s", log));
+    }
+    if (!errorCodePartitionGroups.isEmpty()) {
+      throw new ChangeMembershipException(
+          String.format("Apply %s failed with status {%s}", log, errorCodePartitionGroups));
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Send log {} to all data groups: end", log);
+    }
+  }
+
+  private void forwardChangeMembershipPlan(
+      Log log,
+      Map.Entry<PhysicalPlan, PartitionGroup> entry,
+      List<String> errorCodePartitionGroups,
+      CountDownLatch counter) {
+    int retryTime = 0;
+    try {
+      while (true) {
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Send change membership log {} to data group {}, retry time: {}",
+              log,
+              entry.getValue(),
+              retryTime);
+        }
+        try {
+          TSStatus status = forwardToSingleGroup(entry);
+          if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            if (logger.isDebugEnabled()) {
+              logger.debug(
+                  "Success to send change membership log {} to data group {}",
+                  log,
+                  entry.getValue());
+            }
+            return;
+          }
+          Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          errorCodePartitionGroups.add(e.getMessage());
+          return;
+        }
+        retryTime++;
+      }
+    } finally {
+      counter.countDown();
+    }
+  }

Review comment:
       Should we add timeout logic here? The loop keeps retrying until the data group returns success, so if this `DataGroup` has a problem, the whole operation blocks. That in turn blocks the upper calling layer and the operations of `metaGroupMember`.
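       One possible shape for the timeout, sketched under assumptions: `retryWithTimeout` is a hypothetical helper (not an existing IoTDB API) that retries until the attempt succeeds or a deadline passes, never sleeps past that deadline, and restores the interrupt flag the same way the original code does. In `forwardChangeMembershipPlan`, a `false` result could be recorded in `errorCodePartitionGroups` so that `sendLogToAllDataGroups` fails instead of hanging.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class BoundedRetry {
  /**
   * Hypothetical helper: calls attempt until it returns true or timeoutMs elapses,
   * waiting up to waitMs between attempts. Returns true on success, false on
   * timeout or interruption.
   */
  static boolean retryWithTimeout(BooleanSupplier attempt, long timeoutMs, long waitMs) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (true) {
      if (attempt.getAsBoolean()) {
        return true;
      }
      long remainingNs = deadline - System.nanoTime();
      if (remainingNs <= 0) {
        return false; // give up instead of blocking the caller forever
      }
      try {
        // Never sleep past the deadline.
        TimeUnit.NANOSECONDS.sleep(Math.min(remainingNs, TimeUnit.MILLISECONDS.toNanos(waitMs)));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        return false;
      }
    }
  }

  public static void main(String[] args) {
    int[] calls = {0};
    boolean ok = retryWithTimeout(() -> ++calls[0] >= 3, 1_000, 10);
    System.out.println(ok + " after " + calls[0] + " attempts"); // true after 3 attempts
    System.out.println(retryWithTimeout(() -> false, 50, 10)); // false
  }
}
```

       On the waiting side, `counter.await()` could similarly use `CountDownLatch.await(long timeout, TimeUnit unit)`, which returns `false` if the count has not reached zero when the timeout expires.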

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
##########
@@ -266,7 +273,7 @@ private TSStatus forwardPlan(List<PartitionGroup> partitionGroups, PhysicalPlan
         // the query should be handled by a group the local node is in, handle it with in the group
         status =
             metaGroupMember
-                .getLocalDataMember(partitionGroup.getHeader())
+                .getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId())

Review comment:
       Please check the signature `public DataGroupMember getLocalDataMember(RaftNode header, Object request)` and make sure it is not being misused here. Maybe we can design a new method, `getLocalDataMember(RaftNode header, int raftId)`?
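       A minimal sketch of the suggested overload, with `String` standing in for `RaftNode` and each method returning a description instead of a `DataGroupMember` (both simplifications are hypothetical). Java picks the `int` overload for a primitive argument and the `Object` overload for anything else.

```java
public class OverloadSketch {
  // Hypothetical stand-in for the existing tracing overload.
  static String getLocalDataMember(String header, Object request) {
    return "tracing overload, request=" + request;
  }

  // Hypothetical stand-in for the suggested overload with a typed raftId.
  static String getLocalDataMember(String header, int raftId) {
    return "raftId overload, raftId=" + raftId;
  }

  public static void main(String[] args) {
    System.out.println(getLocalDataMember("node-1", 2)); // raftId overload, raftId=2
    System.out.println(getLocalDataMember("node-1", "query plan")); // tracing overload, request=query plan
  }
}
```

       One caveat: a boxed `Integer` argument still resolves to the `Object` overload, because Java prefers widening `Integer` to `Object` over unboxing it to `int`. A distinct method name would avoid that ambiguity.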




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org