You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/10 03:49:58 UTC
[incubator-inlong] branch master updated: [INLONG-3015] [TubeMQ] Add configuration to support the number of reloaded machines per batch (#3019)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new df96cd1 [INLONG-3015] [TubeMQ] Add configuration to support the number of reloaded machines per batch (#3019)
df96cd1 is described below
commit df96cd1187c2b2d3e0a95e33c79f1de89c71766b
Author: bluewang <88...@users.noreply.github.com>
AuthorDate: Thu Mar 10 11:49:54 2022 +0800
[INLONG-3015] [TubeMQ] Add configuration to support the number of reloaded machines per batch (#3019)
Co-authored-by: v_lizhwang <v_...@tencent.com>
---
.../tubemq/manager/controller/cluster/request/AddClusterReq.java | 1 +
.../java/org/apache/inlong/tubemq/manager/entry/ClusterEntry.java | 2 ++
.../apache/inlong/tubemq/manager/service/ClusterServiceImpl.java | 1 +
.../org/apache/inlong/tubemq/manager/service/NodeServiceImpl.java | 5 +++--
.../org/apache/inlong/tubemq/manager/service/TaskServiceImpl.java | 6 +++---
.../inlong/tubemq/manager/service/interfaces/NodeService.java | 3 ++-
.../inlong/tubemq/manager/controller/TestClusterController.java | 1 +
7 files changed, 13 insertions(+), 6 deletions(-)
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java
index 4db0bf2..eb7ef5a 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/controller/cluster/request/AddClusterReq.java
@@ -31,6 +31,7 @@ public class AddClusterReq {
private List<MasterEntry> masterEntries;
private String createUser;
private String token;
+ private int reloadBrokerSize;
public boolean legal() {
return CollectionUtils.isNotEmpty(masterEntries) && StringUtils.isNotBlank(token);
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/entry/ClusterEntry.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/entry/ClusterEntry.java
index 5ee2476..3a86774 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/entry/ClusterEntry.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/entry/ClusterEntry.java
@@ -43,4 +43,6 @@ public class ClusterEntry {
private Date modifyTime;
private String createUser;
+
+ private int reloadBrokerSize;
}
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
index 7f09de3..1a0d172 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/ClusterServiceImpl.java
@@ -55,6 +55,7 @@ public class ClusterServiceImpl implements ClusterService {
entry.setCreateTime(new Date());
entry.setCreateUser(req.getCreateUser());
entry.setClusterName(req.getClusterName());
+ entry.setReloadBrokerSize(req.getReloadBrokerSize());
ClusterEntry retEntry = clusterRepository.saveAndFlush(entry);
// add master node
addMasterNode(req, retEntry);
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/NodeServiceImpl.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/NodeServiceImpl.java
index 2976ae0..82f9ec3 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/NodeServiceImpl.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/NodeServiceImpl.java
@@ -43,6 +43,7 @@ import org.apache.inlong.tubemq.manager.controller.node.request.BatchAddTopicReq
import org.apache.inlong.tubemq.manager.controller.node.request.CloneBrokersReq;
import org.apache.inlong.tubemq.manager.controller.node.request.CloneTopicReq;
import org.apache.inlong.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
+import org.apache.inlong.tubemq.manager.entry.ClusterEntry;
import org.apache.inlong.tubemq.manager.entry.MasterEntry;
import org.apache.inlong.tubemq.manager.repository.MasterRepository;
import org.apache.inlong.tubemq.manager.service.interfaces.MasterService;
@@ -253,7 +254,7 @@ public class NodeServiceImpl implements NodeService {
* @param needReloadList
*/
@Override
- public void handleReloadBroker(MasterEntry masterEntry, List<Integer> needReloadList) {
+ public void handleReloadBroker(MasterEntry masterEntry, List<Integer> needReloadList, ClusterEntry clusterEntry) {
// reload without exceed max broker.
if (needReloadList.isEmpty()) {
return;
@@ -261,7 +262,7 @@ public class NodeServiceImpl implements NodeService {
int begin = 0;
int end = 0;
do {
- end = Math.min(maxConfigurableBrokerSize + begin, needReloadList.size());
+ end = Math.min(clusterEntry.getReloadBrokerSize() + begin, needReloadList.size());
List<Integer> brokerIdList = needReloadList.subList(begin, end);
String brokerStr = StringUtils.join(brokerIdList, ",");
String url = TubeConst.SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TaskServiceImpl.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TaskServiceImpl.java
index 48895b8..496ab7f 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TaskServiceImpl.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TaskServiceImpl.java
@@ -140,14 +140,14 @@ public class TaskServiceImpl implements TaskService {
if (ValidateUtils.isNull(brokerInfoList)) {
continue;
}
- doReloadBrokers(clusterId, masterEntry, brokerInfoList);
+ doReloadBrokers(clusterId, masterEntry, brokerInfoList, cluster);
}
}
@Async("asyncExecutor")
public void doReloadBrokers(long clusterId, MasterEntry masterEntry,
- TubeHttpBrokerInfoList brokerInfoList) {
- nodeService.handleReloadBroker(masterEntry, brokerInfoList.getNeedReloadList());
+ TubeHttpBrokerInfoList brokerInfoList, ClusterEntry clusterEntry) {
+ nodeService.handleReloadBroker(masterEntry, brokerInfoList.getNeedReloadList(), clusterEntry);
updateCreateTopicTaskStatus(clusterId);
}
diff --git a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/NodeService.java b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/NodeService.java
index 90e694d..bdc78dc 100644
--- a/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/NodeService.java
+++ b/inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/interfaces/NodeService.java
@@ -27,6 +27,7 @@ import org.apache.inlong.tubemq.manager.controller.node.request.AddTopicReq;
import org.apache.inlong.tubemq.manager.controller.node.request.BatchAddTopicReq;
import org.apache.inlong.tubemq.manager.controller.node.request.CloneBrokersReq;
import org.apache.inlong.tubemq.manager.controller.node.request.CloneTopicReq;
+import org.apache.inlong.tubemq.manager.entry.ClusterEntry;
import org.apache.inlong.tubemq.manager.entry.MasterEntry;
import org.apache.inlong.tubemq.manager.service.tube.TubeHttpBrokerInfoList;
@@ -82,7 +83,7 @@ public interface NodeService {
boolean configBrokersForTopics(MasterEntry masterEntry,
Set<String> topics, List<Integer> brokerList, int maxBrokers);
- void handleReloadBroker(MasterEntry masterEntry, List<Integer> needReloadList);
+ void handleReloadBroker(MasterEntry masterEntry, List<Integer> needReloadList, ClusterEntry clusterEntry);
void close() throws IOException;
diff --git a/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java b/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java
index 8548c74..ac0c3d4 100644
--- a/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java
+++ b/inlong-tubemq/tubemq-manager/src/test/java/org/apache/inlong/tubemq/manager/controller/TestClusterController.java
@@ -169,6 +169,7 @@ public class TestClusterController {
masterEntries.add(masterEntry);
req.setMasterEntries(masterEntries);
req.setToken("abc");
+ req.setReloadBrokerSize(2);
ClusterEntry entry = getOneClusterEntry();
TubeMQResult successResult = new TubeMQResult();