You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/08 11:15:11 UTC

[incubator-tubemq] branch TUBEMQ-421 updated: [TUBEMQ-493] reorg restful apis and reorg services (#383)

This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-421 by this push:
     new a9566ff  [TUBEMQ-493] reorg restful apis and reorg services (#383)
a9566ff is described below

commit a9566ff789c117f18bce3c954ae8f926690144f4
Author: EMsnap <zp...@connect.ust.hk>
AuthorDate: Fri Jan 8 19:15:05 2021 +0800

    [TUBEMQ-493] reorg restful apis and reorg services (#383)
    
    * [TUBEMQ-491] rebalance group or consumer
    
    * [TUBEMQ-496] clean offset in consumer group
    
    * [TUBEMQ-493] reorg restful apis and reorg services
    
    * [TUBEMQ-493] reuse MasterService.baseRequestMaster and use unified path name
    
    * [TUBEMQ-493] avoid using wildcard in import area.
    
    * [TUBEMQ-493] avoid using wildcard in import area delete some code from other branch.
    
    * [TUBEMQ-493] delete some personal property
---
 .../controller/cluster/ClusterController.java      |   8 +-
 .../manager/controller/group/GroupController.java  | 142 +++++++++++
 .../request/AddBlackGroupReq.java}                 |  13 +-
 .../request/DeleteBlackGroupReq.java}              |  12 +-
 .../request/DeleteOffsetReq.java}                  |  15 +-
 .../manager/controller/node/NodeController.java    | 164 ++++--------
 .../controller/node/request/AddBrokersReq.java     |  16 +-
 .../controller/node/request/AddTopicReq.java       |  24 +-
 .../controller/node/request/BatchAddTopicReq.java  |   3 +-
 ...dTopicReq.java => BrokerSetReadOrWriteReq.java} |  11 +-
 ...{BatchAddTopicReq.java => DeleteBrokerReq.java} |  10 +-
 ...ddTopicReq.java => OnlineOfflineBrokerReq.java} |  10 +-
 ...{BatchAddTopicReq.java => ReloadBrokerReq.java} |   9 +-
 .../controller/topic/TopicWebController.java       | 282 +++------------------
 .../request/DeleteTopicReq.java}                   |  13 +-
 .../request/ModifyTopicReq.java}                   |  24 +-
 .../request/RebalanceConsumerReq.java}             |  16 +-
 .../request/RebalanceGroupReq.java}                |  13 +-
 .../request/SetAuthControlReq.java}                |  13 +-
 .../MasterService.java}                            |  40 ++-
 .../apache/tubemq/manager/service/NodeService.java | 110 ++++----
 .../tubemq/manager/service/TopicService.java       | 206 +++++++++++++++
 .../tubemq/manager/service/TubeMQHttpConst.java    |  20 ++
 .../tube/CleanOffsetResult.java}                   |  14 +-
 .../tube/RebalanceGroupResult.java}                |  12 +-
 .../service/tube/TubeHttpGroupDetailInfo.java      |  69 +++++
 .../apache/tubemq/manager/utils/ConvertUtils.java  |  59 +++--
 .../src/main/resources/application.properties      |   2 +-
 28 files changed, 759 insertions(+), 571 deletions(-)

diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
index 562813c..a2242a1 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
@@ -18,7 +18,7 @@
 package org.apache.tubemq.manager.controller.cluster;
 
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
-import static org.apache.tubemq.manager.utils.MasterUtils.*;
+import static org.apache.tubemq.manager.service.MasterService.*;
 
 import com.google.gson.Gson;
 import java.util.Map;
