You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by wa...@apache.org on 2022/07/23 01:54:05 UTC

[iotdb] branch master updated: [IOTDB-3593] Broadcast latest ConfigNodeGroup (#6744)

This is an automated email from the ASF dual-hosted git repository.

wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d3e71a2b38 [IOTDB-3593] Broadcast latest ConfigNodeGroup (#6744)
d3e71a2b38 is described below

commit d3e71a2b381e07ea66a543a4e3d63a63d0146045
Author: imquanke <39...@users.noreply.github.com>
AuthorDate: Sat Jul 23 09:53:59 2022 +0800

    [IOTDB-3593] Broadcast latest ConfigNodeGroup (#6744)
    
    [IOTDB-3593] Broadcast latest ConfigNodeGroup (#6744)
---
 .../confignode/client/DataNodeRequestType.java     |  3 +-
 .../async/datanode/AsyncDataNodeClientPool.java    | 34 +++++++++++
 .../handlers/UpdateConfigNodeGroupHandler.java     | 66 ++++++++++++++++++++++
 .../procedure/env/ConfigNodeProcedureEnv.java      |  9 +++
 .../procedure/impl/AddConfigNodeProcedure.java     |  1 +
 .../procedure/impl/RemoveConfigNodeProcedure.java  |  1 +
 .../impl/DataNodeInternalRPCServiceImpl.java       | 17 ++++++
 thrift/src/main/thrift/datanode.thrift             | 10 ++++
 8 files changed, 140 insertions(+), 1 deletion(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 715ebfd8de..81e9389a42 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -34,5 +34,6 @@ public enum DataNodeRequestType {
   CREATE_FUNCTION,
   DROP_FUNCTION,
   FLUSH,
-  UPDATE_REGION_ROUTE_MAP
+  UPDATE_REGION_ROUTE_MAP,
+  BROADCAST_LATEST_CONFIG_NODE_GROUP
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
index 91c10c5991..132386bce5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.confignode.client.async.datanode;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
@@ -33,6 +34,7 @@ import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandle
 import org.apache.iotdb.confignode.client.async.handlers.FlushHandler;
 import org.apache.iotdb.confignode.client.async.handlers.FunctionManagementHandler;
 import org.apache.iotdb.confignode.client.async.handlers.SetTTLHandler;
+import org.apache.iotdb.confignode.client.async.handlers.UpdateConfigNodeGroupHandler;
 import org.apache.iotdb.confignode.client.async.handlers.UpdateRegionRouteMapHandler;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
@@ -41,6 +43,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,6 +119,11 @@ public class AsyncDataNodeClientPool {
                 new UpdateRegionRouteMapHandler(
                     countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
             break;
+          case BROADCAST_LATEST_CONFIG_NODE_GROUP:
+            handler =
+                new UpdateConfigNodeGroupHandler(
+                    countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
+            break;
           default:
             return;
         }
@@ -160,6 +168,10 @@ public class AsyncDataNodeClientPool {
         case UPDATE_REGION_ROUTE_MAP:
           client.updateRegionCache((TRegionRouteReq) req, (UpdateRegionRouteMapHandler) handler);
           break;
+        case BROADCAST_LATEST_CONFIG_NODE_GROUP:
+          client.updateConfigNodeGroup(
+              (TUpdateConfigNodeGroupReq) req, (UpdateConfigNodeGroupHandler) handler);
+          break;
         default:
       }
     } catch (Exception e) {
@@ -261,6 +273,28 @@ public class AsyncDataNodeClientPool {
     }
   }
 
+  /**
+   * Notifies the given DataNodes of the latest ConfigNodeGroup; a null DataNode map is a no-op.
+   *
+   * @param registeredDataNodeLocationMap DataNodes that should receive the update
+   * @param registeredConfigNodes ConfigNode locations placed in the request
+   */
+  public void broadCastTheLatestConfigNodeGroup(
+      Map<Integer, TDataNodeLocation> registeredDataNodeLocationMap,
+      List<TConfigNodeLocation> registeredConfigNodes) {
+    if (registeredDataNodeLocationMap == null) {
+      return;
+    }
+    TUpdateConfigNodeGroupReq req = new TUpdateConfigNodeGroupReq(registeredConfigNodes);
+    LOGGER.info("Begin to broadcast the latest configNodeGroup: {}", registeredConfigNodes);
+    sendAsyncRequestToDataNodeWithRetry(
+        req,
+        registeredDataNodeLocationMap,
+        DataNodeRequestType.BROADCAST_LATEST_CONFIG_NODE_GROUP,
+        null);
+    LOGGER.info("Broadcast the latest configNodeGroup finished.");
+  }
+
   private TCreateSchemaRegionReq genCreateSchemaRegionReq(
       String storageGroup, TRegionReplicaSet regionReplicaSet) {
     TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateConfigNodeGroupHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateConfigNodeGroupHandler.java
new file mode 100644
index 0000000000..e4cf71b4b1
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateConfigNodeGroupHandler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client.async.handlers;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/** Callback for the BROADCAST_LATEST_CONFIG_NODE_GROUP request sent to one DataNode. */
+public class UpdateConfigNodeGroupHandler extends AbstractRetryHandler
+    implements AsyncMethodCallback<TSStatus> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpdateConfigNodeGroupHandler.class);
+
+  public UpdateConfigNodeGroupHandler(
+      CountDownLatch countDownLatch,
+      DataNodeRequestType requestType,
+      TDataNodeLocation targetDataNode,
+      Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
+    super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
+  }
+
+  @Override
+  public void onComplete(TSStatus status) {
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
+      LOGGER.info(
+          "Successfully broadCast the latest configNodeGroup on DataNode: {}", targetDataNode);
+    } else {
+      LOGGER.error(
+          "Failed to broadCast the latest configNodeGroup on DataNode: {}, {}",
+          targetDataNode,
+          status);
+    }
+    countDownLatch.countDown();
+  }
+
+  @Override
+  public void onError(Exception e) {
+    LOGGER.error("BroadCast the latest configNodeGroup on DataNode: {} failed", targetDataNode, e);
+    countDownLatch.countDown();
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index ba316a95da..14f1036b14 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
 import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
 import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
@@ -251,6 +252,14 @@ public class ConfigNodeProcedureEnv {
             ConfigNodeRequestType.NOTIFY_REGISTER_SUCCESS);
   }
 
+  /** Broadcasts the registered ConfigNodes to the registered DataNodes (e.g. on add/remove). */
+  public void broadCastTheLatestConfigNodeGroup() {
+    AsyncDataNodeClientPool.getInstance()
+        .broadCastTheLatestConfigNodeGroup(
+            configManager.getNodeManager().getRegisteredDataNodeLocations(-1),
+            configManager.getNodeManager().getRegisteredConfigNodes());
+  }
+
   public LockQueue getNodeLock() {
     return nodeLock;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
index 7edda907cc..cf244fb52d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
@@ -73,6 +73,7 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
         case REGISTER_SUCCESS:
           env.notifyRegisterSuccess(tConfigNodeLocation);
           env.applyConfigNode(tConfigNodeLocation);
+          env.broadCastTheLatestConfigNodeGroup();
           return Flow.NO_MORE_STATE;
       }
     } catch (Exception e) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java
index 52f13e4aa6..706e89454d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java
@@ -58,6 +58,7 @@ public class RemoveConfigNodeProcedure extends AbstractNodeProcedure<RemoveConfi
     try {
       switch (state) {
         case REMOVE_CONFIG_NODE_PREPARE:
+          env.broadCastTheLatestConfigNodeGroup();
           setNextState(RemoveConfigNodeState.REMOVE_PEER);
           break;
         case REMOVE_PEER:
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 681ad6be72..dcbf606468 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.service.thrift.impl;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
@@ -41,6 +42,7 @@ import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
 import org.apache.iotdb.db.auth.AuthorizerManager;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -92,6 +94,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
 import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -483,6 +486,20 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     return StorageEngineV2.getInstance().setTTL(req);
   }
 
+  @Override
+  public TSStatus updateConfigNodeGroup(TUpdateConfigNodeGroupReq req) {
+    List<TConfigNodeLocation> configNodeLocations = req.getConfigNodeLocations();
+    if (configNodeLocations != null) {
+      // Sequential stream: extracting endpoints is too cheap to justify a parallel one.
+      ConfigNodeInfo.getInstance()
+          .updateConfigNodeList(
+              configNodeLocations.stream()
+                  .map(TConfigNodeLocation::getInternalEndPoint)
+                  .collect(Collectors.toList()));
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
   @Override
   public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) {
     ConsensusGroupId consensusGroupId =
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 56c937cac0..3593651fda 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -194,6 +194,10 @@ struct TRegionRouteReq {
   2: required map<common.TConsensusGroupId, common.TRegionReplicaSet> regionRouteMap
 }
 
+struct TUpdateConfigNodeGroupReq {
+  1: required list<common.TConfigNodeLocation> configNodeLocations
+}
+
 service IDataNodeRPCService {
 
   // -----------------------------------For Data Node-----------------------------------------------
@@ -324,6 +328,12 @@ service IDataNodeRPCService {
   common.TSStatus flush(common.TFlushReq req)
 
   common.TSStatus setTTL(common.TSetTTLReq req)
+  /**
+   * The ConfigNode notifies all DataNodes when the capacity of the ConfigNodeGroup is expanded or reduced
+   *
+   * @param req carries configNodeLocations, a list<common.TConfigNodeLocation>
+   */
+  common.TSStatus updateConfigNodeGroup(TUpdateConfigNodeGroupReq req)
 }
 
 service MPPDataExchangeService {