You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/22 11:07:13 UTC

[incubator-tubemq] branch TUBEMQ-421 updated: [TUBEMQ-530] create a cluster in manager (#405)

This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-421 by this push:
     new b8b323d  [TUBEMQ-530] create a cluster in manager (#405)
b8b323d is described below

commit b8b323d4ca7cbbc7529a6d5743ac55bcff1d9246
Author: EMsnap <zp...@connect.ust.hk>
AuthorDate: Fri Jan 22 19:07:02 2021 +0800

    [TUBEMQ-530] create a cluster in manager (#405)
    
    * [TUBEMQ-530] create a cluster in manager
    
    * [TUBEMQ-530] create a cluster in manager
    
    * [TUBEMQ-530] 1. delete broker info since nodeEntry only stores master node
    2. add version into properties in pom.xml
    3. make cluster name unique
---
 tubemq-manager/pom.xml                             |  13 ++-
 .../controller/cluster/ClusterController.java      |  44 ++++++-
 .../cluster/request/AddClusterReq.java}            |  23 ++--
 .../manager/controller/group/GroupController.java  |   7 +-
 .../manager/controller/node/NodeController.java    |  27 +----
 .../controller/topic/TopicWebController.java       |   9 +-
 .../entry/{NodeEntry.java => ClusterEntry.java}    |  30 +++--
 .../org/apache/tubemq/manager/entry/NodeEntry.java |   4 +-
 .../apache/tubemq/manager/entry/TopicEntry.java    |   2 +-
 ...{NodeRepository.java => ClusterRepository.java} |  19 ++--
 .../tubemq/manager/repository/NodeRepository.java  |  14 +++
 .../tubemq/manager/service/ClusterServiceImpl.java |  74 ++++++++++++
 .../{MasterService.java => MasterServiceImpl.java} |  38 +++----
 .../tubemq/manager/service/NodeServiceImpl.java    |  45 +++-----
 .../tubemq/manager/service/TopicBackendWorker.java |   1 +
 .../tubemq/manager/service/TopicServiceImpl.java   |  14 +--
 .../interfaces/ClusterService.java}                |  23 ++--
 .../manager/service/interfaces/MasterService.java  |  73 ++++++++++++
 .../service/{ => interfaces}/NodeService.java      |  17 +--
 .../service/{ => interfaces}/TopicService.java     |   2 +-
 .../service/tube/TubeHttpBrokerInfoList.java       |   2 +-
 .../service/tube/TubeHttpClusterInfoList.java      |  97 ----------------
 .../service/tube/TubeHttpGroupDetailInfo.java      |   3 -
 .../apache/tubemq/manager/utils/ConvertUtils.java  |  14 +++
 .../manager/controller/TestClusterController.java  |  51 ++++++++-
 .../manager/controller/TestNodeController.java     | 126 ---------------------
 26 files changed, 383 insertions(+), 389 deletions(-)

diff --git a/tubemq-manager/pom.xml b/tubemq-manager/pom.xml
index eaf220a..b5bf2d7 100644
--- a/tubemq-manager/pom.xml
+++ b/tubemq-manager/pom.xml
@@ -28,6 +28,11 @@
 
     <name>Apache TubeMQ - Manager</name>
 
+    <properties>
+        <guava.version>21.0</guava.version>
+        <commons.version>4.3</commons.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.springframework.boot</groupId>
@@ -51,7 +56,13 @@
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-collections4</artifactId>
-            <version>4.3</version>
+            <version>${commons.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
         </dependency>
 
         <dependency>
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
index 6b2cf9f..d2fcc7a 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
@@ -17,17 +17,21 @@
 
 package org.apache.tubemq.manager.controller.cluster;
 
+import static org.apache.tubemq.manager.service.MasterServiceImpl.TUBE_REQUEST_PATH;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
-import static org.apache.tubemq.manager.service.MasterService.*;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
+import static org.apache.tubemq.manager.utils.ConvertUtils.covertMapToQueryString;
 
 import com.google.gson.Gson;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.controller.cluster.request.AddClusterReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
-import org.apache.tubemq.manager.service.MasterService;
+import org.apache.tubemq.manager.service.interfaces.ClusterService;
+import org.apache.tubemq.manager.service.interfaces.MasterService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -48,9 +52,39 @@ public class ClusterController {
     private NodeRepository nodeRepository;
 
     @Autowired
+    private ClusterService clusterService;
+
+    @Autowired
     private MasterService masterService;
 
     /**
+     * add a new cluster; the request must provide the master node's ip and web port
+     */
+    @RequestMapping(value = "", method = RequestMethod.POST,
+        produces = MediaType.APPLICATION_JSON_VALUE)
+    public @ResponseBody TubeMQResult addNewCluster(
+        @RequestBody AddClusterReq req) {
+
+        // 1. validate params
+        if (req.getMasterIp() == null || req.getMasterWebPort() == null) {
+            return TubeMQResult.getErrorResult("please input master ip and webPort");
+        }
+        TubeMQResult checkResult = masterService.checkMasterNodeStatus(req.getMasterIp(), req.getMasterWebPort());
+        if (checkResult.getErrCode() != SUCCESS_CODE) {
+            return TubeMQResult.getErrorResult("please check master ip and webPort");
+        }
+
+        // 2. add cluster and master node
+        Boolean addSuccess = clusterService.addClusterAndMasterNode(req);
+
+        if (!addSuccess) {
+            return TubeMQResult.getErrorResult("add cluster and master fail");
+        }
+
+        return new TubeMQResult();
+    }
+
+    /**
      * query cluster info
      */
     @RequestMapping(value = "/query", method = RequestMethod.GET,
@@ -58,7 +92,7 @@ public class ClusterController {
     public @ResponseBody String queryInfo(
             @RequestParam Map<String, String> queryBody) throws Exception {
         String url = masterService.getQueryUrl(queryBody);
-        return queryMaster(url);
+        return masterService.queryMaster(url);
     }
 
     /**
@@ -78,7 +112,7 @@ public class ClusterController {
                     clusterId);
             String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
                     + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(requestBody);
-            return gson.toJson(requestMaster(url));
+            return gson.toJson(masterService.requestMaster(url));
         } else {
             TubeMQResult result = new TubeMQResult();
             result.setErrCode(-1);
@@ -89,4 +123,6 @@ public class ClusterController {
     }
 
 
+
+
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/request/AddClusterReq.java
similarity index 62%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/request/AddClusterReq.java
index 7a0cbb0..0ca052a 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/request/AddClusterReq.java
@@ -15,20 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.repository;
+package org.apache.tubemq.manager.controller.cluster.request;
 
-import org.apache.tubemq.manager.entry.NodeEntry;
-import org.springframework.data.jpa.repository.JpaRepository;
-import org.springframework.stereotype.Repository;
+import lombok.Data;
 
-import java.util.List;
-
-@Repository
-public interface NodeRepository extends JpaRepository<NodeEntry, Long> {
-
-    NodeEntry findNodeEntryByClusterIdIsAndMasterIsTrue(int clusterId);
-
-    List<NodeEntry> findNodeEntriesByClusterIdIs(int clusterId);
-
-    List<NodeEntry> findAll();
+@Data
+public class AddClusterReq {
+    private String masterIp;
+    private String clusterName;
+    private Integer masterPort;
+    private Integer masterWebPort;
+    private String createUser;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
index 3f56342..7c588ee 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
@@ -22,7 +22,6 @@ package org.apache.tubemq.manager.controller.group;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.CLONE;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
-import static org.apache.tubemq.manager.service.MasterService.queryMaster;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.QUERY;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_CONSUMER_GROUP;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_CONSUMER;
@@ -40,8 +39,8 @@ import org.apache.tubemq.manager.controller.topic.request.BatchAddGroupAuthReq;
 import org.apache.tubemq.manager.controller.topic.request.DeleteGroupReq;
 import org.apache.tubemq.manager.controller.topic.request.RebalanceConsumerReq;
 import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
+import org.apache.tubemq.manager.service.interfaces.MasterService;
 import org.apache.tubemq.manager.service.TopicServiceImpl;
-import org.apache.tubemq.manager.service.MasterService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
@@ -93,7 +92,7 @@ public class GroupController {
     public @ResponseBody String queryConsumer(
         @RequestParam Map<String, String> req) throws Exception {
         String url = masterService.getQueryUrl(req);
-        return queryMaster(url);
+        return masterService.queryMaster(url);
     }
 
 
@@ -137,7 +136,7 @@ public class GroupController {
     public @ResponseBody String queryBlackGroup(
         @RequestParam Map<String, String> req) throws Exception {
         String url = masterService.getQueryUrl(req);
-        return queryMaster(url);
+        return masterService.queryMaster(url);
     }
 
 
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
index 26411fc..f26da51 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
@@ -27,8 +27,8 @@ import org.apache.tubemq.manager.controller.node.request.DeleteBrokerReq;
 import org.apache.tubemq.manager.controller.node.request.OnlineOfflineBrokerReq;
 import org.apache.tubemq.manager.controller.node.request.ReloadBrokerReq;
 import org.apache.tubemq.manager.repository.NodeRepository;
-import org.apache.tubemq.manager.service.NodeService;
-import org.apache.tubemq.manager.service.MasterService;
+import org.apache.tubemq.manager.service.interfaces.MasterService;
+import org.apache.tubemq.manager.service.interfaces.NodeService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.*;
@@ -46,7 +46,6 @@ import static org.apache.tubemq.manager.service.TubeMQHttpConst.ONLINE;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.OP_QUERY;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.RELOAD;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.SET_READ_OR_WRITE;
-import static org.apache.tubemq.manager.service.MasterService.*;
 
 @RestController
 @RequestMapping(path = "/v1/node")
@@ -66,24 +65,6 @@ public class NodeController {
     MasterService masterService;
 
     /**
-     * query brokers in certain cluster
-     * @param type
-     * @param method
-     * @param clusterId
-     * @return
-     */
-    @RequestMapping(value = "/query/clusterInfo", method = RequestMethod.GET,
-            produces = MediaType.APPLICATION_JSON_VALUE)
-    public @ResponseBody String queryInfo(@RequestParam String type, @RequestParam String method,
-            @RequestParam(required = false) Integer clusterId) {
-        if (method.equals(ADMIN_QUERY_CLUSTER_INFO) && type.equals(OP_QUERY)) {
-            return nodeService.queryClusterInfo(clusterId);
-        }
-        return gson.toJson(getErrorResult(NO_SUCH_METHOD));
-    }
-
-
-    /**
      * query brokers' run status
      * this method supports batch operation
      */
@@ -92,7 +73,7 @@ public class NodeController {
     public @ResponseBody String queryBrokerDetail(
             @RequestParam Map<String, String> queryBody) throws Exception {
         String url = masterService.getQueryUrl(queryBody);
-        return queryMaster(url);
+        return masterService.queryMaster(url);
     }
 
 
@@ -105,7 +86,7 @@ public class NodeController {
     public @ResponseBody String queryBrokerConfig(
             @RequestParam Map<String, String> queryBody) throws Exception {
         String url = masterService.getQueryUrl(queryBody);
-        return queryMaster(url);
+        return masterService.queryMaster(url);
     }
 
 
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
index b1f6341..7890846 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
@@ -24,7 +24,6 @@ import static org.apache.tubemq.manager.service.TubeMQHttpConst.CLONE;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.MODIFY;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.REMOVE;
-import static org.apache.tubemq.manager.service.MasterService.queryMaster;
 
 
 import com.google.gson.Gson;
@@ -36,8 +35,8 @@ import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
 import org.apache.tubemq.manager.controller.topic.request.DeleteTopicReq;
 import org.apache.tubemq.manager.controller.topic.request.ModifyTopicReq;
 import org.apache.tubemq.manager.controller.topic.request.SetAuthControlReq;
-import org.apache.tubemq.manager.service.NodeService;
-import org.apache.tubemq.manager.service.MasterService;
+import org.apache.tubemq.manager.service.interfaces.MasterService;
+import org.apache.tubemq.manager.service.interfaces.NodeService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -93,7 +92,7 @@ public class TopicWebController {
     public @ResponseBody String queryConsumerAuth(
         @RequestParam Map<String, String> req) throws Exception {
         String url = masterService.getQueryUrl(req);
-        return queryMaster(url);
+        return masterService.queryMaster(url);
     }
 
     /**
@@ -106,7 +105,7 @@ public class TopicWebController {
     public @ResponseBody String queryTopicConfig(
         @RequestParam Map<String, String> req) throws Exception {
         String url = masterService.getQueryUrl(req);
-        return queryMaster(url);
+        return masterService.queryMaster(url);
     }
 
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/ClusterEntry.java
similarity index 74%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/ClusterEntry.java
index fb19232..0464ed7 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/ClusterEntry.java
@@ -17,37 +17,33 @@
 
 package org.apache.tubemq.manager.entry;
 
+
+import java.util.Date;
 import javax.persistence.Entity;
 import javax.persistence.GeneratedValue;
 import javax.persistence.GenerationType;
 import javax.persistence.Id;
 import javax.persistence.Table;
+import javax.persistence.UniqueConstraint;
 import lombok.Data;
 
 /**
- * node machine for tube cluster. broker/master/standby
+ * cluster record for a tube cluster: id, unique name, create/modify time and creator
  */
 @Entity
-@Table(name = "node")
+@Table(name = "cluster", uniqueConstraints=
+    @UniqueConstraint(columnNames={"clusterName"}))
 @Data
-public class NodeEntry {
+public class ClusterEntry {
     @Id
-    @GeneratedValue(strategy= GenerationType.AUTO)
-    private long brokerId;
-
-    private boolean master;
-
-    private boolean standby;
-
-    private boolean broker;
-
-    private String ip;
+    @GeneratedValue(strategy= GenerationType.IDENTITY)
+    private int clusterId;
 
-    private int port;
+    private String clusterName;
 
-    private int webPort;
+    private Date createTime;
 
-    private int clusterId;
+    private Date modifyTime;
 
-    private String clusterName;
+    private String createUser;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
index fb19232..9be4493 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
@@ -32,8 +32,8 @@ import lombok.Data;
 @Data
 public class NodeEntry {
     @Id
-    @GeneratedValue(strategy= GenerationType.AUTO)
-    private long brokerId;
+    @GeneratedValue(strategy= GenerationType.IDENTITY)
+    private long id;
 
     private boolean master;
 
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicEntry.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicEntry.java
index 17b7711..767fbe9 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicEntry.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicEntry.java
@@ -37,7 +37,7 @@ import org.springframework.data.jpa.domain.support.AuditingEntityListener;
 @EntityListeners(AuditingEntityListener.class) // support CreationTimestamp annotation
 public class TopicEntry {
     @Id
-    @GeneratedValue(strategy=GenerationType.AUTO)
+    @GeneratedValue(strategy= GenerationType.IDENTITY)
     private long businessId;
 
     @Size(max = 30)
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/ClusterRepository.java
similarity index 76%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/ClusterRepository.java
index 7a0cbb0..6959052 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/ClusterRepository.java
@@ -17,18 +17,17 @@
 
 package org.apache.tubemq.manager.repository;
 
+import java.util.List;
+import org.apache.tubemq.manager.entry.ClusterEntry;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.springframework.data.jpa.repository.JpaRepository;
-import org.springframework.stereotype.Repository;
-
-import java.util.List;
-
-@Repository
-public interface NodeRepository extends JpaRepository<NodeEntry, Long> {
-
-    NodeEntry findNodeEntryByClusterIdIsAndMasterIsTrue(int clusterId);
 
-    List<NodeEntry> findNodeEntriesByClusterIdIs(int clusterId);
+public interface ClusterRepository extends JpaRepository<ClusterEntry, Long> {
 
-    List<NodeEntry> findAll();
+    /**
+     * find clusterEntry by clusterId
+     * @param clusterId
+     * @return
+     */
+    ClusterEntry findClusterEntryByClusterId(Integer clusterId);
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
index 7a0cbb0..c8b2eb9 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
@@ -26,9 +26,23 @@ import java.util.List;
 @Repository
 public interface NodeRepository extends JpaRepository<NodeEntry, Long> {
 
+    /**
+     * find the master node of a cluster by clusterId
+     * @param clusterId
+     * @return
+     */
     NodeEntry findNodeEntryByClusterIdIsAndMasterIsTrue(int clusterId);
 
+    /**
+     * find all nodes in cluster
+     * @param clusterId
+     * @return
+     */
     List<NodeEntry> findNodeEntriesByClusterIdIs(int clusterId);
 
+    /**
+     * find all nodes
+     * @return
+     */
     List<NodeEntry> findAll();
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/ClusterServiceImpl.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/ClusterServiceImpl.java
new file mode 100644
index 0000000..a16aea2
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/ClusterServiceImpl.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tubemq.manager.service;
+
+import java.util.Date;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.tubemq.manager.controller.cluster.request.AddClusterReq;
+import org.apache.tubemq.manager.entry.ClusterEntry;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.repository.ClusterRepository;
+import org.apache.tubemq.manager.service.interfaces.ClusterService;
+import org.apache.tubemq.manager.service.interfaces.NodeService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+
+@Component
+@Slf4j
+public class ClusterServiceImpl implements ClusterService {
+
+    @Autowired
+    ClusterRepository clusterRepository;
+
+    @Autowired
+    NodeService nodeService;
+
+    @Override
+    public Boolean addClusterAndMasterNode(AddClusterReq req) {
+        ClusterEntry entry = new ClusterEntry();
+        entry.setCreateTime(new Date());
+        entry.setCreateUser(req.getCreateUser());
+        entry.setClusterName(req.getClusterName());
+        ClusterEntry retEntry = null;
+        try {
+            retEntry = clusterRepository.saveAndFlush(entry);
+        } catch (Exception e) {
+            log.error("create cluster fail with exception", e);
+            return false;
+        }
+        // add master node
+        return addMasterNode(req, retEntry);
+    }
+
+    private boolean addMasterNode(AddClusterReq req, ClusterEntry clusterEntry) {
+        if (clusterEntry == null) {
+            return false;
+        }
+        NodeEntry nodeEntry = new NodeEntry();
+        nodeEntry.setPort(req.getMasterPort());
+        nodeEntry.setMaster(true);
+        nodeEntry.setClusterId(clusterEntry.getClusterId());
+        nodeEntry.setWebPort(req.getMasterWebPort());
+        nodeEntry.setIp(req.getMasterIp());
+        nodeEntry.setBroker(false);
+        return nodeService.addNode(nodeEntry);
+    }
+
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterServiceImpl.java
similarity index 85%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterService.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterServiceImpl.java
index 8f726e9..71758ed 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterServiceImpl.java
@@ -20,38 +20,34 @@ package org.apache.tubemq.manager.service;
 
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
 import org.apache.tubemq.manager.controller.TubeMQResult;
-import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
 import org.apache.tubemq.manager.controller.node.request.BaseReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.service.interfaces.MasterService;
 import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-import org.springframework.web.bind.annotation.RequestBody;
 
 import java.io.InputStreamReader;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.BROKER_RUN_STATUS;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
 import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
+import static org.apache.tubemq.manager.utils.ConvertUtils.covertMapToQueryString;
 
 
 @Slf4j
 @Component
-public class MasterService {
+public class MasterServiceImpl implements MasterService {
 
     private static CloseableHttpClient httpclient = HttpClients.createDefault();
     private static Gson gson = new Gson();
@@ -60,19 +56,8 @@ public class MasterService {
     @Autowired
     NodeRepository nodeRepository;
 
-    public static String covertMapToQueryString(Map<String, String> requestMap) throws Exception {
-        List<String> queryList = new ArrayList<>();
-
-        for (Map.Entry<String, String> entry : requestMap.entrySet()) {
-            queryList.add(entry.getKey() + "=" + URLEncoder.encode(
-                    entry.getValue(), UTF_8.toString()));
-        }
-        return StringUtils.join(queryList, "&");
-    }
-
-
-
-    public static TubeMQResult requestMaster(String url) {
+    @Override
+    public TubeMQResult requestMaster(String url) {
 
         log.info("start to request {}", url);
         HttpGet httpGet = new HttpGet(url);
@@ -99,7 +84,8 @@ public class MasterService {
      * @param url
      * @return query info
      */
-    public static String queryMaster(String url) {
+    @Override
+    public String queryMaster(String url) {
         log.info("start to request {}", url);
         HttpGet httpGet = new HttpGet(url);
         TubeMQResult defaultResult = new TubeMQResult();
@@ -116,6 +102,7 @@ public class MasterService {
     }
 
 
+    @Override
     public TubeMQResult baseRequestMaster(BaseReq req) {
         if (req.getClusterId() == null) {
             return TubeMQResult.getErrorResult("please input clusterId");
@@ -131,6 +118,7 @@ public class MasterService {
     }
 
 
+    @Override
     public NodeEntry getMasterNode(BaseReq req) {
         if (req.getClusterId() == null) {
             return null;
@@ -140,6 +128,7 @@ public class MasterService {
     }
 
 
+    @Override
     public String getQueryUrl(Map<String, String> queryBody) throws Exception {
         int clusterId = Integer.parseInt(queryBody.get("clusterId"));
         queryBody.remove("clusterId");
@@ -149,4 +138,9 @@ public class MasterService {
                 + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody);
     }
 
+    @Override
+    public TubeMQResult checkMasterNodeStatus(String masterIp, Integer masterWebPort) {
+        String url = SCHEMA + masterIp + ":" + masterWebPort + BROKER_RUN_STATUS;
+        return requestMaster(url);
+    }
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java
index 30b5adf..cb9536e 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java
@@ -19,9 +19,7 @@ package org.apache.tubemq.manager.service;
 
 
 import static org.apache.tubemq.manager.controller.node.request.AddBrokersReq.getAddBrokerReq;
-import static org.apache.tubemq.manager.service.MasterService.TUBE_REQUEST_PATH;
-import static org.apache.tubemq.manager.service.MasterService.queryMaster;
-import static org.apache.tubemq.manager.service.MasterService.requestMaster;
+import static org.apache.tubemq.manager.service.MasterServiceImpl.TUBE_REQUEST_PATH;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD_TUBE_TOPIC;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.BROKER_RUN_STATUS;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.NO_SUCH_CLUSTER;
@@ -53,6 +51,9 @@ import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
 import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.service.interfaces.MasterService;
+import org.apache.tubemq.manager.service.interfaces.NodeService;
+import org.apache.tubemq.manager.service.interfaces.TopicService;
 import org.apache.tubemq.manager.service.tube.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -208,7 +209,7 @@ public class NodeServiceImpl implements NodeService {
     private BrokerStatusInfo getBrokerStatusInfo(QueryBrokerCfgReq queryReq, NodeEntry masterEntry) throws Exception {
         String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
                 + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(queryReq);
-        BrokerStatusInfo brokerStatusInfo = gson.fromJson(queryMaster(url),
+        BrokerStatusInfo brokerStatusInfo = gson.fromJson(masterService.queryMaster(url),
                 BrokerStatusInfo.class);
         return brokerStatusInfo;
     }
@@ -217,7 +218,7 @@ public class NodeServiceImpl implements NodeService {
     public TubeMQResult addTopicToBrokers(AddTopicReq req, NodeEntry masterEntry) throws Exception {
         String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
                 + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
-        return requestMaster(url);
+        return masterService.requestMaster(url);
     }
 
 
@@ -375,28 +376,6 @@ public class NodeServiceImpl implements NodeService {
     }
 
     @Override
-    public String queryClusterInfo(Integer clusterId) {
-        TubeHttpClusterInfoList clusterInfoList;
-        try {
-            // find all nodes by given clusterIds, show all nodes if clusterIds not provided
-            List<NodeEntry> nodeEntries = clusterId == null ?
-                    nodeRepository.findAll() : nodeRepository.findNodeEntriesByClusterIdIs(clusterId);
-            // divide all entries by clusterId
-            Map<Integer, List<NodeEntry>> nodeEntriesPerCluster =
-                    nodeEntries.parallelStream().collect(Collectors.groupingBy(NodeEntry::getClusterId));
-
-            clusterInfoList = TubeHttpClusterInfoList.getClusterInfoList(nodeEntriesPerCluster);
-        } catch (Exception e) {
-            log.error("query cluster info error", e);
-            return gson.toJson(TubeMQResult.getErrorResult(""));
-        }
-
-        return gson.toJson(clusterInfoList);
-    }
-
-
-
-    @Override
     public void close() throws IOException {
         httpclient.close();
     }
@@ -469,4 +448,16 @@ public class NodeServiceImpl implements NodeService {
         }
         return addTopicsToBrokers(masterEntry, req.getBrokerIds(), req.getAddTopicReqs());
     }
+
+    /** Saves the node via nodeRepository; logs and returns false if saving throws, true otherwise. */
+    @Override
+    public boolean addNode(NodeEntry nodeEntry) {
+        try {
+            nodeRepository.saveAndFlush(nodeEntry);
+            return true;
+        } catch (Exception saveFailure) {
+            log.error("create node error with exception", saveFailure);
+            return false;
+        }
+    }
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java
index 7f2f910..7341a89 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.tubemq.manager.repository.TopicRepository;
+import org.apache.tubemq.manager.service.interfaces.NodeService;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java
index e8af636..f0dba76 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java
@@ -18,15 +18,13 @@
 package org.apache.tubemq.manager.service;
 
 
-import static org.apache.tubemq.manager.service.MasterService.queryMaster;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.QUERY_GROUP_DETAIL_INFO;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.TOPIC_CONFIG_INFO;
 import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
 import static org.apache.tubemq.manager.utils.ConvertUtils.convertToRebalanceConsumerReq;
-import static org.apache.tubemq.manager.service.MasterService.TUBE_REQUEST_PATH;
-import static org.apache.tubemq.manager.service.MasterService.requestMaster;
+import static org.apache.tubemq.manager.service.MasterServiceImpl.TUBE_REQUEST_PATH;
 
 import com.google.gson.Gson;
 import java.io.InputStreamReader;
@@ -47,6 +45,8 @@ import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
 import org.apache.tubemq.manager.controller.topic.request.RebalanceConsumerReq;
 import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.service.interfaces.MasterService;
+import org.apache.tubemq.manager.service.interfaces.TopicService;
 import org.apache.tubemq.manager.service.tube.CleanOffsetResult;
 import org.apache.tubemq.manager.service.tube.RebalanceGroupResult;
 import org.apache.tubemq.manager.service.tube.TubeHttpGroupDetailInfo;
@@ -112,7 +112,7 @@ public class TopicServiceImpl implements TopicService {
             String brokerIp = topicInfo.getBrokerIp();
             String url = SCHEMA + brokerIp + ":" + brokerWebPort
                 + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
-            result = requestMaster(url);
+            result = masterService.requestMaster(url);
             if (result.getErrCode() != SUCCESS_CODE) {
                 return result;
             }
@@ -160,7 +160,7 @@ public class TopicServiceImpl implements TopicService {
                 consumerId);
             String url = SCHEMA + master.getIp() + ":" + master.getWebPort()
                 + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(rebalanceConsumerReq);
-            TubeMQResult result = requestMaster(url);
+            TubeMQResult result = masterService.requestMaster(url);
             if (result.getErrCode() != 0) {
                 rebalanceGroupResult.getFailConsumers().add(consumerId);
             }
@@ -196,7 +196,7 @@ public class TopicServiceImpl implements TopicService {
             String brokerIp = topicInfo.getBrokerIp();
             String url = SCHEMA + brokerIp + ":" + brokerWebPort
                 + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
-            result = requestMaster(url);
+            result = masterService.requestMaster(url);
             if (result.getErrCode() != SUCCESS_CODE) {
                 cleanOffsetResult.getFailBrokers().add(brokerIp);
             } else {
@@ -234,7 +234,7 @@ public class TopicServiceImpl implements TopicService {
             String brokerIp = topicInfo.getBrokerIp();
             String url = SCHEMA + brokerIp + ":" + brokerWebPort
                 + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
-            OffsetQueryRes res = gson.fromJson(queryMaster(url), OffsetQueryRes.class);
+            OffsetQueryRes res = gson.fromJson(masterService.queryMaster(url), OffsetQueryRes.class);
             if (res.getErrCode() != SUCCESS_CODE) {
                 return TubeMQResult.getErrorResult("query broker id" + topicInfo.getBrokerId() + " fail");
             }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/ClusterService.java
similarity index 62%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/ClusterService.java
index 7a0cbb0..8bc1abd 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/ClusterService.java
@@ -15,20 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.repository;
+package org.apache.tubemq.manager.service.interfaces;
 
-import org.apache.tubemq.manager.entry.NodeEntry;
-import org.springframework.data.jpa.repository.JpaRepository;
-import org.springframework.stereotype.Repository;
 
-import java.util.List;
+import org.apache.tubemq.manager.controller.cluster.request.AddClusterReq;
+import org.springframework.stereotype.Component;
 
-@Repository
-public interface NodeRepository extends JpaRepository<NodeEntry, Long> {
+@Component
+public interface ClusterService {
 
-    NodeEntry findNodeEntryByClusterIdIsAndMasterIsTrue(int clusterId);
-
-    List<NodeEntry> findNodeEntriesByClusterIdIs(int clusterId);
-
-    List<NodeEntry> findAll();
+    /**
+     * Adds a cluster together with the master node that belongs to it.
+     * @param req request carrying the cluster name and the master's ip, port and web port
+     * @return presumably TRUE on success and FALSE otherwise; confirm against ClusterServiceImpl
+     */
+    Boolean addClusterAndMasterNode(AddClusterReq req);
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/MasterService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/MasterService.java
new file mode 100644
index 0000000..340e6f0
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/MasterService.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tubemq.manager.service.interfaces;
+
+import java.util.Map;
+import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.springframework.stereotype.Component;
+/** Operations the manager services use to reach a TubeMQ master through request URLs. */
+@Component
+public interface MasterService {
+
+    /**
+     * Sends a request to the master at the given URL and returns the action result.
+     * @param url complete request URL (callers assemble scheme, host, port and query string)
+     * @return the action result; callers compare its errCode with SUCCESS_CODE
+     */
+    TubeMQResult requestMaster(String url);
+
+    /**
+     * Queries the master at the given URL and returns the information the master sent back.
+     * @param url complete query URL
+     * @return the response as a string; callers in this patch parse it with Gson
+     */
+    String queryMaster(String url);
+
+    /**
+     * Requests the master using a BaseReq and returns the action result (success or fail).
+     * @param req request to send; NOTE(review): confirm how the target master is resolved from it
+     * @return the action result
+     */
+    TubeMQResult baseRequestMaster(BaseReq req);
+
+    /**
+     * Gets the master node of the cluster the request refers to.
+     * @param req request identifying the cluster (presumably by cluster id; confirm in the impl)
+     * @return the master NodeEntry; TODO confirm what is returned when no master is stored
+     */
+    NodeEntry getMasterNode(BaseReq req);
+
+    /**
+     * Uses the entries of queryBody to generate the URL for a master query.
+     * @param queryBody key/value pairs describing the query
+     * @return the generated query URL
+     * @throws Exception if the URL cannot be generated (exact causes depend on the implementation)
+     */
+    String getQueryUrl(Map<String, String> queryBody) throws Exception;
+
+    /**
+     * Checks whether the master node is alive.
+     * @param masterIp ip of the master node
+     * @param masterPort port put into the status URL (MasterServiceImpl names it masterWebPort)
+     * @return the result of the status request sent to the master
+     */
+    TubeMQResult checkMasterNodeStatus(String masterIp, Integer masterPort);
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/NodeService.java
similarity index 92%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/NodeService.java
index eebcf96..ddb1a59 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/NodeService.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.service;
+package org.apache.tubemq.manager.service.interfaces;
 
 import java.io.IOException;
 import java.util.List;
@@ -27,6 +27,7 @@ import org.apache.tubemq.manager.controller.node.request.BatchAddTopicReq;
 import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq;
 import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.service.TopicFuture;
 import org.apache.tubemq.manager.service.tube.AddBrokerResult;
 
 public interface NodeService {
@@ -65,13 +66,6 @@ public interface NodeService {
      */
     void updateBrokerStatus(int clusterId, Map<String, TopicFuture> pendingTopic);
 
-    /**
-     * query cluster info
-     * @param clusterId
-     * @return
-     */
-    String queryClusterInfo(Integer clusterId);
-
     void close() throws IOException;
 
     /**
@@ -88,4 +82,11 @@ public interface NodeService {
      * @return
      */
     TubeMQResult batchAddTopic(BatchAddTopicReq req);
+
+    /**
+     * Adds one node to the node repository.
+     * @param nodeEntry the node to store
+     * @return true if the node was stored; false if storing failed (NodeServiceImpl logs the error)
+     */
+    boolean addNode(NodeEntry nodeEntry);
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/TopicService.java
similarity index 97%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/TopicService.java
index 7fd6f48..2b27945 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/TopicService.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.service;
+package org.apache.tubemq.manager.service.interfaces;
 
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.group.request.DeleteOffsetReq;
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
index 61a5bd9..7cf3fce 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
@@ -37,7 +37,7 @@ public class TubeHttpBrokerInfoList {
      * json class for broker info.
      */
     @Data
-    private static class BrokerInfo {
+    public static class BrokerInfo {
         private int brokerId;
         private String brokerIp;
         private int brokerPort;
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpClusterInfoList.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpClusterInfoList.java
deleted file mode 100644
index 4da3f04..0000000
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpClusterInfoList.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tubemq.manager.service.tube;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import org.apache.tubemq.manager.controller.TubeMQResult;
-import org.apache.tubemq.manager.entry.NodeEntry;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-@Data
-public class TubeHttpClusterInfoList extends TubeMQResult {
-
-    private List<ClusterData> clusterData = new ArrayList<>();
-
-    @Data
-    @AllArgsConstructor
-    public static class ClusterData {
-
-        @Data
-        public static class ClusterInfo {
-
-            @Data
-            public static class BrokerInfo {
-                private int brokerId;
-                private String brokerIp;
-            }
-
-            private String master;
-            private List<String> standby = new ArrayList<>();
-            private List<BrokerInfo> broker = new ArrayList<>();
-
-        }
-
-        private int clusterId;
-        private String clusterName;
-        private ClusterInfo clusterInfo;
-
-    }
-
-
-    public static TubeHttpClusterInfoList getClusterInfoList(Map<Integer, List<NodeEntry>> nodeEntriesPerCluster) {
-        // for each cluster provide cluster information
-        TubeHttpClusterInfoList clusterInfoList = new TubeHttpClusterInfoList();
-        nodeEntriesPerCluster.forEach((id, entries) -> {
-                    ClusterData.ClusterInfo singleClusterInfo = getSingleClusterInfo(entries);
-                    ClusterData clusterData =
-                            new ClusterData(id, entries.get(0).getClusterName(), singleClusterInfo);
-                    clusterInfoList.getClusterData().add(clusterData);
-                }
-        );
-        return clusterInfoList;
-    }
-
-    private static ClusterData.ClusterInfo getSingleClusterInfo(List<NodeEntry> entries) {
-
-        TubeHttpClusterInfoList.ClusterData.ClusterInfo clusterInfo =
-                new TubeHttpClusterInfoList.ClusterData.ClusterInfo();
-
-        entries.forEach(nodeEntry -> {
-            if (nodeEntry.isMaster()) {
-                clusterInfo.setMaster(nodeEntry.getIp());
-            }
-            if (nodeEntry.isBroker()) {
-                ClusterData.ClusterInfo.BrokerInfo brokerInfo =
-                        new ClusterData.ClusterInfo.BrokerInfo();
-                brokerInfo.setBrokerId((int) nodeEntry.getBrokerId());
-                brokerInfo.setBrokerIp(nodeEntry.getIp());
-                clusterInfo.getBroker().add(brokerInfo);
-            }
-            if (nodeEntry.isStandby()) {
-                clusterInfo.getStandby().add(nodeEntry.getIp());
-            }
-        });
-
-        return clusterInfo;
-    }
-
-
-}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpGroupDetailInfo.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpGroupDetailInfo.java
index 9834d98..29de567 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpGroupDetailInfo.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpGroupDetailInfo.java
@@ -19,10 +19,7 @@ package org.apache.tubemq.manager.service.tube;
 
 import com.google.common.collect.Lists;
 import java.util.List;
-import java.util.Map;
 import lombok.Data;
-import org.apache.tubemq.manager.entry.NodeEntry;
-import org.apache.tubemq.manager.service.tube.TubeHttpClusterInfoList.ClusterData;
 
 @Data
 public class TubeHttpGroupDetailInfo {
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
index f36c98d..9ed7c28 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
@@ -18,6 +18,7 @@
 package org.apache.tubemq.manager.utils;
 
 import com.google.gson.Gson;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 
@@ -89,4 +90,17 @@ public class ConvertUtils {
         consumerReq.setMethod(REBALANCE_GROUP);
         return consumerReq;
     }
+
+    /** Joins the map into "k=v&k=v" form, URL-encoding (UTF-8) keys as well as values. */
+    public static String covertMapToQueryString(Map<String, String> requestMap) throws Exception {
+        List<String> queryList = new ArrayList<>(requestMap.size());
+        String charset = UTF_8.toString();
+        for (Map.Entry<String, String> entry : requestMap.entrySet()) {
+            queryList.add(URLEncoder.encode(entry.getKey(), charset)
+                + "=" + URLEncoder.encode(entry.getValue(), charset));
+        }
+        return String.join("&", queryList);
+    }
+
+
 }
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestClusterController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestClusterController.java
index bd893a7..53e3306 100644
--- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestClusterController.java
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestClusterController.java
@@ -18,19 +18,24 @@
 package org.apache.tubemq.manager.controller;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.when;
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
 
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.tubemq.manager.controller.cluster.ClusterController;
+import org.apache.tubemq.manager.controller.cluster.request.AddClusterReq;
+import org.apache.tubemq.manager.entry.ClusterEntry;
 import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.repository.ClusterRepository;
 import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.service.interfaces.MasterService;
+import org.apache.tubemq.manager.service.interfaces.NodeService;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
 import org.springframework.boot.test.context.SpringBootTest;
@@ -52,8 +57,14 @@ public class TestClusterController {
     @MockBean
     private NodeRepository nodeRepository;
 
-    @InjectMocks
-    private ClusterController clusterController;
+    @MockBean
+    private ClusterRepository clusterRepository;
+
+    @MockBean
+    private NodeService nodeService;
+
+    @MockBean
+    private MasterService masterService;
 
     @Autowired
     private MockMvc mockMvc;
@@ -147,4 +158,36 @@ public class TestClusterController {
         log.info("result json string is {}, response type is {}", resultStr,
                 result.getResponse().getContentType());
     }
+
+    /** Builds a ClusterEntry fixture with clusterId 1 and clusterName "test". */
+    private ClusterEntry getOneClusterEntry() {
+        ClusterEntry clusterEntry = new ClusterEntry();
+        clusterEntry.setClusterId(1);
+        clusterEntry.setClusterName("test");
+        return clusterEntry;
+    }
+    /** POSTs an AddClusterReq to /v1/cluster with mocked collaborators and checks the JSON reply. */
+    @Test
+    public void testAddCluster() throws Exception {
+        AddClusterReq req = new AddClusterReq();
+        req.setClusterName("test");
+        req.setMasterIp("127.0.0.1");
+        req.setMasterWebPort(8080);
+        req.setMasterPort(8089);
+
+        ClusterEntry entry = getOneClusterEntry();
+        TubeMQResult successResult = new TubeMQResult();
+
+        when(clusterRepository.saveAndFlush(any(ClusterEntry.class))).thenReturn(entry);
+        when(nodeService.addNode(any(NodeEntry.class))).thenReturn(Boolean.TRUE);
+        when(masterService.checkMasterNodeStatus(anyString(), anyInt())).thenReturn(successResult);
+
+        RequestBuilder request = post("/v1/cluster")
+            .contentType(MediaType.APPLICATION_JSON).content(gson.toJson(req));
+        MvcResult result = mockMvc.perform(request).andReturn();
+        String resultStr = result.getResponse().getContentAsString();
+        String expectRes = "{\"errMsg\":\"\",\"errCode\":0,\"result\":true,\"data\":null}";
+        // assertEquals takes (expected, actual); this order keeps the failure message truthful
+        Assert.assertEquals(expectRes, resultStr);
+    }
 }
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
deleted file mode 100644
index 8bad33b..0000000
--- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tubemq.manager.controller;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.tubemq.manager.entry.NodeEntry;
-import org.apache.tubemq.manager.repository.NodeRepository;
-import org.assertj.core.util.Lists;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.mock.mockito.MockBean;
-import org.springframework.test.context.junit4.SpringRunner;
-import org.springframework.test.web.servlet.MockMvc;
-import org.springframework.test.web.servlet.MvcResult;
-import org.springframework.test.web.servlet.RequestBuilder;
-
-import java.util.List;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.when;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
-
-@Slf4j
-@RunWith(SpringRunner.class)
-@SpringBootTest
-@AutoConfigureMockMvc
-public class TestNodeController {
-
-    @MockBean
-    private NodeRepository nodeRepository;
-
-    @Autowired
-    private MockMvc mockMvc;
-
-    private List<NodeEntry> getOneNodeEntry() {
-        NodeEntry nodeEntry = new NodeEntry();
-        nodeEntry.setMaster(true);
-        nodeEntry.setIp("127.0.0.1");
-        nodeEntry.setWebPort(8014);
-        nodeEntry.setClusterId(0);
-        return Lists.newArrayList(nodeEntry);
-    }
-
-
-    private List<NodeEntry> getTwoNodeEntries() {
-        NodeEntry nodeEntry1 = new NodeEntry();
-        nodeEntry1.setMaster(true);
-        nodeEntry1.setIp("127.0.0.1");
-        nodeEntry1.setWebPort(8014);
-        nodeEntry1.setClusterId(1);
-
-        NodeEntry nodeEntry2 = new NodeEntry();
-        nodeEntry2.setMaster(true);
-        nodeEntry2.setIp("127.0.0.1");
-        nodeEntry2.setWebPort(8014);
-        nodeEntry2.setClusterId(2);
-
-        return Lists.newArrayList(nodeEntry1, nodeEntry2);
-    }
-
-    private String expectedOneEntry =
-            "{\"data\":[{\"clusterId\":0," +
-            "\"clusterInfo\":{\"master\":\"127.0.0.1\"," +
-            "\"standby\":[],\"broker\":[]}}],\"errMsg\":" +
-            "\"\",\"errCode\":0,\"result\":true}";
-
-
-    private String expectedTwoEntries =
-            "{\"data\":[{\"clusterId\":1,\"clusterInfo\":" +
-                    "{\"master\":\"127.0.0.1\",\"standby\"" +
-                    ":[],\"broker\":[]}},{\"clusterId\":2," +
-                    "\"clusterInfo\":{\"master\":\"127.0.0.1\"," +
-                    "\"standby\":[],\"broker\":[]}}]," +
-                    "\"errMsg\":\"\",\"errCode\":0,\"result\":true}";
-
-    @Test
-    public void testClusterInfo() throws Exception {
-        List<NodeEntry> nodeEntries = getOneNodeEntry();
-        when(nodeRepository.findNodeEntriesByClusterIdIs(any(Integer.class)))
-                .thenReturn(nodeEntries);
-        RequestBuilder request = get("/v1/node/query?method=admin_query_cluster_info&" +
-                "type=op_query&clusterId=1");
-        MvcResult result = mockMvc.perform(request).andReturn();
-        String resultStr = result.getResponse().getContentAsString();
-        Assert.assertEquals(resultStr, expectedOneEntry);
-        log.info("result json string is {}, response type is {}", resultStr,
-                result.getResponse().getContentType());
-    }
-
-
-    @Test
-    public void testClusterInfoTwoEntries() throws Exception {
-        List<NodeEntry> nodeEntries = getTwoNodeEntries();
-        when(nodeRepository.findNodeEntriesByClusterIdIs(any(Integer.class)))
-                .thenReturn(nodeEntries);
-        RequestBuilder request = get("/v1/node/query?method=admin_query_cluster_info&" +
-                "type=op_query&clusterId=1");
-        MvcResult result = mockMvc.perform(request).andReturn();
-        String resultStr = result.getResponse().getContentAsString();
-        Assert.assertEquals(resultStr, expectedTwoEntries);
-        log.info("result json string is {}, response type is {}", resultStr,
-                result.getResponse().getContentType());
-    }
-}