@@ -27,7 +27,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
-import org.apache.tubemq.manager.utils.MasterUtils;
+import org.apache.tubemq.manager.service.MasterService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -48,7 +48,7 @@ public class ClusterController {
     private NodeRepository nodeRepository;
 
     @Autowired
-    public MasterUtils masterUtil;
+    public MasterService masterService;
 
     /**
      * query cluster info
@@ -57,7 +57,7 @@ public class ClusterController {
             produces = MediaType.APPLICATION_JSON_VALUE)
     public @ResponseBody String queryInfo(
             @RequestParam Map<String, String> queryBody) throws Exception {
-        String url = masterUtil.getQueryUrl(queryBody);
+        String url = masterService.getQueryUrl(queryBody);
         return queryMaster(url);
     }
 
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
new file mode 100644
index 0000000..12b4fc4
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.controller.group;
+
+
+
+import static org.apache.tubemq.manager.service.MasterService.requestMaster;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.CLONE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
+import static org.apache.tubemq.manager.service.MasterService.queryMaster;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_CONSUMER_GROUP;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_CONSUMER;
+
+import com.google.gson.Gson;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.controller.group.request.AddBlackGroupReq;
+import org.apache.tubemq.manager.controller.group.request.DeleteBlackGroupReq;
+import org.apache.tubemq.manager.controller.group.request.DeleteOffsetReq;
+import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
+import org.apache.tubemq.manager.controller.topic.request.BatchAddGroupAuthReq;
+import org.apache.tubemq.manager.controller.topic.request.DeleteGroupReq;
+import org.apache.tubemq.manager.controller.topic.request.RebalanceConsumerReq;
+import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
+import org.apache.tubemq.manager.service.TopicService;
+import org.apache.tubemq.manager.service.MasterService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping(path = "/v1/group")
+@Slf4j
+public class GroupController {
+
+    public Gson gson = new Gson();
+
+    @Autowired
+    private MasterService masterService;
+
+    @Autowired
+    private TopicService topicService;
+
+
+    @PostMapping("")
+    public @ResponseBody TubeMQResult groupMethodProxy(
+        @RequestParam String method, @RequestBody String req) throws Exception {
+        switch (method) {
+            case ADD:
+                return masterService.baseRequestMaster(gson.fromJson(req, BatchAddGroupAuthReq.class));
+            case DELETE:
+                return masterService.baseRequestMaster(gson.fromJson(req, DeleteGroupReq.class));
+            case REBALANCE_CONSUMER_GROUP:
+                return topicService.rebalanceGroup(gson.fromJson(req, RebalanceGroupReq.class));
+            case REBALANCE_CONSUMER:
+                return masterService.baseRequestMaster(gson.fromJson(req, RebalanceConsumerReq.class));
+            default:
+                return TubeMQResult.getErrorResult("no such method");
+        }
+    }
+
+    /**
+     * query the consumer group for a certain topic
+     * @param req
+     * @return
+     * @throws Exception
+     */
+    @GetMapping("/")
+    public @ResponseBody String queryConsumer(
+        @RequestParam Map<String, String> req) throws Exception {
+        String url = masterService.getQueryUrl(req);
+        return queryMaster(url);
+    }
+
+
+    @PostMapping("/offset")
+    public @ResponseBody TubeMQResult offsetProxy(
+        @RequestParam String method, @RequestBody String req) {
+        switch (method) {
+            case CLONE:
+                return topicService.cloneOffsetToOtherGroups(gson.fromJson(req, CloneOffsetReq.class));
+            case DELETE:
+                return topicService.deleteOffset(gson.fromJson(req, DeleteOffsetReq.class));
+            default:
+                return TubeMQResult.getErrorResult("no such method");
+        }
+    }
+
+
+    @PostMapping("/blackGroup")
+    public @ResponseBody TubeMQResult BlackGroupProxy(
+        @RequestParam String method, @RequestBody String req) {
+        switch (method) {
+            case ADD:
+                return masterService.baseRequestMaster(gson.fromJson(req, AddBlackGroupReq.class));
+            case DELETE:
+                return masterService.baseRequestMaster(gson.fromJson(req, DeleteBlackGroupReq.class));
+            default:
+                return TubeMQResult.getErrorResult("no such method");
+        }
+    }
+
+
+    /**
+     * query the black list for a certain topic
+     * @param req
+     * @return
+     * @throws Exception
+     */
+    @GetMapping("/blackGroup")
+    public @ResponseBody String queryBlackGroup(
+        @RequestParam Map<String, String> req) throws Exception {
+        String url = masterService.getQueryUrl(req);
+        return queryMaster(url);
+    }
+
+
+
+
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/AddBlackGroupReq.java
similarity index 73%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/AddBlackGroupReq.java
index 0201643..71280e9 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/AddBlackGroupReq.java
@@ -15,14 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.controller.group.request;
 
-import java.util.List;
 import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
 
 @Data
-public class BatchAddTopicReq {
-    List<AddTopicReq> addTopicReqs;
-    List<Integer> brokerIds;
-    Integer clusterId;
+public class AddBlackGroupReq extends BaseReq {
+    private String groupName;
+    private String topicName;
+    private String confModAuthToken;
+    private String createUser;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/DeleteBlackGroupReq.java
similarity index 75%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/DeleteBlackGroupReq.java
index 0201643..bcb9079 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/DeleteBlackGroupReq.java
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.controller.group.request;
 
-import java.util.List;
 import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
 
 @Data
-public class BatchAddTopicReq {
-    List<AddTopicReq> addTopicReqs;
-    List<Integer> brokerIds;
-    Integer clusterId;
+public class DeleteBlackGroupReq extends BaseReq {
+    private String topicName;
+    private String confModAuthToken;
+    private String groupName;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/DeleteOffsetReq.java
similarity index 73%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/DeleteOffsetReq.java
index 0201643..c56041c 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/DeleteOffsetReq.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,14 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.controller.group.request;
 
-import java.util.List;
 import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
 
 @Data
-public class BatchAddTopicReq {
-    List<AddTopicReq> addTopicReqs;
-    List<Integer> brokerIds;
-    Integer clusterId;
+public class DeleteOffsetReq extends BaseReq {
+    private String groupName;
+    private String modifyUser;
+    private String topicName;
+    private Boolean onlyMemory;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
index 2c81825..4689762 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
@@ -17,48 +17,46 @@
 
 package org.apache.tubemq.manager.controller.node;
 
-import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
-import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
+import org.apache.tubemq.manager.controller.node.request.BrokerSetReadOrWriteReq;
 import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq;
-import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
-import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.controller.node.request.DeleteBrokerReq;
+import org.apache.tubemq.manager.controller.node.request.OnlineOfflineBrokerReq;
+import org.apache.tubemq.manager.controller.node.request.ReloadBrokerReq;
 import org.apache.tubemq.manager.repository.NodeRepository;
 import org.apache.tubemq.manager.service.NodeService;
-import org.apache.tubemq.manager.service.tube.*;
-import org.apache.tubemq.manager.utils.MasterUtils;
+import org.apache.tubemq.manager.service.MasterService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.*;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
-import static org.apache.tubemq.manager.controller.node.request.AddBrokersReq.getAddBrokerReq;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.*;
-import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
-import static org.apache.tubemq.manager.utils.MasterUtils.*;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADMIN_QUERY_CLUSTER_INFO;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.CLONE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.NO_SUCH_METHOD;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.OFFLINE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.ONLINE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.OP_QUERY;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.RELOAD;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SET_READ_OR_WRITE;
+import static org.apache.tubemq.manager.service.MasterService.*;
 
 @RestController
 @RequestMapping(path = "/v1/node")
 @Slf4j
 public class NodeController {
 
-    public static final String NO_SUCH_METHOD = "no such method";
-    public static final String OP_QUERY = "op_query";
-    public static final String ADMIN_QUERY_CLUSTER_INFO = "admin_query_cluster_info";
+
     private final Gson gson = new Gson();
-    private static final CloseableHttpClient httpclient = HttpClients.createDefault();
 
     @Autowired
     NodeService nodeService;
@@ -67,7 +65,7 @@ public class NodeController {
     NodeRepository nodeRepository;
 
     @Autowired
-    MasterUtils masterUtil;
+    MasterService masterService;
 
     /**
      * query brokers in certain cluster
@@ -91,11 +89,11 @@ public class NodeController {
      * query brokers' run status
      * this method supports batch operation
      */
-    @RequestMapping(value = "/query/brokerStatus", method = RequestMethod.GET,
+    @RequestMapping(value = "/broker/status", method = RequestMethod.GET,
             produces = MediaType.APPLICATION_JSON_VALUE)
     public @ResponseBody String queryBrokerDetail(
             @RequestParam Map<String, String> queryBody) throws Exception {
-        String url = masterUtil.getQueryUrl(queryBody);
+        String url = masterService.getQueryUrl(queryBody);
         return queryMaster(url);
     }
 
@@ -104,115 +102,41 @@ public class NodeController {
      * query brokers' configuration
      * this method supports batch operation
      */
-    @RequestMapping(value = "/query/brokerConfig", method = RequestMethod.GET,
+    @RequestMapping(value = "/broker/config", method = RequestMethod.GET,
             produces = MediaType.APPLICATION_JSON_VALUE)
     public @ResponseBody String queryBrokerConfig(
             @RequestParam Map<String, String> queryBody) throws Exception {
-        String url = masterUtil.getQueryUrl(queryBody);
+        String url = masterService.getQueryUrl(queryBody);
         return queryMaster(url);
     }
 
-    /**
-     * clone source broker to generate brokers with the same config and copy the topics in it.
-     * @param req
-     * @return
-     * @throws Exception
-     */
-    @RequestMapping(value = "/clone", method = RequestMethod.POST)
-    public @ResponseBody String cloneBrokers(
-            @RequestBody CloneBrokersReq req) throws Exception {
-        int clusterId = req.getClusterId();
-        TubeMQResult tubeResult = nodeService.cloneBrokersWithTopic(req, clusterId);
-        return gson.toJson(tubeResult);
-    }
-
-
 
 
     /**
-     * add brokers to cluster, need to check token and
-     * make sure user has authorization to modify it.
+     * broker method proxy
+     * dispatches each broker operation to its handler according to the method parameter
      */
-    @RequestMapping(value = "/add", method = RequestMethod.POST)
-    public @ResponseBody String addBrokers(
-            @RequestBody AddBrokersReq req) throws Exception {
-        String token = req.getConfModAuthToken();
-        int clusterId = req.getClusterId();
-
-        if (StringUtils.isNotBlank(token)) {
-            NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
-                    clusterId);
-            TubeMQResult result = addBrokersToCluster(req, masterEntry);
-            return gson.toJson(result);
-        } else {
-            TubeMQResult result = new TubeMQResult();
-            result.setErrCode(-1);
-            result.setResult(false);
-            result.setErrMsg("token is not correct");
-            return gson.toJson(result);
+    @RequestMapping(value = "/broker")
+    public @ResponseBody
+    TubeMQResult brokerMethodProxy(
+        @RequestParam String method, @RequestBody String req) throws Exception {
+        switch (method) {
+            case CLONE:
+                return nodeService.cloneBrokersWithTopic(gson.fromJson(req, CloneBrokersReq.class));
+            case ADD:
+                return masterService.baseRequestMaster(gson.fromJson(req, AddBrokersReq.class));
+            case ONLINE:
+            case OFFLINE:
+                return masterService.baseRequestMaster(gson.fromJson(req, OnlineOfflineBrokerReq.class));
+            case RELOAD:
+                return masterService.baseRequestMaster(gson.fromJson(req, ReloadBrokerReq.class));
+            case DELETE:
+                return masterService.baseRequestMaster(gson.fromJson(req, DeleteBrokerReq.class));
+            case SET_READ_OR_WRITE:
+                return masterService.baseRequestMaster(gson.fromJson(req, BrokerSetReadOrWriteReq.class));
+            default:
+                return TubeMQResult.getErrorResult("no such method");
         }
-
-    }
-
-
-    /**
-     * online brokers in cluster
-     * this method supports batch operation
-     */
-    @RequestMapping(value = "/online", method = RequestMethod.GET)
-    public @ResponseBody String onlineBrokers(
-            @RequestParam Map<String, String> queryBody) throws Exception {
-        return gson.toJson(masterUtil.redirectToMaster(queryBody));
-    }
-
-    /**
-     * reload brokers in cluster
-     * this method supports batch operation
-     */
-    @RequestMapping(value = "/reload", method = RequestMethod.GET)
-    public @ResponseBody String reloadBrokers(
-            @RequestParam Map<String, String> queryBody) throws Exception {
-        return gson.toJson(masterUtil.redirectToMaster(queryBody));
-    }
-
-    /**
-     * delete brokers in cluster
-     * this method supports batch operation
-     */
-    @RequestMapping(value = "/delete", method = RequestMethod.GET)
-    public @ResponseBody String deleteBrokers(
-            @RequestParam Map<String, String> queryBody) throws Exception {
-        TubeMQResult result = masterUtil.redirectToMaster(queryBody);
-        return gson.toJson(result);
-    }
-
-    /**
-     * change brokers' read mode in cluster
-     * this method supports batch operation
-     */
-    @RequestMapping(value = "/setRead", method = RequestMethod.GET)
-    public @ResponseBody String setBrokersRead(
-            @RequestParam Map<String, String> queryBody) throws Exception {
-        TubeMQResult result = masterUtil.redirectToMaster(queryBody);
-        return gson.toJson(result);
-    }
-
-    /**
-     * change brokers' write mode in cluster
-     * this method supports batch operation
-     */
-    @RequestMapping(value = "/setWrite", method = RequestMethod.GET)
-    public @ResponseBody String setBrokersWrite(
-            @RequestParam Map<String, String> queryBody) throws Exception {
-        TubeMQResult result = masterUtil.redirectToMaster(queryBody);
-        return gson.toJson(result);
-    }
-
-    private TubeMQResult addBrokersToCluster(AddBrokersReq req, NodeEntry masterEntry) throws Exception {
-        String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
-                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
-        TubeMQResult tubeMQResult = requestMaster(url);
-        return tubeMQResult;
     }
 
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
index 5bfb486..aea0a2e 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
@@ -17,8 +17,6 @@
 
 package org.apache.tubemq.manager.controller.node.request;
 
-
-import lombok.Builder;
 import lombok.Data;
 import org.apache.tubemq.manager.service.tube.BrokerConf;
 
@@ -27,24 +25,12 @@ import java.util.List;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.*;
 
 @Data
-public class AddBrokersReq {
+public class AddBrokersReq extends BaseReq{
 
     private String confModAuthToken;
 
     private String createUser;
 
-    private int clusterId;
-
-    /**
-     * admin_bath_add_broker_configure
-     */
-    private String method;
-
-    /**
-     * op_modify
-     */
-    private String type;
-
     private List<BrokerConf> brokerJsonSet;
 
     public static AddBrokersReq getAddBrokerReq(String token, int clusterId) {
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
index 1753be6..39f0504 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
@@ -22,17 +22,15 @@ import lombok.Data;
 
 @Data
 public class AddTopicReq extends BaseReq {
-    public String createUser;
-    public String deleteWhen;
-    public Integer unflushThreshold;
-    public Boolean acceptPublish;
-    public Integer numPartitions;
-    public Integer unflushInterval;
-    public Boolean acceptSubscribe;
-    public String method;
-    public String type;
-    public String brokerId;
-    public String confModAuthToken;
-    public String topicName;
-    public String deletePolicy;
+    private String createUser;
+    private String deleteWhen;
+    private Integer unflushThreshold;
+    private Boolean acceptPublish;
+    private Integer numPartitions;
+    private Integer unflushInterval;
+    private Boolean acceptSubscribe;
+    private String brokerId;
+    private String confModAuthToken;
+    private String topicName;
+    private String deletePolicy;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
index 0201643..08ec6ae 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
@@ -21,8 +21,7 @@ import java.util.List;
 import lombok.Data;
 
 @Data
-public class BatchAddTopicReq {
+public class BatchAddTopicReq extends BaseReq{
     List<AddTopicReq> addTopicReqs;
     List<Integer> brokerIds;
-    Integer clusterId;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerSetReadOrWriteReq.java
similarity index 79%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerSetReadOrWriteReq.java
index 0201643..3b8fbb2 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerSetReadOrWriteReq.java
@@ -17,12 +17,13 @@
 
 package org.apache.tubemq.manager.controller.node.request;
 
-import java.util.List;
 import lombok.Data;
 
 @Data
-public class BatchAddTopicReq {
-    List<AddTopicReq> addTopicReqs;
-    List<Integer> brokerIds;
-    Integer clusterId;
+public class BrokerSetReadOrWriteReq extends BaseReq{
+    private Boolean isAcceptPublish;
+    private Boolean isAcceptSubscribe;
+    private String modifyUser;
+    private String brokerId;
+    private String confModAuthToken;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/DeleteBrokerReq.java
similarity index 83%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/DeleteBrokerReq.java
index 0201643..df59bf6 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/DeleteBrokerReq.java
@@ -17,12 +17,12 @@
 
 package org.apache.tubemq.manager.controller.node.request;
 
-import java.util.List;
 import lombok.Data;
 
 @Data
-public class BatchAddTopicReq {
-    List<AddTopicReq> addTopicReqs;
-    List<Integer> brokerIds;
-    Integer clusterId;
+public class DeleteBrokerReq extends BaseReq{
+    private Boolean isReservedData;
+    private String modifyUser;
+    private String brokerId;
+    private String confModAuthToken;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/OnlineOfflineBrokerReq.java
similarity index 85%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/OnlineOfflineBrokerReq.java
index 0201643..1d14533 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/OnlineOfflineBrokerReq.java
@@ -17,12 +17,12 @@
 
 package org.apache.tubemq.manager.controller.node.request;
 
-import java.util.List;
+
 import lombok.Data;
 
 @Data
-public class BatchAddTopicReq {
-    List<AddTopicReq> addTopicReqs;
-    List<Integer> brokerIds;
-    Integer clusterId;
+public class OnlineOfflineBrokerReq extends BaseReq{
+    private Integer brokerId;
+    private String modifyUser;
+    private String confModAuthToken;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/ReloadBrokerReq.java
similarity index 86%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/ReloadBrokerReq.java
index 0201643..65f65b7 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/ReloadBrokerReq.java
@@ -17,12 +17,11 @@
 
 package org.apache.tubemq.manager.controller.node.request;
 
-import java.util.List;
 import lombok.Data;
 
 @Data
-public class BatchAddTopicReq {
-    List<AddTopicReq> addTopicReqs;
-    List<Integer> brokerIds;
-    Integer clusterId;
+public class ReloadBrokerReq extends BaseReq{
+    private Integer brokerId;
+    private String modifyUser;
+    private String confModAuthToken;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
index 9511c96..b4c4fc0 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
@@ -18,27 +18,31 @@
 
 package org.apache.tubemq.manager.controller.topic;
 
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
-import static org.apache.tubemq.manager.utils.MasterUtils.TUBE_REQUEST_PATH;
-import static org.apache.tubemq.manager.utils.MasterUtils.queryMaster;
-import static org.apache.tubemq.manager.utils.MasterUtils.requestMaster;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.AUTH_CONTROL;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.CLONE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.MODIFY;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.REMOVE;
+import static org.apache.tubemq.manager.service.MasterService.queryMaster;
+import static org.apache.tubemq.manager.service.MasterService.requestMaster;
+
 
 import com.google.gson.Gson;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.node.request.BatchAddTopicReq;
-import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
 import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
-import org.apache.tubemq.manager.controller.topic.request.BatchAddGroupAuthReq;
-import org.apache.tubemq.manager.controller.topic.request.DeleteGroupReq;
+import org.apache.tubemq.manager.controller.topic.request.DeleteTopicReq;
+import org.apache.tubemq.manager.controller.topic.request.ModifyTopicReq;
+import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
+import org.apache.tubemq.manager.controller.topic.request.SetAuthControlReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
-import org.apache.tubemq.manager.repository.TopicRepository;
 import org.apache.tubemq.manager.service.NodeService;
-import org.apache.tubemq.manager.service.TopicBackendWorker;
-import org.apache.tubemq.manager.utils.ConvertUtils;
-import org.apache.tubemq.manager.utils.MasterUtils;
+import org.apache.tubemq.manager.service.MasterService;
+import org.apache.tubemq.manager.service.TopicService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
@@ -54,98 +58,35 @@ import org.springframework.web.bind.annotation.RestController;
 public class TopicWebController {
 
     @Autowired
-    private TopicRepository topicRepository;
-
-    @Autowired
-    private TopicBackendWorker topicBackendWorker;
-
-    @Autowired
     private NodeService nodeService;
 
-    @Autowired
-    private NodeRepository nodeRepository;
-
     public Gson gson = new Gson();
 
     @Autowired
-    private MasterUtils masterUtils;
-
-    /**
-     * add topic to brokers
-     * @param req
-     * @return
-     */
-    @PostMapping("/add")
-    public TubeMQResult addTopic(@RequestBody BatchAddTopicReq req) {
-        if (req.getClusterId() == null) {
-            return TubeMQResult.getErrorResult("please input clusterId");
-        }
-        NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
-            req.getClusterId());
-        if (masterEntry == null) {
-            return TubeMQResult.getErrorResult("no such cluster");
-        }
-        return nodeService.addTopicsToBrokers(masterEntry, req.getBrokerIds(), req.getAddTopicReqs());
-    }
-
-    /**
-     * given one topic, copy its config and clone to brokers
-     * if no broker is is provided, topics will be cloned to all brokers in cluster
-     * @param req
-     * @return
-     * @throws Exception
-     */
-    @PostMapping("/clone")
-    public TubeMQResult cloneTopic(@RequestBody CloneTopicReq req) throws Exception {
-        if (req.getClusterId() == null) {
-            return TubeMQResult.getErrorResult("please input clusterId");
-        }
-        NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
-            req.getClusterId());
-        if (masterEntry == null) {
-            return TubeMQResult.getErrorResult("no such cluster");
+    private MasterService masterService;
+
+    /**
+     * topic method proxy
+     * dispatches each topic operation to the method that handles it
+     */
+    @RequestMapping(value = "")
+    public @ResponseBody TubeMQResult topicMethodProxy(
+        @RequestParam String method, @RequestBody String req) throws Exception {
+        switch (method) {
+            case ADD:
+                return nodeService.addTopic(gson.fromJson(req, BatchAddTopicReq.class));
+            case CLONE:
+                return nodeService.cloneTopicToBrokers(gson.fromJson(req, CloneTopicReq.class));
+            case AUTH_CONTROL:
+                return masterService.baseRequestMaster(gson.fromJson(req, SetAuthControlReq.class));
+            case MODIFY:
+                return masterService.baseRequestMaster(gson.fromJson(req, ModifyTopicReq.class));
+            case DELETE:
+            case REMOVE:
+                return masterService.baseRequestMaster(gson.fromJson(req, DeleteTopicReq.class));
+            default:
+                return TubeMQResult.getErrorResult("no such method");
         }
-        return nodeService.cloneTopicToBrokers(req, masterEntry);
-    }
-
-    /**
-     * batch modify topic config
-     * @param req
-     * @return
-     * @throws Exception
-     */
-    @PostMapping("/modify")
-    public @ResponseBody String modifyTopics(
-        @RequestParam Map<String, String> req) throws Exception {
-        String url = masterUtils.getQueryUrl(req);
-        return queryMaster(url);
-    }
-
-    /**
-     * batch delete topic info
-     * @param req
-     * @return
-     * @throws Exception
-     */
-    @PostMapping("/delete")
-    public @ResponseBody String deleteTopics(
-        @RequestParam Map<String, String> req) throws Exception {
-        String url = masterUtils.getQueryUrl(req);
-        return queryMaster(url);
-    }
-
-
-    /**
-     * batch remove topics
-     * @param req
-     * @return
-     * @throws Exception
-     */
-    @PostMapping("/remove")
-    public @ResponseBody String removeTopics(
-        @RequestParam Map<String, String> req) throws Exception {
-        String url = masterUtils.getQueryUrl(req);
-        return queryMaster(url);
     }
 
     /**
@@ -154,10 +95,10 @@ public class TopicWebController {
      * @return
      * @throws Exception
      */
-    @PostMapping("/query/consumer-auth")
+    @GetMapping("/consumerAuth")
     public @ResponseBody String queryConsumerAuth(
         @RequestParam Map<String, String> req) throws Exception {
-        String url = masterUtils.getQueryUrl(req);
+        String url = masterService.getQueryUrl(req);
         return queryMaster(url);
     }
 
@@ -167,150 +108,11 @@ public class TopicWebController {
      * @return
      * @throws Exception
      */
-    @PostMapping("/query/topic-config")
+    @GetMapping("/topicConfig")
     public @ResponseBody String queryTopicConfig(
         @RequestParam Map<String, String> req) throws Exception {
-        String url = masterUtils.getQueryUrl(req);
-        return queryMaster(url);
-    }
-
-    /**
-     * add group to black list for certain topic
-     * @param req
-     * @return
-     * @throws Exception
-     */
-    @GetMapping("/add/blackGroup")
-    public @ResponseBody TubeMQResult addBlackGroup(
-        @RequestParam Map<String, String> req) throws Exception {
-        String url = masterUtils.getQueryUrl(req);
-        return requestMaster(url);
-    }
-
-    /**
-     * delete group to black list for certain topic
-     * @param req
-     * @return
-     * @throws Exception
-     */
-    @GetMapping("/delete/blackGroup")
-    public @ResponseBody TubeMQResult deleteBlackGroup(
-        @RequestParam Map<String, String> req) throws Exception {
-        String url = masterUtils.getQueryUrl(req);
-        return requestMaster(url);
-    }
-
-    /**
-     * query the black list for certain topic
-     * @param req
-     * @return
-     * @throws Exception
-     */
-    @GetMapping("/query/blackGroup")
-    public @ResponseBody String queryBlackGroup(
-        @RequestParam Map<String, String> req) throws Exception {
-        String url = masterUtils.getQueryUrl(req);
-        return queryMaster(url);
-    }
-
-    /**
-     * batch add consumer group for certain topic
-     * @param req
-     * @return
-     * @throws Exception
-     */
-    @PostMapping("/add/group")
-    public @ResponseBody TubeMQResult addConsumer(
-        @RequestBody BatchAddGroupAuthReq req) throws Exception {
-        NodeEntry nodeEntry =
-            nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(req.getClusterId());
-        if (nodeEntry == null) {
-            return TubeMQResult.getErrorResult("no such cluster");
-        }
-        String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
-            + "/" + TUBE_REQUEST_PATH + "?" + ConvertUtils.convertReqToQueryStr(req);
-        return requestMaster(url);
-    }
-
-
-    /**
-     * delete consumer group for certain topic
-     * @param req
-     * @return
-     * @throws Exception
-     */
-    @PostMapping("/delete/group")
-    public @ResponseBody TubeMQResult deleteConsumer(
-        @RequestBody DeleteGroupReq req) throws Exception {
-        NodeEntry nodeEntry =
-            nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(req.getClusterId());
-        if (nodeEntry == null) {
-            return TubeMQResult.getErrorResult("no such cluster");
-        }
-        String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
-            + "/" + TUBE_REQUEST_PATH + "?" + ConvertUtils.convertReqToQueryStr(req);
-        return requestMaster(url);
-    }
-
-    /**
-     * enable auth control for topics
-     * @param req
-     * @return
-     * @throws Exception
-     */
-    @GetMapping("/enable/auth-control")
-    public @ResponseBody TubeMQResult enableAuthControl(
-        @RequestParam Map<String, String> req) throws Exception {
-        String url = masterUtils.getQueryUrl(req);
-        return requestMaster(url);
-    }
-
-    /**
-     * disable auth control for topics
-     * @param req
-     * @return
-     * @throws Exception
-     */
-    @GetMapping("/disable/auth-control")
-    public @ResponseBody TubeMQResult disableAuthControl(
-        @RequestParam Map<String, String> req) throws Exception {
-        String url = masterUtils.getQueryUrl(req);
-        return requestMaster(url);
-    }
-
-    /**
-     * query the consumer group for certain topic
-     * @param req
-     * @return
-     * @throws Exception
-     */
-    @GetMapping("/query/group")
-    public @ResponseBody String queryConsumer(
-        @RequestParam Map<String, String> req) throws Exception {
-        String url = masterUtils.getQueryUrl(req);
+        String url = masterService.getQueryUrl(req);
         return queryMaster(url);
     }
 
-
-    /**
-     * clone offset from one group to another
-     * @param req
-     * @return
-     * @throws Exception
-     */
-    @PostMapping("/clone/offset")
-    public @ResponseBody TubeMQResult cloneOffset(
-        @RequestBody CloneOffsetReq req) throws Exception {
-        if (req.getClusterId() == null) {
-            return TubeMQResult.getErrorResult("please input clusterId");
-        }
-        NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
-            req.getClusterId());
-        if (masterEntry == null) {
-            return TubeMQResult.getErrorResult("no such cluster");
-        }
-        return nodeService.cloneOffsetToOtherGroups(req, masterEntry);
-    }
-
-
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/DeleteTopicReq.java
similarity index 73%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/DeleteTopicReq.java
index 0201643..2482773 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/DeleteTopicReq.java
@@ -15,14 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.controller.topic.request;
 
-import java.util.List;
 import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
 
 @Data
-public class BatchAddTopicReq {
-    List<AddTopicReq> addTopicReqs;
-    List<Integer> brokerIds;
-    Integer clusterId;
+public class DeleteTopicReq extends BaseReq {
+    private String topicName;
+    private String brokerId;
+    private String modifyUser;
+    private String confModAuthToken;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/ModifyTopicReq.java
similarity index 54%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/ModifyTopicReq.java
index 0201643..4d19696 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/ModifyTopicReq.java
@@ -15,14 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.controller.topic.request;
 
-import java.util.List;
 import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
 
 @Data
-public class BatchAddTopicReq {
-    List<AddTopicReq> addTopicReqs;
-    List<Integer> brokerIds;
-    Integer clusterId;
+public class ModifyTopicReq extends BaseReq {
+    private String modifyUser;
+    private String deleteWhen;
+    private Integer unflushThreshold;
+    private Boolean acceptPublish;
+    private Integer numPartitions;
+    private Integer unflushInterval;
+    private Boolean acceptSubscribe;
+    private String brokerId;
+    private String confModAuthToken;
+    private String topicName;
+    private String deletePolicy;
+    private Integer unflushDataHold;
+    private Integer numTopicStores;
+    private Integer memCacheMsgCntInK;
+    private Integer memCacheMsgSizeInMB;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/RebalanceConsumerReq.java
similarity index 71%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/RebalanceConsumerReq.java
index 0201643..040d5ea 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/RebalanceConsumerReq.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,14 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.controller.topic.request;
 
-import java.util.List;
 import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
 
 @Data
-public class BatchAddTopicReq {
-    List<AddTopicReq> addTopicReqs;
-    List<Integer> brokerIds;
-    Integer clusterId;
+public class RebalanceConsumerReq extends BaseReq {
+    public String groupName;
+    public String confModAuthToken;
+    public Integer reJoinWait;
+    public String modifyUser;
+    public String consumerId;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/RebalanceGroupReq.java
similarity index 73%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/RebalanceGroupReq.java
index 0201643..92271ca 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/RebalanceGroupReq.java
@@ -15,14 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.controller.topic.request;
 
-import java.util.List;
 import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
 
 @Data
-public class BatchAddTopicReq {
-    List<AddTopicReq> addTopicReqs;
-    List<Integer> brokerIds;
-    Integer clusterId;
+public class RebalanceGroupReq extends BaseReq {
+    public String groupName;
+    public String confModAuthToken;
+    public Integer reJoinWait;
+    public String modifyUser;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/SetAuthControlReq.java
similarity index 73%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/SetAuthControlReq.java
index 0201643..1dcb756 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/request/SetAuthControlReq.java
@@ -15,14 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.controller.topic.request;
 
-import java.util.List;
 import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
 
 @Data
-public class BatchAddTopicReq {
-    List<AddTopicReq> addTopicReqs;
-    List<Integer> brokerIds;
-    Integer clusterId;
+public class SetAuthControlReq extends BaseReq {
+    private String confModAuthToken;
+    private String topicName;
+    private String createUser;
+    private Boolean isEnable;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterService.java
similarity index 79%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterService.java
index 5dea029..8f726e9 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterService.java
@@ -16,7 +16,7 @@
  */
 
 
-package org.apache.tubemq.manager.utils;
+package org.apache.tubemq.manager.service;
 
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
@@ -27,6 +27,8 @@ import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
 import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
 import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
@@ -43,13 +45,14 @@ import java.util.Map;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
+import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
 
 
 @Slf4j
 @Component
-public class MasterUtils {
+public class MasterService {
 
-    public static final int SUCCESS_CODE = 0;
     private static CloseableHttpClient httpclient = HttpClients.createDefault();
     private static Gson gson = new Gson();
     public static final String TUBE_REQUEST_PATH = "webapi.htm";
@@ -69,7 +72,7 @@ public class MasterUtils {
 
 
 
-    public static TubeMQResult requestMaster(String url) throws Exception {
+    public static TubeMQResult requestMaster(String url) {
 
         log.info("start to request {}", url);
         HttpGet httpGet = new HttpGet(url);
@@ -113,21 +116,30 @@ public class MasterUtils {
     }
 
 
+    public TubeMQResult baseRequestMaster(BaseReq req) {
+        if (req.getClusterId() == null) {
+            return TubeMQResult.getErrorResult("please input clusterId");
+        }
+        NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
+            req.getClusterId());
+        if (masterEntry == null) {
+            return TubeMQResult.getErrorResult("no such cluster");
+        }
+        String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
+            + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+        return requestMaster(url);
+    }
 
 
-    public TubeMQResult redirectToMaster(Map<String, String> queryBody) throws Exception {
-        int clusterId = Integer.parseInt(queryBody.get("clusterId"));
-        queryBody.remove("clusterId");
-        NodeEntry nodeEntry =
-                nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
-        if (nodeEntry == null) {
-            return TubeMQResult.getErrorResult("ClusterId doesn't exist");
+    public NodeEntry getMasterNode(BaseReq req) {
+        if (req.getClusterId() == null) {
+            return null;
         }
-        String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
-                + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody);
-        return requestMaster(url);
+        return nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
+            req.getClusterId());
     }
 
+
     public String getQueryUrl(Map<String, String> queryBody) throws Exception {
         int clusterId = Integer.parseInt(queryBody.get("clusterId"));
         queryBody.remove("clusterId");
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
index 927025a..ddcfc75 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -19,13 +19,16 @@ package org.apache.tubemq.manager.service;
 
 
 import static org.apache.tubemq.manager.controller.node.request.AddBrokersReq.getAddBrokerReq;
+import static org.apache.tubemq.manager.service.MasterService.TUBE_REQUEST_PATH;
+import static org.apache.tubemq.manager.service.MasterService.queryMaster;
+import static org.apache.tubemq.manager.service.MasterService.requestMaster;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD_TUBE_TOPIC;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.BROKER_RUN_STATUS;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.NO_SUCH_CLUSTER;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.RELOAD_BROKER;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.TOPIC_CONFIG_INFO;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
 import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
-import static org.apache.tubemq.manager.utils.MasterUtils.*;
 
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
@@ -44,15 +47,13 @@ import org.apache.http.impl.client.HttpClients;
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
 import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
+import org.apache.tubemq.manager.controller.node.request.BatchAddTopicReq;
 import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq;
-import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
 import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
 import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
 import org.apache.tubemq.manager.service.tube.*;
-import org.apache.tubemq.manager.service.tube.TubeHttpBrokerInfoList.BrokerInfo;
-import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList.TopicInfoList.TopicInfo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
@@ -75,12 +76,15 @@ public class NodeService {
 
     private final TopicBackendWorker worker;
 
-    @Value("${manager.broker.webPort:8081}")
-    private int brokerWebPort;
-
     @Autowired
     private NodeRepository nodeRepository;
 
+    @Autowired
+    private TopicService topicService;
+
+    @Autowired
+    private MasterService masterService;
+
     public NodeService(TopicBackendWorker worker) {
         this.worker = worker;
     }
@@ -92,7 +96,7 @@ public class NodeService {
      * @return
      * @throws IOException
      */
-    private TubeHttpBrokerInfoList requestClusterNodeStatus(NodeEntry nodeEntry) throws IOException {
+    private TubeHttpBrokerInfoList requestClusterNodeStatus(NodeEntry nodeEntry) {
         String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort() + BROKER_RUN_STATUS;
         HttpGet httpget = new HttpGet(url);
         try (CloseableHttpResponse response = httpclient.execute(httpget)) {
@@ -112,26 +116,17 @@ public class NodeService {
     }
 
 
-    private TubeHttpTopicInfoList requestTopicConfigInfo(NodeEntry nodeEntry, String topic) {
-        String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
-                + TOPIC_CONFIG_INFO + "&topicName=" + topic;
-        HttpGet httpget = new HttpGet(url);
-        try (CloseableHttpResponse response = httpclient.execute(httpget)) {
-            TubeHttpTopicInfoList topicInfoList =
-                    gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
-                            TubeHttpTopicInfoList.class);
-            if (topicInfoList.getErrCode() == SUCCESS_CODE) {
-                return topicInfoList;
-            }
-        } catch (Exception ex) {
-            log.error("exception caught while requesting broker status", ex);
-        }
-        return null;
-    }
 
 
-    public TubeMQResult cloneBrokersWithTopic(CloneBrokersReq req, int clusterId) throws Exception {
+    /**
+     * clone source broker to generate brokers with the same config and copy the topics in it.
+     * @param req
+     * @return
+     * @throws Exception
+     */
+    public TubeMQResult cloneBrokersWithTopic(CloneBrokersReq req) throws Exception {
 
+        int clusterId = req.getClusterId();
         // 1. query source broker config
         QueryBrokerCfgReq queryReq = QueryBrokerCfgReq.getReq(req.getSourceBrokerId());
         NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
@@ -222,14 +217,6 @@ public class NodeService {
     }
 
 
-    public TubeMQResult addBrokersToCluster(AddBrokersReq req, NodeEntry masterEntry) throws Exception {
-        String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
-                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
-        TubeMQResult tubeMQResult = requestMaster(url);
-        return tubeMQResult;
-    }
-
-
     private boolean configBrokersForTopics(NodeEntry nodeEntry,
             Set<String> topics, List<Integer> brokerList, int maxBrokers) {
         List<Integer> finalBrokerList = brokerList.subList(0, maxBrokers);
@@ -293,7 +280,7 @@ public class NodeService {
         // 1. check tubemq cluster by topic name, remove pending topic if has added.
         Set<String> brandNewTopics = new HashSet<>();
         for (String topic : pendingTopic.keySet()) {
-            TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(nodeEntry, topic);
+            TubeHttpTopicInfoList topicInfoList = topicService.requestTopicConfigInfo(nodeEntry, topic);
             if (topicInfoList != null) {
                 // get broker list by topic request
                 List<Integer> topicBrokerList = topicInfoList.getTopicBrokerIdList();
@@ -421,10 +408,22 @@ public class NodeService {
         return null;
     }
 
-    public TubeMQResult cloneTopicToBrokers(CloneTopicReq req, NodeEntry master) throws Exception {
+    /**
+     * given one topic, copy its config and clone to brokers
+     * if no broker is provided, topics will be cloned to all brokers in cluster
+     * @param req
+     * @return
+     * @throws Exception
+     */
+    public TubeMQResult cloneTopicToBrokers(CloneTopicReq req) throws Exception {
+
+        NodeEntry master = masterService.getMasterNode(req);
+        if (master == null) {
 
+            return TubeMQResult.getErrorResult(NO_SUCH_CLUSTER);
+        }
         // 1 query topic config
-        TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getSourceTopicName());
+        TubeHttpTopicInfoList topicInfoList = topicService.requestTopicConfigInfo(master, req.getSourceTopicName());
 
         if (topicInfoList == null) {
             return TubeMQResult.getErrorResult("no such topic");
@@ -449,33 +448,16 @@ public class NodeService {
 
     }
 
-    public TubeMQResult cloneOffsetToOtherGroups(CloneOffsetReq req, NodeEntry master)
-        throws Exception {
-
-        // 1. query the corresponding brokers having given topic
-        TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getTopicName());
-        TubeMQResult result = new TubeMQResult();
-
-        if (topicInfoList == null) {
-            return result;
-        }
-
-        List<TopicInfo> topicInfos = topicInfoList.getTopicInfo();
-        // 2. for each broker, request to clone offset
-        for (TopicInfo topicInfo : topicInfos) {
-            String brokerIp = topicInfo.getBrokerIp();
-            String url = SCHEMA + brokerIp + ":" + brokerWebPort
-                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
-            result = requestMaster(url);
-            if (result.getErrCode() != SUCCESS_CODE) {
-                return result;
-            }
+    /**
+     * add topic to brokers
+     * @param req
+     * @return
+     */
+    public TubeMQResult addTopic(BatchAddTopicReq req) {
+        NodeEntry masterEntry = masterService.getMasterNode(req);
+        if (masterEntry == null) {
+            return TubeMQResult.getErrorResult(NO_SUCH_CLUSTER);
         }
-
-
-        return result;
+        return addTopicsToBrokers(masterEntry, req.getBrokerIds(), req.getAddTopicReqs());
     }
-
-
-
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
new file mode 100644
index 0000000..1abff54
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service;
+
+
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.QUERY_GROUP_DETAIL_INFO;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.TOPIC_CONFIG_INFO;
+import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
+import static org.apache.tubemq.manager.utils.ConvertUtils.convertToRebalanceConsumerReq;
+import static org.apache.tubemq.manager.service.MasterService.TUBE_REQUEST_PATH;
+import static org.apache.tubemq.manager.service.MasterService.requestMaster;
+
+import com.google.gson.Gson;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.controller.group.request.DeleteOffsetReq;
+import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
+import org.apache.tubemq.manager.controller.topic.request.BatchAddGroupAuthReq;
+import org.apache.tubemq.manager.controller.topic.request.DeleteGroupReq;
+import org.apache.tubemq.manager.controller.topic.request.RebalanceConsumerReq;
+import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.service.tube.CleanOffsetResult;
+import org.apache.tubemq.manager.service.tube.RebalanceGroupResult;
+import org.apache.tubemq.manager.service.tube.TubeHttpGroupDetailInfo;
+import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList;
+import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList.TopicInfoList.TopicInfo;
+import org.apache.tubemq.manager.utils.ConvertUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * topic service to query topic config, clone / delete consumer group offsets
+ * and rebalance consumer groups of tube cluster.
+ */
+@Slf4j
+@Component
+public class TopicService {
+
+    // shared http client used for the GET requests sent by this service
+    private final CloseableHttpClient httpclient = HttpClients.createDefault();
+    // parses the json responses and serializes the result summaries
+    private final Gson gson = new Gson();
+
+    // web port used when building broker urls; read from manager.broker.webPort, 8081 if unset
+    @Value("${manager.broker.webPort:8081}")
+    private int brokerWebPort;
+
+    // resolves the master node of the cluster a request refers to
+    @Autowired
+    private MasterService masterService;
+
+    /**
+     * Query a node for the detail info of one consumer group.
+     *
+     * @param nodeEntry node whose ip and web port are used to build the request url
+     * @param group consume group name, url-encoded before it is appended to the query string
+     * @return the parsed group detail info, or null when the request fails, the response
+     *         body is empty or the returned errCode is not 0
+     */
+    private TubeHttpGroupDetailInfo requestGroupRunInfo(NodeEntry nodeEntry, String group) {
+        try {
+            // encode the group name so reserved characters cannot break or extend the query string
+            String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+                + QUERY_GROUP_DETAIL_INFO + "&consumeGroup="
+                + java.net.URLEncoder.encode(group, "UTF-8");
+            HttpGet httpget = new HttpGet(url);
+            // read the body with an explicit charset instead of the platform default
+            try (CloseableHttpResponse response = httpclient.execute(httpget);
+                 InputStreamReader reader = new InputStreamReader(
+                     response.getEntity().getContent(),
+                     java.nio.charset.StandardCharsets.UTF_8)) {
+                TubeHttpGroupDetailInfo groupDetailInfo =
+                    gson.fromJson(reader, TubeHttpGroupDetailInfo.class);
+                // gson yields null for an empty body
+                if (groupDetailInfo != null && groupDetailInfo.getErrCode() == 0) {
+                    return groupDetailInfo;
+                }
+            }
+        } catch (Exception ex) {
+            log.error("exception caught while requesting group status", ex);
+        }
+        return null;
+    }
+
+
+    /**
+     * Clone consumer group offsets by sending the request to every broker that hosts the topic.
+     *
+     * @param req clone offset request; its topic name selects the brokers to contact
+     * @return error result when the cluster or the topic cannot be found; otherwise the
+     *         result of the first failing broker, or of the last broker request when all
+     *         of them succeed (a blank result when the topic has no broker entry)
+     */
+    public TubeMQResult cloneOffsetToOtherGroups(CloneOffsetReq req) {
+
+        NodeEntry master = masterService.getMasterNode(req);
+        if (master == null) {
+            return TubeMQResult.getErrorResult("no such cluster");
+        }
+        // 1. query the corresponding brokers having given topic
+        TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getTopicName());
+
+        if (topicInfoList == null) {
+            // report the missing topic explicitly, as deleteOffset does,
+            // instead of handing back a blank result
+            return TubeMQResult.getErrorResult("no such topic");
+        }
+
+        TubeMQResult result = new TubeMQResult();
+        List<TopicInfo> topicInfos = topicInfoList.getTopicInfo();
+        // 2. for each broker, request to clone offset
+        for (TopicInfo topicInfo : topicInfos) {
+            String brokerIp = topicInfo.getBrokerIp();
+            String url = SCHEMA + brokerIp + ":" + brokerWebPort
+                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+            result = requestMaster(url);
+            // stop at the first broker that rejects the request
+            if (result.getErrCode() != SUCCESS_CODE) {
+                return result;
+            }
+        }
+
+        return result;
+    }
+
+
+    /**
+     * Query a node for the config info of one topic.
+     *
+     * @param nodeEntry node whose ip and web port are used to build the request url
+     * @param topic topic name, url-encoded before it is appended to the query string
+     * @return the parsed topic info list, or null when the request fails, the response
+     *         body is empty or the returned errCode is not SUCCESS_CODE
+     */
+    public TubeHttpTopicInfoList requestTopicConfigInfo(NodeEntry nodeEntry, String topic) {
+        try {
+            // encode the topic name so reserved characters cannot break or extend the query string
+            String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+                + TOPIC_CONFIG_INFO + "&topicName="
+                + java.net.URLEncoder.encode(topic, "UTF-8");
+            HttpGet httpget = new HttpGet(url);
+            // read the body with an explicit charset instead of the platform default
+            try (CloseableHttpResponse response = httpclient.execute(httpget);
+                 InputStreamReader reader = new InputStreamReader(
+                     response.getEntity().getContent(),
+                     java.nio.charset.StandardCharsets.UTF_8)) {
+                TubeHttpTopicInfoList topicInfoList =
+                    gson.fromJson(reader, TubeHttpTopicInfoList.class);
+                // gson yields null for an empty body
+                if (topicInfoList != null && topicInfoList.getErrCode() == SUCCESS_CODE) {
+                    return topicInfoList;
+                }
+            }
+        } catch (Exception ex) {
+            log.error("exception caught while requesting topic config info", ex);
+        }
+        return null;
+    }
+
+
+    /**
+     * Rebalance every consumer of a consumer group, sending one request per consumer
+     * to the master node.
+     *
+     * @param req rebalance request; its group name selects the consumers to rebalance
+     * @return error result when the cluster or the group info cannot be obtained; otherwise
+     *         a result whose data is the json of the failed and the succeeded consumer ids
+     */
+    public TubeMQResult rebalanceGroup(RebalanceGroupReq req) {
+
+        NodeEntry master = masterService.getMasterNode(req);
+        if (master == null) {
+            return TubeMQResult.getErrorResult("no such cluster");
+        }
+
+        // 1. get all consumer ids in group
+        TubeHttpGroupDetailInfo groupDetailInfo = requestGroupRunInfo(master, req.getGroupName());
+        if (groupDetailInfo == null) {
+            // answer with an error result instead of failing with a NullPointerException
+            return TubeMQResult.getErrorResult("no such group");
+        }
+        List<String> consumerIds = groupDetailInfo.getConsumerIds();
+        RebalanceGroupResult rebalanceGroupResult = new RebalanceGroupResult();
+
+        // 2. rebalance consumers in group
+        for (String consumerId : consumerIds) {
+            RebalanceConsumerReq rebalanceConsumerReq = convertToRebalanceConsumerReq(req,
+                consumerId);
+            String url = SCHEMA + master.getIp() + ":" + master.getWebPort()
+                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(rebalanceConsumerReq);
+            TubeMQResult result = requestMaster(url);
+            // each consumer goes to exactly one of the two lists
+            if (result.getErrCode() != 0) {
+                rebalanceGroupResult.getFailConsumers().add(consumerId);
+            } else {
+                rebalanceGroupResult.getSuccessConsumers().add(consumerId);
+            }
+        }
+
+        TubeMQResult tubeResult = new TubeMQResult();
+        tubeResult.setData(gson.toJson(rebalanceGroupResult));
+
+        return tubeResult;
+    }
+
+
+    /**
+     * Delete consumer group offsets on every broker that hosts the given topic.
+     *
+     * @param req delete offset request; its topic name selects the brokers to contact
+     * @return error result when the cluster or the topic cannot be found; otherwise the
+     *         result of the last broker request (a blank result when no broker was
+     *         contacted) with its data set to the json of the failed and succeeded broker ips
+     */
+    public TubeMQResult deleteOffset(DeleteOffsetReq req) {
+        NodeEntry masterNode = masterService.getMasterNode(req);
+        if (masterNode == null) {
+            return TubeMQResult.getErrorResult("no such cluster");
+        }
+
+        // look up the brokers serving the topic
+        TubeHttpTopicInfoList topicConfig = requestTopicConfigInfo(masterNode, req.getTopicName());
+        if (topicConfig == null) {
+            return TubeMQResult.getErrorResult("no such topic");
+        }
+
+        TubeMQResult lastResult = new TubeMQResult();
+        CleanOffsetResult summary = new CleanOffsetResult();
+
+        // send the delete request to each broker and sort the broker ip by outcome
+        for (TopicInfo brokerTopic : topicConfig.getTopicInfo()) {
+            String ip = brokerTopic.getBrokerIp();
+            lastResult = requestMaster(SCHEMA + ip + ":" + brokerWebPort
+                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req));
+            boolean succeeded = lastResult.getErrCode() == SUCCESS_CODE;
+            List<String> bucket = succeeded
+                ? summary.getSuccessBrokers()
+                : summary.getFailBrokers();
+            bucket.add(ip);
+        }
+
+        lastResult.setData(gson.toJson(summary));
+        return lastResult;
+    }
+
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
index 0fa83d6..04cf17b 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
@@ -23,6 +23,8 @@ public class TubeMQHttpConst {
             "/webapi.htm?type=op_query&method=admin_query_broker_run_status";
     public static final String TOPIC_CONFIG_INFO =
             "/webapi.htm?type=op_query&method=admin_query_topic_info";
+    public static final String QUERY_GROUP_DETAIL_INFO =
+        "/webapi.htm?type=op_query&method=admin_query_consume_group_detail";
     public static final String ADD_TUBE_TOPIC =
             "/webapi.htm?type=op_modify&method=admin_add_new_topic_record";
     public static final String RELOAD_BROKER =
@@ -33,4 +35,22 @@ public class TubeMQHttpConst {
     public static final String BATCH_ADD_BROKER = "admin_bath_add_broker_configure";
     public static final String WEB_API = "webapi";
     public static final String BATCH_ADD_TOPIC = "admin_add_new_topic_record";
+    public static final String REBALANCE_GROUP = "admin_rebalance_group_allocate";
+    public static final String AUTH_CONTROL = "authControl";
+    public static final String MODIFY = "modify";
+    public static final String DELETE = "delete";
+    public static final String REMOVE = "remove";
+    public static final String NO_SUCH_METHOD = "no such method";
+    public static final String ADMIN_QUERY_CLUSTER_INFO = "admin_query_cluster_info";
+    public static final String CLONE = "clone";
+    public static final String ADD = "add";
+    public static final String ONLINE = "online";
+    public static final String RELOAD = "reload";
+    public static final String SET_READ_OR_WRITE = "setReadOrWrite";
+    public static final String OFFLINE = "offline";
+    public static final String REBALANCE_CONSUMER_GROUP = "rebalanceGroup";
+    public static final String REBALANCE_CONSUMER = "rebalanceConsumer";
+    public static final String NO_SUCH_CLUSTER = "no such cluster";
+    public static final Integer SUCCESS_CODE = 0;
+
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/CleanOffsetResult.java
similarity index 77%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/CleanOffsetResult.java
index 0201643..9fe4e04 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/CleanOffsetResult.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,14 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.controller.node.request;
 
+package org.apache.tubemq.manager.service.tube;
+
+import com.google.common.collect.Lists;
 import java.util.List;
 import lombok.Data;
 
 @Data
-public class BatchAddTopicReq {
-    List<AddTopicReq> addTopicReqs;
-    List<Integer> brokerIds;
-    Integer clusterId;
+public class CleanOffsetResult {
+    private List<String> failBrokers = Lists.newArrayList();
+
+    private List<String> successBrokers = Lists.newArrayList();
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/RebalanceGroupResult.java
similarity index 77%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/RebalanceGroupResult.java
index 0201643..5db9ee3 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/RebalanceGroupResult.java
@@ -15,14 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.controller.node.request;
 
+package org.apache.tubemq.manager.service.tube;
+
+import com.google.common.collect.Lists;
 import java.util.List;
 import lombok.Data;
 
 @Data
-public class BatchAddTopicReq {
-    List<AddTopicReq> addTopicReqs;
-    List<Integer> brokerIds;
-    Integer clusterId;
+public class RebalanceGroupResult {
+    public List<String> failConsumers = Lists.newArrayList();
+
+    public List<String> successConsumers = Lists.newArrayList();
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpGroupDetailInfo.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpGroupDetailInfo.java
new file mode 100644
index 0000000..6ff4568
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpGroupDetailInfo.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service.tube;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Map;
+import lombok.Data;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.service.tube.TubeHttpClusterInfoList.ClusterData;
+
+/**
+ * Response body of the consume group detail query, filled from the returned json.
+ */
+@Data
+public class TubeHttpGroupDetailInfo {
+
+    private String errMsg;
+
+    private int errCode;
+
+    private int count;
+
+    private List<String> topicSet;
+
+    private String consumeGroup;
+
+    // one entry per consumer; stays null when the response carries no data field
+    private List<ConsumerInfo> data;
+
+    /**
+     * One consumer of the group together with its partition entries.
+     */
+    @Data
+    public static class ConsumerInfo {
+
+        private String consumerId;
+
+        private Integer parCount;
+
+        private List<PartitionInfo> parInfo;
+
+        /**
+         * One partition entry of a consumer.
+         */
+        @Data
+        public static class PartitionInfo {
+            private String brokerAddr;
+            private String topic;
+            private Integer partId;
+        }
+
+    }
+
+    /**
+     * Collect the consumer ids of all consumers listed in {@code data}.
+     *
+     * @return consumer ids in the order of {@code data}; an empty list when {@code data} is null
+     */
+    public List<String> getConsumerIds() {
+        List<String> consumerIds = Lists.newArrayList();
+        if (data == null) {
+            // no data field in the response: nothing to collect
+            return consumerIds;
+        }
+        for (ConsumerInfo consumerInfo : data) {
+            consumerIds.add(consumerInfo.getConsumerId());
+        }
+        return consumerIds;
+    }
+
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
index f59bb73..58fdd15 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
@@ -18,20 +18,27 @@
 package org.apache.tubemq.manager.utils;
 
 import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 
 import java.lang.reflect.Field;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.tubemq.manager.controller.topic.request.RebalanceConsumerReq;
+import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.OP_MODIFY;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.OP_QUERY;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_GROUP;
 
+@Slf4j
 public class ConvertUtils {
 
     public static Gson gson = new Gson();
 
-    public static String convertReqToQueryStr(Object req) throws Exception {
+    public static String convertReqToQueryStr(Object req) {
         List<String> queryList = new ArrayList<>();
         Class<?> clz = req.getClass();
         List fieldsList = new ArrayList<Field[]>();
@@ -42,26 +49,44 @@ public class ConvertUtils {
             clz = clz.getSuperclass();
         }
 
-        for (Object fields:fieldsList) {
-            Field[] f = (Field[]) fields;
-            for (Field field : f) {
-                field.setAccessible(true);
-                Object o = field.get(req);
-                String value;
-                // convert list to json string
-                if (o == null) {
-                    continue;
+        try {
+            for (Object fields:fieldsList) {
+                Field[] f = (Field[]) fields;
+                for (Field field : f) {
+                    field.setAccessible(true);
+                    Object o = field.get(req);
+                    String value;
+                    // convert list to json string
+                    if (o == null) {
+                        continue;
+                    }
+                    if (o instanceof List) {
+                        value = gson.toJson(o);
+                    } else {
+                        value = o.toString();
+                    }
+                    queryList.add(field.getName() + "=" + URLEncoder.encode(
+                        value, UTF_8.toString()));
                 }
-                if (o instanceof List) {
-                    value = gson.toJson(o);
-                } else {
-                    value = o.toString();
-                }
-                queryList.add(field.getName() + "=" + URLEncoder.encode(
-                    value, UTF_8.toString()));
             }
+        } catch (Exception e) {
+            log.error("exception occurred while parsing object {}", gson.toJson(req), e);
+            return StringUtils.EMPTY;
         }
 
         return StringUtils.join(queryList, "&");
     }
+
+
+    /**
+     * Build the rebalance request for a single consumer out of a group level request.
+     *
+     * @param req group level rebalance request providing group name, auth token,
+     *            modify user and rejoin wait
+     * @param consumerId id of the consumer the new request is addressed to
+     * @return a new request with type OP_MODIFY and method REBALANCE_GROUP
+     */
+    public static RebalanceConsumerReq convertToRebalanceConsumerReq(RebalanceGroupReq req, String consumerId) {
+        RebalanceConsumerReq rebalanceReq = new RebalanceConsumerReq();
+        // fixed type / method of the request
+        rebalanceReq.setType(OP_MODIFY);
+        rebalanceReq.setMethod(REBALANCE_GROUP);
+        // values taken over from the group level request
+        rebalanceReq.setGroupName(req.getGroupName());
+        rebalanceReq.setModifyUser(req.getModifyUser());
+        rebalanceReq.setConfModAuthToken(req.getConfModAuthToken());
+        rebalanceReq.setReJoinWait(req.getReJoinWait());
+        // the consumer this request is addressed to
+        rebalanceReq.setConsumerId(consumerId);
+        return rebalanceReq;
+    }
 }
diff --git a/tubemq-manager/src/main/resources/application.properties b/tubemq-manager/src/main/resources/application.properties
index 9afbdab..dee51b7 100644
--- a/tubemq-manager/src/main/resources/application.properties
+++ b/tubemq-manager/src/main/resources/application.properties
@@ -14,4 +14,4 @@
 # limitations under the License.
 
 spring.jpa.hibernate.ddl-auto=update
-# configuration for manager
\ No newline at end of file
+# configuration for manager