You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/01/05 14:23:52 UTC

[incubator-tubemq] 01/02: [TUBEMQ-465] add new feature - copy offset from one group to another

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit fdf450318f418aae70d0f380fdb2a13094186108
Author: EMsnap <zp...@connect.ust.hk>
AuthorDate: Tue Jan 5 15:25:00 2021 +0800

    [TUBEMQ-465] add new feature - copy offset from one group to another
---
 .../manager/controller/node/request/BaseReq.java   |  3 ++
 .../request/{BaseReq.java => CloneOffsetReq.java}  | 13 +++++---
 .../controller/topic/TopicWebController.java       | 21 ++++++++++++
 .../apache/tubemq/manager/service/NodeService.java | 32 +++++++++++++++++++
 .../service/tube/TubeHttpTopicInfoList.java        | 26 ++++++++++++---
 .../apache/tubemq/manager/utils/ConvertUtils.java  | 37 ++++++++++++++--------
 6 files changed, 110 insertions(+), 22 deletions(-)

diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
index fe3a32d..0159220 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
@@ -17,6 +17,9 @@
 
 package org.apache.tubemq.manager.controller.node.request;
 
+import lombok.Data;
+
+@Data
 public class BaseReq {
     public String type;
     public Integer clusterId;
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneOffsetReq.java
similarity index 78%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneOffsetReq.java
index fe3a32d..645084d 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneOffsetReq.java
@@ -17,8 +17,13 @@
 
 package org.apache.tubemq.manager.controller.node.request;
 
-public class BaseReq {
-    public String type;
-    public Integer clusterId;
-    public String method;
+import lombok.Data;
+
+@Data
+public class CloneOffsetReq extends BaseReq {
+    public String sourceGroupName;
+    public String modifyUser;
+    public String topicName;
+    public String targetGroupName;
+    public String confModAuthToken;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
index dd9cd52..b8d0f05 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.node.request.BatchAddTopicReq;
+import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
 import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
@@ -155,6 +156,7 @@ public class TopicWebController {
      * @return
      * @throws Exception
      */
+    @PostMapping("/query/config")
     public @ResponseBody String queryTopicConfig(
         @RequestParam Map<String, String> req) throws Exception {
         String url = masterUtils.getQueryUrl(req);
@@ -162,4 +164,23 @@ public class TopicWebController {
     }
 
 
+    /**
+     * Copy the consume offset of one group to another group for a topic in the given cluster.
+     * @param req clone request; its clusterId is required and selects the cluster's master node
+     * @return an error result when clusterId is missing or no master exists, else the service result
+     * @throws Exception if the underlying node service call fails
+     */
+    @PostMapping("/clone/offset")
+    public @ResponseBody TubeMQResult cloneOffset(
+        @RequestBody CloneOffsetReq req) throws Exception {
+        if (req.getClusterId() == null)
+            return TubeMQResult.getErrorResult("please input clusterId");
+        NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
+            req.getClusterId());
+        if (masterEntry == null)
+            return TubeMQResult.getErrorResult("no such cluster");
+        return nodeService.cloneOffsetToOtherGroups(req, masterEntry);
+    }
+
+
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
index 1b15770..7de3ec3 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -45,12 +45,14 @@ import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
 import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
 import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq;
+import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
 import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
 import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
 import org.apache.tubemq.manager.service.tube.*;
 import org.apache.tubemq.manager.service.tube.TubeHttpBrokerInfoList.BrokerInfo;
+import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList.TopicInfoList.TopicInfo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
@@ -73,6 +75,9 @@ public class NodeService {
 
     private final TopicBackendWorker worker;
 
+    @Value("${manager.broker.webPort:8081}")
+    private int brokerWebPort;
+
     @Autowired
     private NodeRepository nodeRepository;
 
@@ -443,4 +448,31 @@ public class NodeService {
         return addTopicToBrokers(addTopicReq, master);
 
     }
+
+    public TubeMQResult cloneOffsetToOtherGroups(CloneOffsetReq req, NodeEntry master)
+        throws Exception {
+        // Asks each broker that holds the topic to clone the offset; stops at the first failure.
+        // 1. query the corresponding brokers having given topic
+        TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getTopicName());
+        TubeMQResult result = new TubeMQResult();
+        // getTopicInfo() returns null when the reply carries no data, so guard the list itself
+        List<TopicInfo> topicInfos = topicInfoList == null ? null : topicInfoList.getTopicInfo();
+        if (topicInfos != null) {
+            // 2. for each broker, request to clone offset
+            for (TopicInfo topicInfo : topicInfos) {
+                String brokerIp = topicInfo.getBrokerIp();
+                String url = SCHEMA + brokerIp + ":" + brokerWebPort
+                    + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+                result = requestMaster(url);
+                if (result.getErrCode() != 0) {
+                    return result;
+                }
+            }
+        }
+
+        return result;
+    }
+
+
+
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
index bff5a7b..ae747a9 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
@@ -50,10 +50,10 @@ public class TubeHttpTopicInfoList {
 
             @Data
             public static class RunInfo {
-                private boolean acceptPublish;
-                private boolean acceptSubscribe;
-                private int numPartitions;
-                private int numTopicStores;
+                private String acceptPublish;
+                private String acceptSubscribe;
+                private String numPartitions;
+                private String numTopicStores;
                 private String brokerManageStatus;
             }
 
@@ -102,6 +102,15 @@ public class TubeHttpTopicInfoList {
         return tmpBrokerIdList;
     }
 
+
+    public List<TopicInfo> getTopicInfo() {
+        // returns the topic info of the first data entry, or null when there is no entry at all
+        if (data != null && !data.isEmpty()) {
+            return data.get(0).getTopicInfo();
+        }
+        return null;
+    }
+
     public AddTopicReq getAddTopicReq(List<Integer> brokerIds, List<String> targetTopicNames, String token) {
 
         AddTopicReq req = new AddTopicReq();
@@ -118,6 +127,14 @@ public class TubeHttpTopicInfoList {
         String brokerStr = StringUtils.join(brokerIds, ",");
         String topic = StringUtils.join(targetTopicNames, ",");
 
+        setAttributes(token, req, topicInfo, brokerStr, topic);
+        return req;
+    }
+
+
+
+    private void setAttributes(String token, AddTopicReq req, TopicInfo topicInfo, String brokerStr,
+        String topic) {
         req.setBrokerId(brokerStr);
         req.setTopicName(topic);
         req.setMethod(BATCH_ADD_TOPIC);
@@ -130,6 +147,5 @@ public class TubeHttpTopicInfoList {
         req.setUnflushInterval(topicInfo.getUnflushInterval());
         req.setConfModAuthToken(token);
         req.setDeletePolicy(topicInfo.getDeletePolicy());
-        return req;
     }
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
index f93819f..91e4f8f 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
@@ -34,21 +34,32 @@ public class ConvertUtils {
     public static String convertReqToQueryStr(Object req) throws Exception {
         List<String> queryList = new ArrayList<>();
         Class<?> clz = req.getClass();
-        Field[] fields = clz.getDeclaredFields();
-        for (Field field : fields) {
-            field.setAccessible(true);
-            Object o = field.get(req);
-            String value;
-            // convert list to json string
-            if (o == null) continue;
-            if (o instanceof List) {
-                value = gson.toJson(o);
-            } else {
-                value = o.toString();
-            }
-            queryList.add(field.getName() + "=" + URLEncoder.encode(
+        List<Field[]> fieldsList = new ArrayList<>();
+
+        while (clz != null) {
+            // collect this class's own fields, then walk up so inherited fields are included
+            fieldsList.add(clz.getDeclaredFields());
+            clz = clz.getSuperclass();
+        }
+
+        for (Field[] fields : fieldsList) {
+            // each array holds the fields declared by one class of the hierarchy
+            for (Field field : fields) {
+                field.setAccessible(true);
+                Object o = field.get(req);
+                String value;
+                // convert list to json string
+                if (o == null) continue;
+                if (o instanceof List) {
+                    value = gson.toJson(o);
+                } else {
+                    value = o.toString();
+                }
+                queryList.add(field.getName() + "=" + URLEncoder.encode(
                     value, UTF_8.toString()));
+            }
         }
+
         return StringUtils.join(queryList, "&");
     }
 }