You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by wa...@apache.org on 2022/07/23 01:54:05 UTC
[iotdb] branch master updated: [IOTDB-3593] Broadcast latest ConfigNodeGroup (#6744)
This is an automated email from the ASF dual-hosted git repository.
wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d3e71a2b38 [IOTDB-3593] Broadcast latest ConfigNodeGroup (#6744)
d3e71a2b38 is described below
commit d3e71a2b381e07ea66a543a4e3d63a63d0146045
Author: imquanke <39...@users.noreply.github.com>
AuthorDate: Sat Jul 23 09:53:59 2022 +0800
[IOTDB-3593] Broadcast latest ConfigNodeGroup (#6744)
[IOTDB-3593] Broadcast latest ConfigNodeGroup (#6744)
---
.../confignode/client/DataNodeRequestType.java | 3 +-
.../async/datanode/AsyncDataNodeClientPool.java | 34 +++++++++++
.../handlers/UpdateConfigNodeGroupHandler.java | 66 ++++++++++++++++++++++
.../procedure/env/ConfigNodeProcedureEnv.java | 9 +++
.../procedure/impl/AddConfigNodeProcedure.java | 1 +
.../procedure/impl/RemoveConfigNodeProcedure.java | 1 +
.../impl/DataNodeInternalRPCServiceImpl.java | 17 ++++++
thrift/src/main/thrift/datanode.thrift | 10 ++++
8 files changed, 140 insertions(+), 1 deletion(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 715ebfd8de..81e9389a42 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -34,5 +34,6 @@ public enum DataNodeRequestType {
CREATE_FUNCTION,
DROP_FUNCTION,
FLUSH,
- UPDATE_REGION_ROUTE_MAP
+ UPDATE_REGION_ROUTE_MAP,
+ BROADCAST_LATEST_CONFIG_NODE_GROUP
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
index 91c10c5991..132386bce5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.client.async.datanode;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
@@ -33,6 +34,7 @@ import org.apache.iotdb.confignode.client.async.handlers.DataNodeHeartbeatHandle
import org.apache.iotdb.confignode.client.async.handlers.FlushHandler;
import org.apache.iotdb.confignode.client.async.handlers.FunctionManagementHandler;
import org.apache.iotdb.confignode.client.async.handlers.SetTTLHandler;
+import org.apache.iotdb.confignode.client.async.handlers.UpdateConfigNodeGroupHandler;
import org.apache.iotdb.confignode.client.async.handlers.UpdateRegionRouteMapHandler;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
@@ -41,6 +43,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,6 +119,11 @@ public class AsyncDataNodeClientPool {
new UpdateRegionRouteMapHandler(
countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
break;
+ case BROADCAST_LATEST_CONFIG_NODE_GROUP:
+ handler =
+ new UpdateConfigNodeGroupHandler(
+ countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
+ break;
default:
return;
}
@@ -160,6 +168,10 @@ public class AsyncDataNodeClientPool {
case UPDATE_REGION_ROUTE_MAP:
client.updateRegionCache((TRegionRouteReq) req, (UpdateRegionRouteMapHandler) handler);
break;
+ case BROADCAST_LATEST_CONFIG_NODE_GROUP:
+ client.updateConfigNodeGroup(
+ (TUpdateConfigNodeGroupReq) req, (UpdateConfigNodeGroupHandler) handler);
+ break;
default:
}
} catch (Exception e) {
@@ -261,6 +273,28 @@ public class AsyncDataNodeClientPool {
}
}
+ /**
+ * notify all DataNodes when the capacity of the ConfigNodeGroup is expanded or reduced
+ *
+ * @param registeredDataNodeLocationMap Map<Integer, TDataNodeLocation>
+ * @param registeredConfigNodes List<TConfigNodeLocation>
+ */
+ public void broadCastTheLatestConfigNodeGroup(
+ Map<Integer, TDataNodeLocation> registeredDataNodeLocationMap,
+ List<TConfigNodeLocation> registeredConfigNodes) {
+ if (registeredDataNodeLocationMap != null) {
+ TUpdateConfigNodeGroupReq updateConfigNodeGroupReq =
+ new TUpdateConfigNodeGroupReq(registeredConfigNodes);
+ LOGGER.info("Begin to broadcast the latest configNodeGroup: {}", registeredConfigNodes);
+ sendAsyncRequestToDataNodeWithRetry(
+ updateConfigNodeGroupReq,
+ registeredDataNodeLocationMap,
+ DataNodeRequestType.BROADCAST_LATEST_CONFIG_NODE_GROUP,
+ null);
+ LOGGER.info("Broadcast the latest configNodeGroup finished.");
+ }
+ }
+
private TCreateSchemaRegionReq genCreateSchemaRegionReq(
String storageGroup, TRegionReplicaSet regionReplicaSet) {
TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateConfigNodeGroupHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateConfigNodeGroupHandler.java
new file mode 100644
index 0000000000..e4cf71b4b1
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateConfigNodeGroupHandler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client.async.handlers;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public class UpdateConfigNodeGroupHandler extends AbstractRetryHandler
+ implements AsyncMethodCallback<TSStatus> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(UpdateConfigNodeGroupHandler.class);
+
+ public UpdateConfigNodeGroupHandler(
+ CountDownLatch countDownLatch,
+ DataNodeRequestType requestType,
+ TDataNodeLocation targetDataNode,
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
+ super(countDownLatch, requestType, targetDataNode, dataNodeLocationMap);
+ }
+
+ @Override
+ public void onComplete(TSStatus status) {
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
+ LOGGER.info(
+ "Successfully broadCast the latest configNodeGroup on DataNode: {}", targetDataNode);
+ } else {
+ LOGGER.error(
+ "Failed to broadCast the latest configNodeGroup on DataNode: {}, {}",
+ targetDataNode,
+ status);
+ }
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onError(Exception e) {
+ LOGGER.error("BroadCast the latest configNodeGroup on DataNode: {} failed", targetDataNode);
+ countDownLatch.countDown();
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index ba316a95da..14f1036b14 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
@@ -251,6 +252,14 @@ public class ConfigNodeProcedureEnv {
ConfigNodeRequestType.NOTIFY_REGISTER_SUCCESS);
}
+ /** notify all DataNodes when the capacity of the ConfigNodeGroup is expanded or reduced */
+ public void broadCastTheLatestConfigNodeGroup() {
+ AsyncDataNodeClientPool.getInstance()
+ .broadCastTheLatestConfigNodeGroup(
+ configManager.getNodeManager().getRegisteredDataNodeLocations(-1),
+ configManager.getNodeManager().getRegisteredConfigNodes());
+ }
+
public LockQueue getNodeLock() {
return nodeLock;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
index 7edda907cc..cf244fb52d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
@@ -73,6 +73,7 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
case REGISTER_SUCCESS:
env.notifyRegisterSuccess(tConfigNodeLocation);
env.applyConfigNode(tConfigNodeLocation);
+ env.broadCastTheLatestConfigNodeGroup();
return Flow.NO_MORE_STATE;
}
} catch (Exception e) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java
index 52f13e4aa6..706e89454d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java
@@ -58,6 +58,7 @@ public class RemoveConfigNodeProcedure extends AbstractNodeProcedure<RemoveConfi
try {
switch (state) {
case REMOVE_CONFIG_NODE_PREPARE:
+ env.broadCastTheLatestConfigNodeGroup();
setNextState(RemoveConfigNodeState.REMOVE_PEER);
break;
case REMOVE_PEER:
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 681ad6be72..dcbf606468 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.service.thrift.impl;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
@@ -41,6 +42,7 @@ import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.db.auth.AuthorizerManager;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -92,6 +94,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -483,6 +486,20 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
return StorageEngineV2.getInstance().setTTL(req);
}
+ @Override
+ public TSStatus updateConfigNodeGroup(TUpdateConfigNodeGroupReq req) {
+ List<TConfigNodeLocation> configNodeLocations = req.getConfigNodeLocations();
+ if (configNodeLocations != null) {
+ ConfigNodeInfo.getInstance()
+ .updateConfigNodeList(
+ configNodeLocations
+ .parallelStream()
+ .map(TConfigNodeLocation::getInternalEndPoint)
+ .collect(Collectors.toList()));
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
@Override
public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) {
ConsensusGroupId consensusGroupId =
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 56c937cac0..3593651fda 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -194,6 +194,10 @@ struct TRegionRouteReq {
2: required map<common.TConsensusGroupId, common.TRegionReplicaSet> regionRouteMap
}
+struct TUpdateConfigNodeGroupReq {
+ 1: required list<common.TConfigNodeLocation> configNodeLocations
+}
+
service IDataNodeRPCService {
// -----------------------------------For Data Node-----------------------------------------------
@@ -324,6 +328,12 @@ service IDataNodeRPCService {
common.TSStatus flush(common.TFlushReq req)
common.TSStatus setTTL(common.TSetTTLReq req)
+ /**
+ * configNode will notify all DataNodes when the capacity of the ConfigNodeGroup is expanded or reduced
+ *
+ * @param list<common.TConfigNodeLocation> configNodeLocations
+ */
+ common.TSStatus updateConfigNodeGroup(TUpdateConfigNodeGroupReq req)
}
service MPPDataExchangeService {