You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/01/05 14:23:51 UTC

[incubator-tubemq] branch TUBEMQ-421 updated (eee5615 -> a65b291)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a change to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git.


    from eee5615  [TUBEMQ-488] reorg topic operation including batch delete / query / modify
     new fdf4503  [TUBEMQ-465] add new feature - copy offset from one group to another
     new a65b291  [TUBEMQ-465] add comment

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../manager/controller/node/request/BaseReq.java   |  3 ++
 .../{CloneTopicReq.java => CloneOffsetReq.java}    | 11 +++----
 .../controller/topic/TopicWebController.java       | 21 ++++++++++++
 .../apache/tubemq/manager/service/NodeService.java | 32 +++++++++++++++++++
 .../service/tube/TubeHttpTopicInfoList.java        | 26 ++++++++++++---
 .../apache/tubemq/manager/utils/ConvertUtils.java  | 37 ++++++++++++++--------
 6 files changed, 106 insertions(+), 24 deletions(-)
 copy tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/{CloneTopicReq.java => CloneOffsetReq.java} (81%)


[incubator-tubemq] 01/02: [TUBEMQ-465] add new feature - copy offset from one group to another

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit fdf450318f418aae70d0f380fdb2a13094186108
Author: EMsnap <zp...@connect.ust.hk>
AuthorDate: Tue Jan 5 15:25:00 2021 +0800

    [TUBEMQ-465] add new feature - copy offset from one group to another
---
 .../manager/controller/node/request/BaseReq.java   |  3 ++
 .../request/{BaseReq.java => CloneOffsetReq.java}  | 13 +++++---
 .../controller/topic/TopicWebController.java       | 21 ++++++++++++
 .../apache/tubemq/manager/service/NodeService.java | 32 +++++++++++++++++++
 .../service/tube/TubeHttpTopicInfoList.java        | 26 ++++++++++++---
 .../apache/tubemq/manager/utils/ConvertUtils.java  | 37 ++++++++++++++--------
 6 files changed, 110 insertions(+), 22 deletions(-)

diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
index fe3a32d..0159220 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
@@ -17,6 +17,9 @@
 
 package org.apache.tubemq.manager.controller.node.request;
 
+import lombok.Data;
+
+@Data
 public class BaseReq {
     public String type;
     public Integer clusterId;
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneOffsetReq.java
similarity index 78%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneOffsetReq.java
index fe3a32d..645084d 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneOffsetReq.java
@@ -17,8 +17,13 @@
 
 package org.apache.tubemq.manager.controller.node.request;
 
-public class BaseReq {
-    public String type;
-    public Integer clusterId;
-    public String method;
+import lombok.Data;
+
+@Data
+public class CloneOffsetReq extends BaseReq {
+    public String sourceGroupName;
+    public String modifyUser;
+    public String topicName;
+    public String targetGroupName;
+    public String confModAuthToken;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
index dd9cd52..b8d0f05 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.node.request.BatchAddTopicReq;
+import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
 import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
@@ -155,6 +156,7 @@ public class TopicWebController {
      * @return
      * @throws Exception
      */
+    @PostMapping("/query/config")
     public @ResponseBody String queryTopicConfig(
         @RequestParam Map<String, String> req) throws Exception {
         String url = masterUtils.getQueryUrl(req);
@@ -162,4 +164,23 @@ public class TopicWebController {
     }
 
 
+    /**
+     *
+     * @param req
+     * @return
+     * @throws Exception
+     */
+    @PostMapping("/clone/offset")
+    public @ResponseBody TubeMQResult cloneOffset(
+        @RequestBody CloneOffsetReq req) throws Exception {
+        if (req.getClusterId() == null)
+            return TubeMQResult.getErrorResult("please input clusterId");
+        NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
+            req.getClusterId());
+        if (masterEntry == null)
+            return TubeMQResult.getErrorResult("no such cluster");
+        return nodeService.cloneOffsetToOtherGroups(req, masterEntry);
+    }
+
+
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
index 1b15770..7de3ec3 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -45,12 +45,14 @@ import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
 import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
 import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq;
+import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
 import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
 import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
 import org.apache.tubemq.manager.service.tube.*;
 import org.apache.tubemq.manager.service.tube.TubeHttpBrokerInfoList.BrokerInfo;
+import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList.TopicInfoList.TopicInfo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
@@ -73,6 +75,9 @@ public class NodeService {
 
     private final TopicBackendWorker worker;
 
+    @Value("${manager.broker.webPort:8081}")
+    private int brokerWebPort;
+
     @Autowired
     private NodeRepository nodeRepository;
 
@@ -443,4 +448,31 @@ public class NodeService {
         return addTopicToBrokers(addTopicReq, master);
 
     }
+
+    public TubeMQResult cloneOffsetToOtherGroups(CloneOffsetReq req, NodeEntry master)
+        throws Exception {
+
+        // 1. query the corresponding brokers having given topic
+        TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getTopicName());
+        TubeMQResult result = new TubeMQResult();
+
+        if (topicInfoList != null) {
+            List<TopicInfo> topicInfos = topicInfoList.getTopicInfo();
+            // 2. for each broker, request to clone offset
+            for (TopicInfo topicInfo : topicInfos) {
+                String brokerIp = topicInfo.getBrokerIp();
+                String url = SCHEMA + brokerIp + ":" + brokerWebPort
+                    + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+                result = requestMaster(url);
+                if (result.getErrCode() != 0) {
+                    return result;
+                }
+            }
+        }
+
+        return result;
+    }
+
+
+
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
index bff5a7b..ae747a9 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
@@ -50,10 +50,10 @@ public class TubeHttpTopicInfoList {
 
             @Data
             public static class RunInfo {
-                private boolean acceptPublish;
-                private boolean acceptSubscribe;
-                private int numPartitions;
-                private int numTopicStores;
+                private String acceptPublish;
+                private String acceptSubscribe;
+                private String numPartitions;
+                private String numTopicStores;
                 private String brokerManageStatus;
             }
 
@@ -102,6 +102,15 @@ public class TubeHttpTopicInfoList {
         return tmpBrokerIdList;
     }
 
+
+    public List<TopicInfo> getTopicInfo() {
+        List<Integer> tmpBrokerIdList = new ArrayList<>();
+        if (data != null) {
+            return data.get(0).getTopicInfo();
+        }
+        return null;
+    }
+
     public AddTopicReq getAddTopicReq(List<Integer> brokerIds, List<String> targetTopicNames, String token) {
 
         AddTopicReq req = new AddTopicReq();
@@ -118,6 +127,14 @@ public class TubeHttpTopicInfoList {
         String brokerStr = StringUtils.join(brokerIds, ",");
         String topic = StringUtils.join(targetTopicNames, ",");
 
+        setAttributes(token, req, topicInfo, brokerStr, topic);
+        return req;
+    }
+
+
+
+    private void setAttributes(String token, AddTopicReq req, TopicInfo topicInfo, String brokerStr,
+        String topic) {
         req.setBrokerId(brokerStr);
         req.setTopicName(topic);
         req.setMethod(BATCH_ADD_TOPIC);
@@ -130,6 +147,5 @@ public class TubeHttpTopicInfoList {
         req.setUnflushInterval(topicInfo.getUnflushInterval());
         req.setConfModAuthToken(token);
         req.setDeletePolicy(topicInfo.getDeletePolicy());
-        return req;
     }
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
index f93819f..91e4f8f 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
@@ -34,21 +34,32 @@ public class ConvertUtils {
     public static String convertReqToQueryStr(Object req) throws Exception {
         List<String> queryList = new ArrayList<>();
         Class<?> clz = req.getClass();
-        Field[] fields = clz.getDeclaredFields();
-        for (Field field : fields) {
-            field.setAccessible(true);
-            Object o = field.get(req);
-            String value;
-            // convert list to json string
-            if (o == null) continue;
-            if (o instanceof List) {
-                value = gson.toJson(o);
-            } else {
-                value = o.toString();
-            }
-            queryList.add(field.getName() + "=" + URLEncoder.encode(
+        List fieldsList = new ArrayList<Field[]>();
+
+        while (clz != null) {
+            Field[] declaredFields = clz.getDeclaredFields();
+            fieldsList.add(declaredFields);
+            clz = clz.getSuperclass();
+        }
+
+        for (Object fields:fieldsList) {
+            Field[] f = (Field[]) fields;
+            for (Field field : f) {
+                field.setAccessible(true);
+                Object o = field.get(req);
+                String value;
+                // convert list to json string
+                if (o == null) continue;
+                if (o instanceof List) {
+                    value = gson.toJson(o);
+                } else {
+                    value = o.toString();
+                }
+                queryList.add(field.getName() + "=" + URLEncoder.encode(
                     value, UTF_8.toString()));
+            }
         }
+
         return StringUtils.join(queryList, "&");
     }
 }


[incubator-tubemq] 02/02: [TUBEMQ-465] add comment

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit a65b291552dee67288c50e3ef9442cb54284e4c6
Author: EMsnap <zp...@connect.ust.hk>
AuthorDate: Tue Jan 5 19:02:42 2021 +0800

    [TUBEMQ-465] add comment
---
 .../org/apache/tubemq/manager/controller/topic/TopicWebController.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
index b8d0f05..6addbab 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
@@ -165,7 +165,7 @@ public class TopicWebController {
 
 
     /**
-     *
+     * clone offset from one group to another
      * @param req
      * @return
      * @throws Exception