You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/01/05 14:23:52 UTC
[incubator-tubemq] 01/02: [TUBEMQ-465] add new feature - copy
offset from one group to another
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit fdf450318f418aae70d0f380fdb2a13094186108
Author: EMsnap <zp...@connect.ust.hk>
AuthorDate: Tue Jan 5 15:25:00 2021 +0800
[TUBEMQ-465] add new feature - copy offset from one group to another
---
.../manager/controller/node/request/BaseReq.java | 3 ++
.../request/{BaseReq.java => CloneOffsetReq.java} | 13 +++++---
.../controller/topic/TopicWebController.java | 21 ++++++++++++
.../apache/tubemq/manager/service/NodeService.java | 32 +++++++++++++++++++
.../service/tube/TubeHttpTopicInfoList.java | 26 ++++++++++++---
.../apache/tubemq/manager/utils/ConvertUtils.java | 37 ++++++++++++++--------
6 files changed, 110 insertions(+), 22 deletions(-)
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
index fe3a32d..0159220 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
@@ -17,6 +17,9 @@
package org.apache.tubemq.manager.controller.node.request;
+import lombok.Data;
+
+@Data
public class BaseReq {
public String type;
public Integer clusterId;
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneOffsetReq.java
similarity index 78%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneOffsetReq.java
index fe3a32d..645084d 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneOffsetReq.java
@@ -17,8 +17,13 @@
package org.apache.tubemq.manager.controller.node.request;
-public class BaseReq {
- public String type;
- public Integer clusterId;
- public String method;
+import lombok.Data;
+
+@Data
+public class CloneOffsetReq extends BaseReq {
+ public String sourceGroupName;
+ public String modifyUser;
+ public String topicName;
+ public String targetGroupName;
+ public String confModAuthToken;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
index dd9cd52..b8d0f05 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
@@ -25,6 +25,7 @@ import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.controller.node.request.BatchAddTopicReq;
+import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
@@ -155,6 +156,7 @@ public class TopicWebController {
* @return
* @throws Exception
*/
+ @PostMapping("/query/config")
public @ResponseBody String queryTopicConfig(
@RequestParam Map<String, String> req) throws Exception {
String url = masterUtils.getQueryUrl(req);
@@ -162,4 +164,23 @@ public class TopicWebController {
}
+ /**
+ *
+ * @param req
+ * @return
+ * @throws Exception
+ */
+ @PostMapping("/clone/offset")
+ public @ResponseBody TubeMQResult cloneOffset(
+ @RequestBody CloneOffsetReq req) throws Exception {
+ if (req.getClusterId() == null)
+ return TubeMQResult.getErrorResult("please input clusterId");
+ NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
+ req.getClusterId());
+ if (masterEntry == null)
+ return TubeMQResult.getErrorResult("no such cluster");
+ return nodeService.cloneOffsetToOtherGroups(req, masterEntry);
+ }
+
+
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
index 1b15770..7de3ec3 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -45,12 +45,14 @@ import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq;
+import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
import org.apache.tubemq.manager.service.tube.*;
import org.apache.tubemq.manager.service.tube.TubeHttpBrokerInfoList.BrokerInfo;
+import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList.TopicInfoList.TopicInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@@ -73,6 +75,9 @@ public class NodeService {
private final TopicBackendWorker worker;
+ @Value("${manager.broker.webPort:8081}")
+ private int brokerWebPort;
+
@Autowired
private NodeRepository nodeRepository;
@@ -443,4 +448,31 @@ public class NodeService {
return addTopicToBrokers(addTopicReq, master);
}
+
+ public TubeMQResult cloneOffsetToOtherGroups(CloneOffsetReq req, NodeEntry master)
+ throws Exception {
+
+ // 1. query the corresponding brokers having given topic
+ TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getTopicName());
+ TubeMQResult result = new TubeMQResult();
+
+ if (topicInfoList != null) {
+ List<TopicInfo> topicInfos = topicInfoList.getTopicInfo();
+ // 2. for each broker, request to clone offset
+ for (TopicInfo topicInfo : topicInfos) {
+ String brokerIp = topicInfo.getBrokerIp();
+ String url = SCHEMA + brokerIp + ":" + brokerWebPort
+ + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+ result = requestMaster(url);
+ if (result.getErrCode() != 0) {
+ return result;
+ }
+ }
+ }
+
+ return result;
+ }
+
+
+
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
index bff5a7b..ae747a9 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
@@ -50,10 +50,10 @@ public class TubeHttpTopicInfoList {
@Data
public static class RunInfo {
- private boolean acceptPublish;
- private boolean acceptSubscribe;
- private int numPartitions;
- private int numTopicStores;
+ private String acceptPublish;
+ private String acceptSubscribe;
+ private String numPartitions;
+ private String numTopicStores;
private String brokerManageStatus;
}
@@ -102,6 +102,15 @@ public class TubeHttpTopicInfoList {
return tmpBrokerIdList;
}
+
+ public List<TopicInfo> getTopicInfo() {
+ List<Integer> tmpBrokerIdList = new ArrayList<>();
+ if (data != null) {
+ return data.get(0).getTopicInfo();
+ }
+ return null;
+ }
+
public AddTopicReq getAddTopicReq(List<Integer> brokerIds, List<String> targetTopicNames, String token) {
AddTopicReq req = new AddTopicReq();
@@ -118,6 +127,14 @@ public class TubeHttpTopicInfoList {
String brokerStr = StringUtils.join(brokerIds, ",");
String topic = StringUtils.join(targetTopicNames, ",");
+ setAttributes(token, req, topicInfo, brokerStr, topic);
+ return req;
+ }
+
+
+
+ private void setAttributes(String token, AddTopicReq req, TopicInfo topicInfo, String brokerStr,
+ String topic) {
req.setBrokerId(brokerStr);
req.setTopicName(topic);
req.setMethod(BATCH_ADD_TOPIC);
@@ -130,6 +147,5 @@ public class TubeHttpTopicInfoList {
req.setUnflushInterval(topicInfo.getUnflushInterval());
req.setConfModAuthToken(token);
req.setDeletePolicy(topicInfo.getDeletePolicy());
- return req;
}
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
index f93819f..91e4f8f 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
@@ -34,21 +34,32 @@ public class ConvertUtils {
public static String convertReqToQueryStr(Object req) throws Exception {
List<String> queryList = new ArrayList<>();
Class<?> clz = req.getClass();
- Field[] fields = clz.getDeclaredFields();
- for (Field field : fields) {
- field.setAccessible(true);
- Object o = field.get(req);
- String value;
- // convert list to json string
- if (o == null) continue;
- if (o instanceof List) {
- value = gson.toJson(o);
- } else {
- value = o.toString();
- }
- queryList.add(field.getName() + "=" + URLEncoder.encode(
+ List fieldsList = new ArrayList<Field[]>();
+
+ while (clz != null) {
+ Field[] declaredFields = clz.getDeclaredFields();
+ fieldsList.add(declaredFields);
+ clz = clz.getSuperclass();
+ }
+
+ for (Object fields:fieldsList) {
+ Field[] f = (Field[]) fields;
+ for (Field field : f) {
+ field.setAccessible(true);
+ Object o = field.get(req);
+ String value;
+ // convert list to json string
+ if (o == null) continue;
+ if (o instanceof List) {
+ value = gson.toJson(o);
+ } else {
+ value = o.toString();
+ }
+ queryList.add(field.getName() + "=" + URLEncoder.encode(
value, UTF_8.toString()));
+ }
}
+
return StringUtils.join(queryList, "&");
}
}