You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/01/09 01:45:15 UTC

[incubator-tubemq] branch TUBEMQ-421 updated: [TUBEMQ-503] offset query + design by contract

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-421 by this push:
     new de85d0e  [TUBEMQ-503] offset query + design by contract
de85d0e is described below

commit de85d0e7b504461cd4ccec884db0b48c5216189d
Author: EMsnap <zp...@connect.ust.hk>
AuthorDate: Fri Jan 8 19:46:14 2021 +0800

    [TUBEMQ-503] offset query + design by contract
---
 .../manager/controller/group/GroupController.java  |  10 +-
 .../group/request/QueryOffsetReq.java}             |  30 +-
 .../group/result/AllBrokersOffsetRes.java}         |  38 +-
 .../controller/group/result/OffsetQueryRes.java    |  57 +++
 .../manager/controller/node/NodeController.java    |   2 -
 .../controller/topic/TopicWebController.java       |   8 +-
 .../apache/tubemq/manager/service/NodeService.java | 438 ++-------------------
 .../{NodeService.java => NodeServiceImpl.java}     |  17 +-
 .../tubemq/manager/service/TopicBackendWorker.java |   2 +-
 .../tubemq/manager/service/TopicService.java       | 227 +++--------
 .../{TopicService.java => TopicServiceImpl.java}   |  62 ++-
 .../tubemq/manager/service/TubeMQHttpConst.java    |   2 +-
 .../service/tube/TubeHttpTopicInfoList.java        |   6 +-
 .../apache/tubemq/manager/utils/ConvertUtils.java  |   2 +-
 .../src/main/resources/application.properties      |   2 +-
 .../manager/controller/TestNodeController.java     |   5 -
 16 files changed, 244 insertions(+), 664 deletions(-)

diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
index 12b4fc4..052986b 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
@@ -19,11 +19,11 @@ package org.apache.tubemq.manager.controller.group;
 
 
 
-import static org.apache.tubemq.manager.service.MasterService.requestMaster;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.CLONE;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
 import static org.apache.tubemq.manager.service.MasterService.queryMaster;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.QUERY;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_CONSUMER_GROUP;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_CONSUMER;
 
@@ -34,12 +34,13 @@ import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.group.request.AddBlackGroupReq;
 import org.apache.tubemq.manager.controller.group.request.DeleteBlackGroupReq;
 import org.apache.tubemq.manager.controller.group.request.DeleteOffsetReq;
+import org.apache.tubemq.manager.controller.group.request.QueryOffsetReq;
 import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
 import org.apache.tubemq.manager.controller.topic.request.BatchAddGroupAuthReq;
 import org.apache.tubemq.manager.controller.topic.request.DeleteGroupReq;
 import org.apache.tubemq.manager.controller.topic.request.RebalanceConsumerReq;
 import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
-import org.apache.tubemq.manager.service.TopicService;
+import org.apache.tubemq.manager.service.TopicServiceImpl;
 import org.apache.tubemq.manager.service.MasterService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
