You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/10 03:49:58 UTC

[incubator-inlong] branch master updated: [INLONG-3015] [TubeMQ] Add configuration to support the number of reloaded machines per batch (#3019)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new df96cd1  [INLONG-3015] [TubeMQ] Add configuration to support the number of reloaded machines per batch (#3019)
df96cd1 is described below

commit df96cd1187c2b2d3e0a95e33c79f1de89c71766b
Author: bluewang <88...@users.noreply.github.com>
AuthorDate: Thu Mar 10 11:49:54 2022 +0800

    [INLONG-3015] [TubeMQ] Add configuration to support the number of reloaded machines per batch (#3019)
    
    Co-authored-by: v_lizhwang <v_...@tencent.com>
---
 .../tubemq/manager/controller/cluster/request/AddClusterReq.java    | 1 +
 .../java/org/apache/inlong/tubemq/manager/entry/ClusterEntry.java   | 2 ++
 .../apache/inlong/tubemq/manager/service/ClusterServiceImpl.java    | 1 +
 .../org/apache/inlong/tubemq/manager/service/NodeServiceImpl.java   | 5 +++--
 .../org/apache/inlong/tubemq/manager/service/TaskServiceImpl.java   | 6 +++---
 .../inlong/tubemq/manager/service/interfaces/NodeService.java       | 3 ++-
 .../inlong/tubemq/manager/controller/TestClusterController.java     | 1 +
 7 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java
index 4db0bf2..eb7ef5a 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java
@@ -31,6 +31,7 @@ public class AddClusterReq {
     private List<MasterEntry> masterEntries;
     private String createUser;
     private String token;
+    private int reloadBrokerSize;
 
     public boolean legal() {
         return CollectionUtils.isNotEmpty(masterEntries) && StringUtils.isNotBlank(token);
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/entry/ClusterEntry.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/entry/ClusterEntry.java
index 5ee2476..3a86774 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/entry/ClusterEntry.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/entry/ClusterEntry.java
@@ -43,4 +43,6 @@ public class ClusterEntry {
     private Date modifyTime;
 
     private String createUser;
+
+    private int reloadBrokerSize;
 }
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
index 7f09de3..1a0d172 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
@@ -55,6 +55,7 @@ public class ClusterServiceImpl implements ClusterService {
         entry.setCreateTime(new Date());
         entry.setCreateUser(req.getCreateUser());
         entry.setClusterName(req.getClusterName());
+        entry.setReloadBrokerSize(req.getReloadBrokerSize());
         ClusterEntry retEntry = clusterRepository.saveAndFlush(entry);
         // add master node
         addMasterNode(req, retEntry);
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/NodeServiceImpl.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/NodeServiceImpl.java
index 2976ae0..82f9ec3 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/NodeServiceImpl.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/NodeServiceImpl.java
@@ -43,6 +43,7 @@ import org.apache.inlong.tubemq.manager.controller.node.request.BatchAddTopicReq
 import org.apache.inlong.tubemq.manager.controller.node.request.CloneBrokersReq;
 import org.apache.inlong.tubemq.manager.controller.node.request.CloneTopicReq;
 import org.apache.inlong.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
+import org.apache.inlong.tubemq.manager.entry.ClusterEntry;
 import org.apache.inlong.tubemq.manager.entry.MasterEntry;
 import org.apache.inlong.tubemq.manager.repository.MasterRepository;
 import org.apache.inlong.tubemq.manager.service.interfaces.MasterService;
@@ -253,7 +254,7 @@ public class NodeServiceImpl implements NodeService {
      * @param needReloadList
      */
     @Override
-    public void handleReloadBroker(MasterEntry masterEntry, List<Integer> needReloadList) {
+    public void handleReloadBroker(MasterEntry masterEntry, List<Integer> needReloadList, ClusterEntry clusterEntry) {
         // reload without exceed max broker.
         if (needReloadList.isEmpty()) {
             return;
@@ -261,7 +262,7 @@ public class NodeServiceImpl implements NodeService {
         int begin = 0;
         int end = 0;
         do {
-            end = Math.min(maxConfigurableBrokerSize + begin, needReloadList.size());
+            end = Math.min(clusterEntry.getReloadBrokerSize() + begin, needReloadList.size());
             List<Integer> brokerIdList = needReloadList.subList(begin, end);
             String brokerStr = StringUtils.join(brokerIdList, ",");
             String url = TubeConst.SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TaskServiceImpl.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TaskServiceImpl.java
index 48895b8..496ab7f 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TaskServiceImpl.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TaskServiceImpl.java
@@ -140,14 +140,14 @@ public class TaskServiceImpl implements TaskService {
             if (ValidateUtils.isNull(brokerInfoList)) {
                 continue;
             }
-            doReloadBrokers(clusterId, masterEntry, brokerInfoList);
+            doReloadBrokers(clusterId, masterEntry, brokerInfoList, cluster);
         }
     }
 
     @Async("asyncExecutor")
     public void doReloadBrokers(long clusterId, MasterEntry masterEntry,
-                                TubeHttpBrokerInfoList brokerInfoList) {
-        nodeService.handleReloadBroker(masterEntry, brokerInfoList.getNeedReloadList());
+                                TubeHttpBrokerInfoList brokerInfoList, ClusterEntry clusterEntry) {
+        nodeService.handleReloadBroker(masterEntry, brokerInfoList.getNeedReloadList(), clusterEntry);
         updateCreateTopicTaskStatus(clusterId);
     }
 
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/NodeService.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/NodeService.java
index 90e694d..bdc78dc 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/NodeService.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/NodeService.java
@@ -27,6 +27,7 @@ import org.apache.inlong.tubemq.manager.controller.node.request.AddTopicReq;
 import org.apache.inlong.tubemq.manager.controller.node.request.BatchAddTopicReq;
 import org.apache.inlong.tubemq.manager.controller.node.request.CloneBrokersReq;
 import org.apache.inlong.tubemq.manager.controller.node.request.CloneTopicReq;
+import org.apache.inlong.tubemq.manager.entry.ClusterEntry;
 import org.apache.inlong.tubemq.manager.entry.MasterEntry;
 import org.apache.inlong.tubemq.manager.service.tube.TubeHttpBrokerInfoList;
 
@@ -82,7 +83,7 @@ public interface NodeService {
     boolean configBrokersForTopics(MasterEntry masterEntry,
                                    Set<String> topics, List<Integer> brokerList, int maxBrokers);
 
-    void handleReloadBroker(MasterEntry masterEntry, List<Integer> needReloadList);
+    void handleReloadBroker(MasterEntry masterEntry, List<Integer> needReloadList, ClusterEntry clusterEntry);
 
     void close() throws IOException;
 
diff --git a/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java b/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java
index 8548c74..ac0c3d4 100644
--- a/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java
+++ b/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java
@@ -169,6 +169,7 @@ public class TestClusterController {
         masterEntries.add(masterEntry);
         req.setMasterEntries(masterEntries);
         req.setToken("abc");
+        req.setReloadBrokerSize(2);
 
         ClusterEntry entry = getOneClusterEntry();
         TubeMQResult successResult = new TubeMQResult();