@@ -55,13 +56,14 @@ import org.springframework.web.bind.annotation.RestController;
 @Slf4j
 public class GroupController {
 
+
     public Gson gson = new Gson();
 
     @Autowired
     private MasterService masterService;
 
     @Autowired
-    private TopicService topicService;
+    private TopicServiceImpl topicService;
 
 
     @PostMapping("")
@@ -103,6 +105,8 @@ public class GroupController {
                 return topicService.cloneOffsetToOtherGroups(gson.fromJson(req, CloneOffsetReq.class));
             case DELETE:
                 return topicService.deleteOffset(gson.fromJson(req, DeleteOffsetReq.class));
+            case QUERY:
+                return topicService.queryOffset(gson.fromJson(req, QueryOffsetReq.class));
             default:
                 return TubeMQResult.getErrorResult("no such method");
         }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/QueryOffsetReq.java
similarity index 50%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/QueryOffsetReq.java
index 53fe096..c4ace9b 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/QueryOffsetReq.java
@@ -15,29 +15,13 @@
  * limitations under the License.
  */
 
+package org.apache.tubemq.manager.controller.group.request;
 
-package org.apache.tubemq.manager.service.tube;
+import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
 
-public class TubeHttpBrokerCfgInfo {
-    private boolean acceptPublish;
-    private boolean acceptSubscribe;
-    private Integer brokerId;
-    private String brokerIp;
-    private Integer brokerPort;
-    private Integer brokerTLSPort;
-    private String createDate;
-    private String createUser;
-    private String deletePolicy;
-    private String deleteWhen;
-    private String modifyDate;
-    private String modifyUser;
-    private Integer memCacheFlushIntvl;
-    private Integer memCacheMsgCntInK;
-    private Integer memCacheMsgSizeInMB;
-    private Integer numPartitions;
-    private Integer numTopicStores;
-    private Integer unflushDataHold;
-    private Integer unflushInterval;
-    private Integer unflushThreshold;
-    private boolean hasTLSPort;
+@Data
+public class QueryOffsetReq extends BaseReq {
+    private String topicName;
+    private String groupName;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/result/AllBrokersOffsetRes.java
similarity index 50%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/result/AllBrokersOffsetRes.java
index 53fe096..20b0bf0 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/result/AllBrokersOffsetRes.java
@@ -15,29 +15,21 @@
  * limitations under the License.
  */
 
+package org.apache.tubemq.manager.controller.group.result;
 
-package org.apache.tubemq.manager.service.tube;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Data;
 
-public class TubeHttpBrokerCfgInfo {
-    private boolean acceptPublish;
-    private boolean acceptSubscribe;
-    private Integer brokerId;
-    private String brokerIp;
-    private Integer brokerPort;
-    private Integer brokerTLSPort;
-    private String createDate;
-    private String createUser;
-    private String deletePolicy;
-    private String deleteWhen;
-    private String modifyDate;
-    private String modifyUser;
-    private Integer memCacheFlushIntvl;
-    private Integer memCacheMsgCntInK;
-    private Integer memCacheMsgSizeInMB;
-    private Integer numPartitions;
-    private Integer numTopicStores;
-    private Integer unflushDataHold;
-    private Integer unflushInterval;
-    private Integer unflushThreshold;
-    private boolean hasTLSPort;
+
+@Data
+public class AllBrokersOffsetRes {
+
+    private List<OffsetInfo> offsetPerBroker = new ArrayList<>();
+
+    @Data
+    public static class OffsetInfo {
+        private int brokerId;
+        private OffsetQueryRes offsetQueryRes;
+    }
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/result/OffsetQueryRes.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/result/OffsetQueryRes.java
new file mode 100644
index 0000000..434923a
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/result/OffsetQueryRes.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.controller.group.result;
+
+import java.util.List;
+import lombok.Data;
+
+@Data
+public class OffsetQueryRes {
+    private boolean result;
+    private int errCode;
+    private String errMsg;
+    private List<GroupOffsetRes> dataSet;
+    private int totalCnt;
+
+    @Data
+    public static class GroupOffsetRes {
+        private String groupName;
+        private List<TopicOffsetRes> subInfo;
+        private int topicCount;
+
+        @Data
+        public static class TopicOffsetRes {
+            private String topicName;
+            private List<OffsetPartitionRes> offsets;
+            private int partCount;
+
+            @Data
+            public static class OffsetPartitionRes {
+                private int partitionId;
+                private long curOffset;
+                private int flightOffset;
+                private int curDataOffset;
+                private int offsetLag;
+                private int dataLag;
+                private int offsetMax;
+                private int dataMax;
+            }
+        }
+    }
+
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
index 4689762..26411fc 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
@@ -19,8 +19,6 @@ package org.apache.tubemq.manager.controller.node;
 
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
 import org.apache.tubemq.manager.controller.node.request.BrokerSetReadOrWriteReq;
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
index b4c4fc0..2f19062 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
@@ -25,7 +25,6 @@ import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.MODIFY;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.REMOVE;
 import static org.apache.tubemq.manager.service.MasterService.queryMaster;
-import static org.apache.tubemq.manager.service.MasterService.requestMaster;
 
 
 import com.google.gson.Gson;
@@ -36,16 +35,11 @@ import org.apache.tubemq.manager.controller.node.request.BatchAddTopicReq;
 import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
 import org.apache.tubemq.manager.controller.topic.request.DeleteTopicReq;
 import org.apache.tubemq.manager.controller.topic.request.ModifyTopicReq;
-import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
 import org.apache.tubemq.manager.controller.topic.request.SetAuthControlReq;
-import org.apache.tubemq.manager.entry.NodeEntry;
-import org.apache.tubemq.manager.repository.NodeRepository;
 import org.apache.tubemq.manager.service.NodeService;
 import org.apache.tubemq.manager.service.MasterService;
-import org.apache.tubemq.manager.service.TopicService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
@@ -74,7 +68,7 @@ public class TopicWebController {
         @RequestParam String method, @RequestBody String req) throws Exception {
         switch (method) {
             case ADD:
-                return nodeService.addTopic(gson.fromJson(req, BatchAddTopicReq.class));
+                return nodeService.batchAddTopic(gson.fromJson(req, BatchAddTopicReq.class));
             case CLONE:
                 return nodeService.cloneTopicToBrokers(gson.fromJson(req, CloneTopicReq.class));
             case AUTH_CONTROL:
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
index ddcfc75..eebcf96 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -17,447 +17,75 @@
 
 package org.apache.tubemq.manager.service;
 
-
-import static org.apache.tubemq.manager.controller.node.request.AddBrokersReq.getAddBrokerReq;
-import static org.apache.tubemq.manager.service.MasterService.TUBE_REQUEST_PATH;
-import static org.apache.tubemq.manager.service.MasterService.queryMaster;
-import static org.apache.tubemq.manager.service.MasterService.requestMaster;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD_TUBE_TOPIC;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.BROKER_RUN_STATUS;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.NO_SUCH_CLUSTER;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.RELOAD_BROKER;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
-import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
-
-import com.google.common.collect.Lists;
-import com.google.gson.Gson;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.*;
-import java.util.stream.Collectors;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
+import java.util.List;
+import java.util.Map;
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
 import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
 import org.apache.tubemq.manager.controller.node.request.BatchAddTopicReq;
 import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq;
 import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
-import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
-import org.apache.tubemq.manager.repository.NodeRepository;
-import org.apache.tubemq.manager.service.tube.*;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-/**
- * node service to query broker/master/standby status of tube cluster.
- */
-@Slf4j
-@Component
-public class NodeService {
-
-    private final CloseableHttpClient httpclient = HttpClients.createDefault();
-    private final Gson gson = new Gson();
-
-    @Value("${manager.max.configurable.broker.size:50}")
-    private int maxConfigurableBrokerSize;
-
-    @Value("${manager.max.retry.adding.topic:10}")
-    private int maxRetryAddingTopic;
-
-    private final TopicBackendWorker worker;
-
-    @Autowired
-    private NodeRepository nodeRepository;
-
-    @Autowired
-    private TopicService topicService;
-
-    @Autowired
-    private MasterService masterService;
-
-    public NodeService(TopicBackendWorker worker) {
-        this.worker = worker;
-    }
-
-    /**
-     * request node status via http.
-     *
-     * @param nodeEntry - node entry
-     * @return
-     * @throws IOException
-     */
-    private TubeHttpBrokerInfoList requestClusterNodeStatus(NodeEntry nodeEntry) {
-        String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort() + BROKER_RUN_STATUS;
-        HttpGet httpget = new HttpGet(url);
-        try (CloseableHttpResponse response = httpclient.execute(httpget)) {
-            TubeHttpBrokerInfoList brokerInfoList =
-                    gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
-                            TubeHttpBrokerInfoList.class);
-            // request return normal.
-            if (brokerInfoList.getCode() == SUCCESS_CODE) {
-                // divide by state.
-                brokerInfoList.divideBrokerListByState();
-                return brokerInfoList;
-            }
-        } catch (Exception ex) {
-            log.error("exception caught while requesting broker status", ex);
-        }
-        return null;
-    }
-
-
+import org.apache.tubemq.manager.service.tube.AddBrokerResult;
 
+public interface NodeService {
 
     /**
-     * clone source broker to generate brokers with the same config and copy the topics in it.
+     * clone brokers with topic in it
      * @param req
      * @return
      * @throws Exception
      */
-    public TubeMQResult cloneBrokersWithTopic(CloneBrokersReq req) throws Exception {
-
-        int clusterId = req.getClusterId();
-        // 1. query source broker config
-        QueryBrokerCfgReq queryReq = QueryBrokerCfgReq.getReq(req.getSourceBrokerId());
-        NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
-                clusterId);
-        BrokerStatusInfo brokerStatusInfo = getBrokerStatusInfo(queryReq, masterEntry);
-
-        // 2. use source broker config to clone brokers
-        BrokerConf sourceBrokerConf = brokerStatusInfo.getData().get(0);
-        AddBrokersReq addBrokersReq = getBatchAddBrokersReq(req, clusterId, sourceBrokerConf);
-
-        // 3. request master, return broker ids generated by master
-        AddBrokerResult addBrokerResult = addBrokersToClusterWithId(addBrokersReq, masterEntry);
-
-        // might have duplicate brokers
-        if (addBrokerResult.getErrCode() != SUCCESS_CODE) {
-            return TubeMQResult.getErrorResult(addBrokerResult.getErrMsg());
-        }
-        List<Integer> brokerIds = getBrokerIds(addBrokerResult);
-        List<AddTopicReq> addTopicReqs = req.getAddTopicReqs();
-
-        // 4. add topic to brokers
-        return addTopicsToBrokers(masterEntry, brokerIds, addTopicReqs);
-    }
-
-    public TubeMQResult addTopicsToBrokers(NodeEntry masterEntry, List<Integer> brokerIds, List<AddTopicReq> addTopicReqs) {
-        TubeMQResult tubeResult = new TubeMQResult();
-        AddTopicsResult addTopicsResult = new AddTopicsResult();
-
-        if (CollectionUtils.isEmpty(addTopicReqs)) {
-            return tubeResult;
-        }
-        addTopicReqs.forEach(addTopicReq -> {
-            try {
-                String brokerStr = StringUtils.join(brokerIds, ",");
-                addTopicReq.setBrokerId(brokerStr);
-                TubeMQResult result = addTopicToBrokers(addTopicReq, masterEntry);
-                if (result.getErrCode() == SUCCESS_CODE) {
-                    addTopicsResult.getSuccessTopics().add(addTopicReq.getTopicName());
-                } else {
-                    addTopicsResult.getFailTopics().add(addTopicReq.getTopicName());
-                }
-            } catch (Exception e) {
-                log.error("add topic to brokers fail with exception", e);
-                addTopicsResult.getFailTopics().add(addTopicReq.getTopicName());
-            }
-        });
-
-        tubeResult.setData(gson.toJson(addTopicsResult));
-        return tubeResult;
-    }
-
-    private List<Integer> getBrokerIds(AddBrokerResult addBrokerResult) {
-        List<IpIdRelation> ipids = addBrokerResult.getData();
-        List<Integer> brokerIds = Lists.newArrayList();
-        for (IpIdRelation ipid : ipids) {
-            brokerIds.add(ipid.getId());
-        }
-        return brokerIds;
-    }
-
-    private AddBrokersReq getBatchAddBrokersReq(CloneBrokersReq req, int clusterId, BrokerConf sourceBrokerConf) {
-        AddBrokersReq addBrokersReq = getAddBrokerReq(req.getConfModAuthToken(), clusterId);
-
-        // generate add brokers req using given target broker ips
-        List<BrokerConf> brokerConfs = Lists.newArrayList();
-        req.getTargetIps().forEach(ip -> {
-            BrokerConf brokerConf = new BrokerConf(sourceBrokerConf);
-            brokerConf.setBrokerIp(ip);
-            brokerConf.setBrokerId(0);
-            brokerConfs.add(brokerConf);
-        });
-        addBrokersReq.setBrokerJsonSet(brokerConfs);
-        return addBrokersReq;
-    }
-
-    private BrokerStatusInfo getBrokerStatusInfo(QueryBrokerCfgReq queryReq, NodeEntry masterEntry) throws Exception {
-        String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
-                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(queryReq);
-        BrokerStatusInfo brokerStatusInfo = gson.fromJson(queryMaster(url),
-                BrokerStatusInfo.class);
-        return brokerStatusInfo;
-    }
-
-    public TubeMQResult addTopicToBrokers(AddTopicReq req, NodeEntry masterEntry) throws Exception {
-        String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
-                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
-        return requestMaster(url);
-    }
-
-
-    private boolean configBrokersForTopics(NodeEntry nodeEntry,
-            Set<String> topics, List<Integer> brokerList, int maxBrokers) {
-        List<Integer> finalBrokerList = brokerList.subList(0, maxBrokers);
-        String brokerStr = StringUtils.join(finalBrokerList, ",");
-        String topicStr = StringUtils.join(topics, ",");
-        String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
-                + ADD_TUBE_TOPIC  + "&topicName=" + topicStr + "&brokerId=" + brokerStr;
-        HttpGet httpget = new HttpGet(url);
-        try (CloseableHttpResponse response = httpclient.execute(httpget)) {
-            TubeHttpResponse result =
-                    gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
-                            TubeHttpResponse.class);
-            return result.getCode() == SUCCESS_CODE && result.getErrCode() == SUCCESS_CODE;
-        } catch (Exception ex) {
-            log.error("exception caught while requesting broker status", ex);
-        }
-        return false;
-    }
+    TubeMQResult cloneBrokersWithTopic(CloneBrokersReq req) throws Exception;
 
     /**
-     * handle result, if success, complete it,
-     * if not success, add back to queue without exceeding max retry,
-     * otherwise complete it with exception.
-     *
-     * @param isSuccess
-     * @param topics
-     * @param pendingTopic
+     * add topics to brokers
+     * @param masterEntry
+     * @param brokerIds
+     * @param addTopicReqs
+     * @return
      */
-    private void handleAddingResult(boolean isSuccess, Set<String> topics,
-            Map<String, TopicFuture> pendingTopic) {
-        for (String topic : topics) {
-            TopicFuture future = pendingTopic.get(topic);
-            if (future != null) {
-                if (isSuccess) {
-                    future.complete();
-                } else {
-                    future.increaseRetryTime();
-                    if (future.getRetryTime() > maxRetryAddingTopic) {
-                        future.completeExceptional();
-                    } else {
-                        // add back to queue.
-                        worker.addTopicFuture(future);
-                    }
-                }
-            }
-        }
-    }
-
+    TubeMQResult addTopicsToBrokers(NodeEntry masterEntry, List<Integer> brokerIds,
+        List<AddTopicReq> addTopicReqs);
 
     /**
-     * Adding topic is an async operation, so this method should
-     * 1. check whether pendingTopic contains topic that has failed/succeeded to be added.
-     * 2. async add topic to tubemq cluster
-     *
-     * @param brokerInfoList - broker list
-     * @param pendingTopic - topicMap
+     * add one topic to brokers
+     * @param req
+     * @param masterEntry
+     * @return
+     * @throws Exception
      */
-    private void handleAddingTopic(NodeEntry nodeEntry,
-            TubeHttpBrokerInfoList brokerInfoList,
-            Map<String, TopicFuture> pendingTopic) {
-        // 1. check tubemq cluster by topic name, remove pending topic if has added.
-        Set<String> brandNewTopics = new HashSet<>();
-        for (String topic : pendingTopic.keySet()) {
-            TubeHttpTopicInfoList topicInfoList = topicService.requestTopicConfigInfo(nodeEntry, topic);
-            if (topicInfoList != null) {
-                // get broker list by topic request
-                List<Integer> topicBrokerList = topicInfoList.getTopicBrokerIdList();
-                if (topicBrokerList.isEmpty()) {
-                    brandNewTopics.add(topic);
-                } else {
-                    // remove brokers which have been added.
-                    List<Integer> configurableBrokerIdList =
-                            brokerInfoList.getConfigurableBrokerIdList();
-                    configurableBrokerIdList.removeAll(topicBrokerList);
-                    // add topic to satisfy max broker number.
-                    Set<String> singleTopic = new HashSet<>();
-                    singleTopic.add(topic);
-                    int maxBrokers = maxConfigurableBrokerSize - topicBrokerList.size();
-                    boolean isSuccess = configBrokersForTopics(nodeEntry, singleTopic,
-                            configurableBrokerIdList, maxBrokers);
-                    handleAddingResult(isSuccess, singleTopic, pendingTopic);
-                }
-            }
-        }
-        // 2. add new topics to cluster
-        List<Integer> configurableBrokerIdList = brokerInfoList.getConfigurableBrokerIdList();
-        int maxBrokers = Math.min(maxConfigurableBrokerSize, configurableBrokerIdList.size());
-        boolean isSuccess = configBrokersForTopics(nodeEntry, brandNewTopics,
-                configurableBrokerIdList, maxBrokers);
-        handleAddingResult(isSuccess, brandNewTopics, pendingTopic);
-    }
+    TubeMQResult addTopicToBrokers(AddTopicReq req, NodeEntry masterEntry) throws Exception;
 
     /**
-     * reload broker list, cannot exceed maxConfigurableBrokerSize each time.
-     *
-     * @param nodeEntry
-     * @param needReloadList
+     * update broker status
+     * @param clusterId
+     * @param pendingTopic
      */
-    private void handleReloadBroker(NodeEntry nodeEntry, List<Integer> needReloadList) {
-        // reload without exceed max broker.
-        int begin = 0;
-        int end = 0;
-        do {
-            end = Math.min(maxConfigurableBrokerSize + begin, needReloadList.size());
-            List<Integer> brokerIdList = needReloadList.subList(begin, end);
-            String brokerStr = StringUtils.join(brokerIdList, ",");
-            String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
-                    + RELOAD_BROKER + "&brokerId=" + brokerStr;
-            HttpGet httpget = new HttpGet(url);
-            try (CloseableHttpResponse response = httpclient.execute(httpget)) {
-                TubeHttpResponse result =
-                        gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
-                                TubeHttpResponse.class);
-                if (result.getErrCode() == SUCCESS_CODE && result.getCode() == SUCCESS_CODE) {
-                    log.info("reload tube broker cgi: " +
-                            url + " ; return value : " + result.getCode());
-                }
-            } catch (Exception ex) {
-                log.error("exception caught while requesting broker status", ex);
-            }
-            begin = end;
-        } while (end >= needReloadList.size());
-    }
-
-
+    void updateBrokerStatus(int clusterId, Map<String, TopicFuture> pendingTopic);
 
     /**
-     * update broker status
+     * query cluster info
+     * @param clusterId
+     * @return
      */
-    public void updateBrokerStatus(int clusterId, Map<String, TopicFuture> pendingTopic) {
-        NodeEntry nodeEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
-        if (nodeEntry != null) {
-            try {
-                TubeHttpBrokerInfoList brokerInfoList = requestClusterNodeStatus(nodeEntry);
-                if (brokerInfoList != null) {
-                    handleAddingTopic(nodeEntry, brokerInfoList, pendingTopic);
-                }
-
-                // refresh broker list
-                brokerInfoList = requestClusterNodeStatus(nodeEntry);
-                if (brokerInfoList != null) {
-                    handleReloadBroker(nodeEntry, brokerInfoList.getNeedReloadList());
-                }
-
-            } catch (Exception ex) {
-                log.error("exception caught while requesting broker status", ex);
-            }
-        } else {
-            log.error("cannot get master ip by clusterId {}, please check it", clusterId);
-        }
-    }
+    String queryClusterInfo(Integer clusterId);
 
-    public String queryClusterInfo(Integer clusterId) {
-        TubeHttpClusterInfoList clusterInfoList;
-        try {
-            // find all nodes by given clusterIds, show all nodes if clusterIds not provided
-            List<NodeEntry> nodeEntries = clusterId == null ?
-                    nodeRepository.findAll() : nodeRepository.findNodeEntriesByClusterIdIs(clusterId);
-            // divide all entries by clusterId
-            Map<Integer, List<NodeEntry>> nodeEntriesPerCluster =
-                    nodeEntries.parallelStream().collect(Collectors.groupingBy(NodeEntry::getClusterId));
-
-            clusterInfoList = TubeHttpClusterInfoList.getClusterInfoList(nodeEntriesPerCluster);
-        } catch (Exception e) {
-            log.error("query cluster info error", e);
-            return gson.toJson(TubeMQResult.getErrorResult(""));
-        }
-
-        return gson.toJson(clusterInfoList);
-    }
-
-
-
-    public void close() throws IOException {
-        httpclient.close();
-    }
-
-    public AddBrokerResult addBrokersToClusterWithId(AddBrokersReq req, NodeEntry masterEntry) throws Exception {
-
-        String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
-                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
-        HttpGet httpget = new HttpGet(url);
-        try (CloseableHttpResponse response = httpclient.execute(httpget)) {
-            return gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
-                    AddBrokerResult.class);
-        } catch (Exception ex) {
-            log.error("exception caught while requesting broker status", ex);
-        }
-        return null;
-    }
+    void close() throws IOException;
 
     /**
-     * given one topic, copy its config and clone to brokers
-     * if no broker is is provided, topics will be cloned to all brokers in cluster
+     * clone topic to brokers
      * @param req
      * @return
      * @throws Exception
      */
-    public TubeMQResult cloneTopicToBrokers(CloneTopicReq req) throws Exception {
-
-        NodeEntry master = masterService.getMasterNode(req);
-        if (master == null) {
-
-            return TubeMQResult.getErrorResult(NO_SUCH_CLUSTER);
-        }
-        // 1 query topic config
-        TubeHttpTopicInfoList topicInfoList = topicService.requestTopicConfigInfo(master, req.getSourceTopicName());
-
-        if (topicInfoList == null) {
-            return TubeMQResult.getErrorResult("no such topic");
-        }
-
-        // 2 if there's no specific broker ids then clone to all of the brokers
-        List<Integer> brokerId = req.getBrokerId();
-
-        if (CollectionUtils.isEmpty(brokerId)) {
-            TubeHttpBrokerInfoList brokerInfoList = requestClusterNodeStatus(master);
-            if (brokerInfoList != null) {
-                brokerId = brokerInfoList.getConfigurableBrokerIdList();
-            }
-        }
-
-        // 3 generate add topic req
-        AddTopicReq addTopicReq = topicInfoList.getAddTopicReq(brokerId,
-            req.getTargetTopicName(), req.getConfModAuthToken());
-
-        // 4 send to master
-        return addTopicToBrokers(addTopicReq, master);
-
-    }
+    TubeMQResult cloneTopicToBrokers(CloneTopicReq req) throws Exception;
 
     /**
-     * add topic to brokers
+     * batch add topic to master
      * @param req
      * @return
      */
-    public TubeMQResult addTopic(BatchAddTopicReq req) {
-        NodeEntry masterEntry = masterService.getMasterNode(req);
-        if (masterEntry == null) {
-            return TubeMQResult.getErrorResult(NO_SUCH_CLUSTER);
-        }
-        return addTopicsToBrokers(masterEntry, req.getBrokerIds(), req.getAddTopicReqs());
-    }
+    TubeMQResult batchAddTopic(BatchAddTopicReq req);
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java
similarity index 98%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java
index ddcfc75..30b5adf 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java
@@ -63,7 +63,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class NodeService {
+public class NodeServiceImpl implements NodeService {
 
     private final CloseableHttpClient httpclient = HttpClients.createDefault();
     private final Gson gson = new Gson();
@@ -85,7 +85,7 @@ public class NodeService {
     @Autowired
     private MasterService masterService;
 
-    public NodeService(TopicBackendWorker worker) {
+    public NodeServiceImpl(TopicBackendWorker worker) {
         this.worker = worker;
     }
 
@@ -124,6 +124,7 @@ public class NodeService {
      * @return
      * @throws Exception
      */
+    @Override
     public TubeMQResult cloneBrokersWithTopic(CloneBrokersReq req) throws Exception {
 
         int clusterId = req.getClusterId();
@@ -151,7 +152,9 @@ public class NodeService {
         return addTopicsToBrokers(masterEntry, brokerIds, addTopicReqs);
     }
 
-    public TubeMQResult addTopicsToBrokers(NodeEntry masterEntry, List<Integer> brokerIds, List<AddTopicReq> addTopicReqs) {
+    @Override
+    public TubeMQResult addTopicsToBrokers(NodeEntry masterEntry, List<Integer> brokerIds,
+        List<AddTopicReq> addTopicReqs) {
         TubeMQResult tubeResult = new TubeMQResult();
         AddTopicsResult addTopicsResult = new AddTopicsResult();
 
@@ -210,6 +213,7 @@ public class NodeService {
         return brokerStatusInfo;
     }
 
+    @Override
     public TubeMQResult addTopicToBrokers(AddTopicReq req, NodeEntry masterEntry) throws Exception {
         String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
                 + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
@@ -346,6 +350,7 @@ public class NodeService {
     /**
      * update broker status
      */
+    @Override
     public void updateBrokerStatus(int clusterId, Map<String, TopicFuture> pendingTopic) {
         NodeEntry nodeEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
         if (nodeEntry != null) {
@@ -369,6 +374,7 @@ public class NodeService {
         }
     }
 
+    @Override
     public String queryClusterInfo(Integer clusterId) {
         TubeHttpClusterInfoList clusterInfoList;
         try {
@@ -390,6 +396,7 @@ public class NodeService {
 
 
 
+    @Override
     public void close() throws IOException {
         httpclient.close();
     }
@@ -415,6 +422,7 @@ public class NodeService {
      * @return
      * @throws Exception
      */
+    @Override
     public TubeMQResult cloneTopicToBrokers(CloneTopicReq req) throws Exception {
 
         NodeEntry master = masterService.getMasterNode(req);
@@ -453,7 +461,8 @@ public class NodeService {
      * @param req
      * @return
      */
-    public TubeMQResult addTopic(BatchAddTopicReq req) {
+    @Override
+    public TubeMQResult batchAddTopic(BatchAddTopicReq req) {
         NodeEntry masterEntry = masterService.getMasterNode(req);
         if (masterEntry == null) {
             return TubeMQResult.getErrorResult(NO_SUCH_CLUSTER);
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java
index 86b72d5..7637c0f 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java
@@ -67,7 +67,7 @@ public class TopicBackendWorker implements DisposableBean, Runnable  {
         // daemon thread
         thread.setDaemon(true);
         thread.start();
-        nodeService = new NodeService(this);
+        nodeService = new NodeServiceImpl(this);
     }
 
     /**
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
index 1abff54..7fd6f48 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -17,190 +17,59 @@
 
 package org.apache.tubemq.manager.service;
 
-
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.QUERY_GROUP_DETAIL_INFO;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.TOPIC_CONFIG_INFO;
-import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
-import static org.apache.tubemq.manager.utils.ConvertUtils.convertToRebalanceConsumerReq;
-import static org.apache.tubemq.manager.service.MasterService.TUBE_REQUEST_PATH;
-import static org.apache.tubemq.manager.service.MasterService.requestMaster;
-
-import com.google.gson.Gson;
-import java.io.InputStreamReader;
-import java.util.List;
-import java.util.Objects;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.group.request.DeleteOffsetReq;
+import org.apache.tubemq.manager.controller.group.request.QueryOffsetReq;
 import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
-import org.apache.tubemq.manager.controller.topic.request.BatchAddGroupAuthReq;
-import org.apache.tubemq.manager.controller.topic.request.DeleteGroupReq;
-import org.apache.tubemq.manager.controller.topic.request.RebalanceConsumerReq;
 import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
-import org.apache.tubemq.manager.repository.NodeRepository;
-import org.apache.tubemq.manager.service.tube.CleanOffsetResult;
-import org.apache.tubemq.manager.service.tube.RebalanceGroupResult;
 import org.apache.tubemq.manager.service.tube.TubeHttpGroupDetailInfo;
 import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList;
-import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList.TopicInfoList.TopicInfo;
-import org.apache.tubemq.manager.utils.ConvertUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-/**
- * node service to query broker/master/standby status of tube cluster.
- */
-@Slf4j
-@Component
-public class TopicService {
-
-    private final CloseableHttpClient httpclient = HttpClients.createDefault();
-    private final Gson gson = new Gson();
-
-    @Value("${manager.broker.webPort:8081}")
-    private int brokerWebPort;
-
-    @Autowired
-    private MasterService masterService;
-
-    private TubeHttpGroupDetailInfo requestGroupRunInfo(NodeEntry nodeEntry, String group) {
-        String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
-            + QUERY_GROUP_DETAIL_INFO + "&consumeGroup=" + group;
-        HttpGet httpget = new HttpGet(url);
-        try (CloseableHttpResponse response = httpclient.execute(httpget)) {
-            TubeHttpGroupDetailInfo groupDetailInfo =
-                gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
-                    TubeHttpGroupDetailInfo.class);
-            if (groupDetailInfo.getErrCode() == 0) {
-                return groupDetailInfo;
-            }
-        } catch (Exception ex) {
-            log.error("exception caught while requesting group status", ex);
-        }
-        return null;
-    }
-
-
-    public TubeMQResult cloneOffsetToOtherGroups(CloneOffsetReq req) {
-
-        NodeEntry master = masterService.getMasterNode(req);
-        if (master == null) {
-            return TubeMQResult.getErrorResult("no such cluster");
-        }
-        // 1. query the corresponding brokers having given topic
-        TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getTopicName());
-        TubeMQResult result = new TubeMQResult();
-
-        if (topicInfoList == null) {
-            return result;
-        }
-
-        List<TopicInfo> topicInfos = topicInfoList.getTopicInfo();
-        // 2. for each broker, request to clone offset
-        for (TopicInfo topicInfo : topicInfos) {
-            String brokerIp = topicInfo.getBrokerIp();
-            String url = SCHEMA + brokerIp + ":" + brokerWebPort
-                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
-            result = requestMaster(url);
-            if (result.getErrCode() != SUCCESS_CODE) {
-                return result;
-            }
-        }
-
-        return result;
-    }
-
-
-    public TubeHttpTopicInfoList requestTopicConfigInfo(NodeEntry nodeEntry, String topic) {
-        String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
-            + TOPIC_CONFIG_INFO + "&topicName=" + topic;
-        HttpGet httpget = new HttpGet(url);
-        try (CloseableHttpResponse response = httpclient.execute(httpget)) {
-            TubeHttpTopicInfoList topicInfoList =
-                gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
-                    TubeHttpTopicInfoList.class);
-            if (topicInfoList.getErrCode() == SUCCESS_CODE) {
-                return topicInfoList;
-            }
-        } catch (Exception ex) {
-            log.error("exception caught while requesting broker status", ex);
-        }
-        return null;
-    }
-
-
-    public TubeMQResult rebalanceGroup(RebalanceGroupReq req) {
-
-        NodeEntry master = masterService.getMasterNode(req);
-        if (master == null) {
-            return TubeMQResult.getErrorResult("no such cluster");
-        }
-
-        // 1. get all consumer ids in group
-        List<String> consumerIds = Objects
-            .requireNonNull(requestGroupRunInfo(master, req.getGroupName())).getConsumerIds();
-        RebalanceGroupResult rebalanceGroupResult = new RebalanceGroupResult();
-
-        // 2. rebalance consumers in group
-        consumerIds.forEach(consumerId -> {
-            RebalanceConsumerReq rebalanceConsumerReq = convertToRebalanceConsumerReq(req,
-                consumerId);
-            String url = SCHEMA + master.getIp() + ":" + master.getWebPort()
-                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(rebalanceConsumerReq);
-            TubeMQResult result = requestMaster(url);
-            if (result.getErrCode() != 0) {
-                rebalanceGroupResult.getFailConsumers().add(consumerId);
-            }
-            rebalanceGroupResult.getSuccessConsumers().add(consumerId);
-        });
-
-        TubeMQResult tubeResult = new TubeMQResult();
-        tubeResult.setData(gson.toJson(rebalanceGroupResult));
-
-        return tubeResult;
-    }
-
-
-    public TubeMQResult deleteOffset(DeleteOffsetReq req) {
-
-        NodeEntry master = masterService.getMasterNode(req);
-        if (master == null) {
-            return TubeMQResult.getErrorResult("no such cluster");
-        }
-
-        // 1. query the corresponding brokers having given topic
-        TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getTopicName());
-        TubeMQResult result = new TubeMQResult();
-        CleanOffsetResult cleanOffsetResult = new CleanOffsetResult();
-        if (topicInfoList == null) {
-            return TubeMQResult.getErrorResult("no such topic");
-        }
-
-        List<TopicInfo> topicInfos = topicInfoList.getTopicInfo();
-        // 2. for each broker, request to delete offset
-        for (TopicInfo topicInfo : topicInfos) {
-            String brokerIp = topicInfo.getBrokerIp();
-            String url = SCHEMA + brokerIp + ":" + brokerWebPort
-                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
-            result = requestMaster(url);
-            if (result.getErrCode() != SUCCESS_CODE) {
-                cleanOffsetResult.getFailBrokers().add(brokerIp);
-            } else {
-                cleanOffsetResult.getSuccessBrokers().add(brokerIp);
-            }
-        }
-
-        result.setData(gson.toJson(cleanOffsetResult));
-
-        return result;
-    }
 
+public interface TopicService {
+
+    /**
+     * get consumer group run info
+     * @param nodeEntry
+     * @param group
+     * @return
+     */
+    TubeHttpGroupDetailInfo requestGroupRunInfo(NodeEntry nodeEntry, String group);
+
+    /**
+     * clone offset to other groups
+     * @param req
+     * @return
+     */
+    TubeMQResult cloneOffsetToOtherGroups(CloneOffsetReq req);
+
+    /**
+     * get topic config info
+     * @param nodeEntry
+     * @param topic
+     * @return
+     */
+    TubeHttpTopicInfoList requestTopicConfigInfo(NodeEntry nodeEntry, String topic);
+
+    /**
+     * rebalance group
+     * @param req
+     * @return
+     */
+    TubeMQResult rebalanceGroup(RebalanceGroupReq req);
+
+    /**
+     * delete offset given topic and broker
+     * @param req
+     * @return
+     */
+    TubeMQResult deleteOffset(DeleteOffsetReq req);
+
+
+    /**
+     * query offset given topic and group name
+     * @param req
+     * @return
+     */
+    TubeMQResult queryOffset(QueryOffsetReq req);
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java
similarity index 77%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java
index 1abff54..e8af636 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java
@@ -18,6 +18,7 @@
 package org.apache.tubemq.manager.service;
 
 
+import static org.apache.tubemq.manager.service.MasterService.queryMaster;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.QUERY_GROUP_DETAIL_INFO;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
@@ -38,19 +39,19 @@ import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.group.request.DeleteOffsetReq;
+import org.apache.tubemq.manager.controller.group.request.QueryOffsetReq;
+import org.apache.tubemq.manager.controller.group.result.AllBrokersOffsetRes;
+import org.apache.tubemq.manager.controller.group.result.AllBrokersOffsetRes.OffsetInfo;
+import org.apache.tubemq.manager.controller.group.result.OffsetQueryRes;
 import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
-import org.apache.tubemq.manager.controller.topic.request.BatchAddGroupAuthReq;
-import org.apache.tubemq.manager.controller.topic.request.DeleteGroupReq;
 import org.apache.tubemq.manager.controller.topic.request.RebalanceConsumerReq;
 import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
-import org.apache.tubemq.manager.repository.NodeRepository;
 import org.apache.tubemq.manager.service.tube.CleanOffsetResult;
 import org.apache.tubemq.manager.service.tube.RebalanceGroupResult;
 import org.apache.tubemq.manager.service.tube.TubeHttpGroupDetailInfo;
 import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList;
 import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList.TopicInfoList.TopicInfo;
-import org.apache.tubemq.manager.utils.ConvertUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
@@ -60,7 +61,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class TopicService {
+public class TopicServiceImpl implements TopicService {
 
     private final CloseableHttpClient httpclient = HttpClients.createDefault();
     private final Gson gson = new Gson();
@@ -71,7 +72,8 @@ public class TopicService {
     @Autowired
     private MasterService masterService;
 
-    private TubeHttpGroupDetailInfo requestGroupRunInfo(NodeEntry nodeEntry, String group) {
+    @Override
+    public TubeHttpGroupDetailInfo requestGroupRunInfo(NodeEntry nodeEntry, String group) {
         String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
             + QUERY_GROUP_DETAIL_INFO + "&consumeGroup=" + group;
         HttpGet httpget = new HttpGet(url);
@@ -89,6 +91,7 @@ public class TopicService {
     }
 
 
+    @Override
     public TubeMQResult cloneOffsetToOtherGroups(CloneOffsetReq req) {
 
         NodeEntry master = masterService.getMasterNode(req);
@@ -119,6 +122,7 @@ public class TopicService {
     }
 
 
+    @Override
     public TubeHttpTopicInfoList requestTopicConfigInfo(NodeEntry nodeEntry, String topic) {
         String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
             + TOPIC_CONFIG_INFO + "&topicName=" + topic;
@@ -137,6 +141,7 @@ public class TopicService {
     }
 
 
+    @Override
     public TubeMQResult rebalanceGroup(RebalanceGroupReq req) {
 
         NodeEntry master = masterService.getMasterNode(req);
@@ -169,6 +174,7 @@ public class TopicService {
     }
 
 
+    @Override
     public TubeMQResult deleteOffset(DeleteOffsetReq req) {
 
         NodeEntry master = masterService.getMasterNode(req);
@@ -203,4 +209,48 @@ public class TopicService {
         return result;
     }
 
+    @Override
+    public TubeMQResult queryOffset(QueryOffsetReq req) {
+
+        NodeEntry master = masterService.getMasterNode(req);
+        if (master == null) {
+            return TubeMQResult.getErrorResult("no such cluster");
+        }
+
+        // 1. query the corresponding brokers having given topic
+        TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getTopicName());
+        TubeMQResult result = new TubeMQResult();
+        if (topicInfoList == null) {
+            return TubeMQResult.getErrorResult("no such topic");
+        }
+
+        List<TopicInfo> topicInfos = topicInfoList.getTopicInfo();
+
+        AllBrokersOffsetRes allBrokersOffsetRes = new AllBrokersOffsetRes();
+        List<OffsetInfo> offsetPerBroker = allBrokersOffsetRes.getOffsetPerBroker();
+
+        // 2. for each broker, request to query offset
+        for (TopicInfo topicInfo : topicInfos) {
+            String brokerIp = topicInfo.getBrokerIp();
+            String url = SCHEMA + brokerIp + ":" + brokerWebPort
+                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+            OffsetQueryRes res = gson.fromJson(queryMaster(url), OffsetQueryRes.class);
+            if (res.getErrCode() != SUCCESS_CODE) {
+                return TubeMQResult.getErrorResult("query broker id" + topicInfo.getBrokerId() + " fail");
+            }
+            generateOffsetInfo(offsetPerBroker, topicInfo, res);
+        }
+
+        result.setData(gson.toJson(allBrokersOffsetRes));
+        return result;
+    }
+
+
+    private void generateOffsetInfo(List<OffsetInfo> offsetPerBroker, TopicInfo topicInfo,
+        OffsetQueryRes res) {
+        OffsetInfo offsetInfo = new OffsetInfo();
+        offsetInfo.setBrokerId(topicInfo.getBrokerId());
+        offsetInfo.setOffsetQueryRes(res);
+        offsetPerBroker.add(offsetInfo);
+    }
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
index 04cf17b..9939dac 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
@@ -52,5 +52,5 @@ public class TubeMQHttpConst {
     public static final String REBALANCE_CONSUMER = "rebalanceConsumer";
     public static final String NO_SUCH_CLUSTER = "no such cluster";
     public static final Integer SUCCESS_CODE = 0;
-
+    public static final String QUERY = "query";
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
index 0ba3426..5350ca5 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
@@ -105,10 +105,10 @@ public class TubeHttpTopicInfoList {
 
 
     public List<TopicInfo> getTopicInfo() {
-        if (data != null) {
-            return data.get(0).getTopicInfo();
+        if (CollectionUtils.isEmpty(data)) {
+            return Lists.newArrayList();
         }
-        return Lists.newArrayList();
+        return data.get(0).getTopicInfo();
     }
 
     public AddTopicReq getAddTopicReq(List<Integer> brokerIds, List<String> targetTopicNames, String token) {
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
index 58fdd15..130faa0 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
@@ -36,7 +36,7 @@ import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_GROUP;
 @Slf4j
 public class ConvertUtils {
 
-    public static Gson gson = new Gson();
+    public static final Gson gson = new Gson();
 
     public static String convertReqToQueryStr(Object req) {
         List<String> queryList = new ArrayList<>();
diff --git a/tubemq-manager/src/main/resources/application.properties b/tubemq-manager/src/main/resources/application.properties
index dee51b7..9afbdab 100644
--- a/tubemq-manager/src/main/resources/application.properties
+++ b/tubemq-manager/src/main/resources/application.properties
@@ -14,4 +14,4 @@
 # limitations under the License.
 
 spring.jpa.hibernate.ddl-auto=update
-# configuration for manager
+# configuration for manager
\ No newline at end of file
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
index 9b8ce64..8bad33b 100644
--- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
@@ -16,11 +16,9 @@
  */
 package org.apache.tubemq.manager.controller;
 
-import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
-import org.apache.tubemq.manager.service.NodeService;
 import org.assertj.core.util.Lists;
 import org.junit.Assert;
 import org.junit.Test;
@@ -29,15 +27,12 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.mock.mockito.MockBean;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.test.web.servlet.MockMvc;
 import org.springframework.test.web.servlet.MvcResult;
 import org.springframework.test.web.servlet.RequestBuilder;
 
 import java.util.List;
-import java.util.Objects;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertTrue;