You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/10/30 06:05:16 UTC

[incubator-tubemq] branch TUBEMQ-336 updated: [TUBEMQ-361] create topic when getting request (#292)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-336
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-336 by this push:
     new 714ea2b  [TUBEMQ-361] create topic when getting request (#292)
714ea2b is described below

commit 714ea2b7e781ca612fe61d585d1eb26aeba829f2
Author: Yuanbo Liu <yu...@apache.org>
AuthorDate: Fri Oct 30 14:05:05 2020 +0800

    [TUBEMQ-361] create topic when getting request (#292)
---
 tubemq-manager/pom.xml                             |  14 ++
 .../controller/ManagerControllerAdvice.java        |   6 +-
 .../TopicController.java}                          |  67 +++--
 .../BusinessResult.java => topic/TopicResult.java} |   4 +-
 .../BusinessResult.java => entry/NodeEntry.java}   |  33 ++-
 .../entry/{BusinessEntry.java => TopicEntry.java}  |   8 +-
 .../BusinessResult.java => entry/TopicStatus.java} |  23 +-
 ...BusinessRepository.java => NodeRepository.java} |  10 +-
 ...usinessRepository.java => TopicRepository.java} |  21 +-
 .../apache/tubemq/manager/service/NodeService.java | 272 +++++++++++++++++++++
 .../tubemq/manager/service/TopicBackendWorker.java | 137 +++++++++++
 .../apache/tubemq/manager/service/TopicFuture.java |  58 +++++
 .../{AsyncService.java => TubeHttpConst.java}      |  21 +-
 .../service/tube/TubeHttpBrokerInfoList.java       | 135 ++++++++++
 .../TubeHttpResponse.java}                         |  17 +-
 .../service/tube/TubeHttpTopicInfoList.java        |  97 ++++++++
 .../src/main/resources/application.properties      |  17 ++
 .../manager/controller/TestBusinessController.java |  20 +-
 .../manager/repository/TestBusinessRepository.java |  10 +-
 .../service/tube/TestTubeHttpBrokerResponse.java   |  48 ++++
 .../service/tube/TestTubeHttpTopicInfoList.java    |  52 ++++
 21 files changed, 978 insertions(+), 92 deletions(-)

diff --git a/tubemq-manager/pom.xml b/tubemq-manager/pom.xml
index 0d33d82..18e692c 100644
--- a/tubemq-manager/pom.xml
+++ b/tubemq-manager/pom.xml
@@ -33,6 +33,15 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-web</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.springframework.boot</groupId>
@@ -45,6 +54,11 @@
         </dependency>
 
         <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-data-jpa</artifactId>
         </dependency>
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java
index 33369ca..09a72cd 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java
@@ -17,7 +17,7 @@
 
 package org.apache.tubemq.manager.controller;
 import javax.servlet.http.HttpServletRequest;
-import org.apache.tubemq.manager.controller.business.BusinessResult;
+import org.apache.tubemq.manager.controller.topic.TopicResult;
 import org.apache.tubemq.manager.exceptions.TubeMQManagerException;
 import org.springframework.web.bind.annotation.ExceptionHandler;
 import org.springframework.web.bind.annotation.RestControllerAdvice;
@@ -36,9 +36,9 @@ public class ManagerControllerAdvice {
      * @return entity
      */
     @ExceptionHandler(TubeMQManagerException.class)
-    public BusinessResult handlingBusinessException(HttpServletRequest request,
+    public TopicResult handlingBusinessException(HttpServletRequest request,
             TubeMQManagerException ex) {
-        BusinessResult result = new BusinessResult();
+        TopicResult result = new TopicResult();
         result.setMessage(ex.getMessage());
         result.setCode(-1);
         return result;
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java
similarity index 54%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessController.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java
index c8190a8..314d079 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java
@@ -14,15 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tubemq.manager.controller.business;
+package org.apache.tubemq.manager.controller.topic;
 
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.tubemq.manager.entry.BusinessEntry;
+import org.apache.tubemq.manager.entry.TopicEntry;
+import org.apache.tubemq.manager.entry.TopicStatus;
 import org.apache.tubemq.manager.exceptions.TubeMQManagerException;
-import org.apache.tubemq.manager.repository.BusinessRepository;
-import org.apache.tubemq.manager.service.AsyncService;
+import org.apache.tubemq.manager.repository.TopicRepository;
+import org.apache.tubemq.manager.service.TopicBackendWorker;
+import org.apache.tubemq.manager.service.TopicFuture;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
@@ -35,62 +38,75 @@ import org.springframework.web.bind.annotation.RestController;
 @RestController
 @RequestMapping(path = "/business")
 @Slf4j
-public class BusinessController {
+public class TopicController {
 
     @Autowired
-    private BusinessRepository businessRepository;
+    private TopicRepository topicRepository;
 
     @Autowired
-    private AsyncService asyncService;
+    private TopicBackendWorker topicBackendWorker;
 
     /**
-     * add new business.
+     * add new topic.
      *
      * @return - businessResult
      * @throws Exception - exception
      */
     @PostMapping("/add")
-    public BusinessResult addBusiness(@RequestBody BusinessEntry entry) {
-        businessRepository.saveAndFlush(entry);
-        return new BusinessResult();
+    public TopicResult addTopic(@RequestBody TopicEntry entry) {
+        // persist the entry in ADDING status before handing it to the backend worker.
+        entry.setStatus(TopicStatus.ADDING.value());
+        topicRepository.saveAndFlush(entry);
+        CompletableFuture<TopicEntry> future = new CompletableFuture<>();
+        topicBackendWorker.addTopicFuture(new TopicFuture(entry, future));
+        future.whenComplete((entry1, throwable) -> {
+            TopicEntry target = entry1 != null ? entry1 : entry; // entry1 is null on failure
+            target.setStatus(TopicStatus.SUCCESS.value());
+            if (throwable != null) {
+                // completed exceptionally: record FAILED instead of SUCCESS.
+                target.setStatus(TopicStatus.FAILED.value());
+                log.error("exception caught", throwable);
+            }
+            topicRepository.saveAndFlush(target);
+        });
+        return new TopicResult();
     }
 
     /**
-     * update business
+     * update topic
      *
      * @return
      * @throws Exception
      */
     @PostMapping("/update")
-    public BusinessResult updateBusiness(@RequestBody BusinessEntry entry) {
-        return new BusinessResult();
+    public TopicResult updateTopic(@RequestBody TopicEntry entry) {
+        return new TopicResult();
     }
 
     /**
-     * Check business status by business name.
+     * Check topic status by business name.
      *
      * @return
      * @throws Exception
      */
     @GetMapping("/check")
-    public BusinessResult checkBusinessByName(
+    public TopicResult checkTopicByBusinessName(
             @RequestParam String businessName) {
-        List<BusinessEntry> result = businessRepository.findAllByBusinessName(businessName);
-        return new BusinessResult();
+        List<TopicEntry> result = topicRepository.findAllByBusinessName(businessName);
+        return new TopicResult();
     }
 
     /**
-     * get business by id.
+     * get topic by id.
      *
      * @param id business id
      * @return BusinessResult
      * @throws Exception
      */
     @GetMapping("/get/{id}")
-    public BusinessResult getBusinessByID(
+    public TopicResult getBusinessByID(
             @PathVariable Long id) {
-        Optional<BusinessEntry> businessEntry = businessRepository.findById(id);
-        BusinessResult result = new BusinessResult();
+        Optional<TopicEntry> businessEntry = topicRepository.findById(id);
+        TopicResult result = new TopicResult();
         if (!businessEntry.isPresent()) {
             result.setCode(-1);
             result.setMessage("business not found");
@@ -98,9 +114,12 @@ public class BusinessController {
         return result;
     }
 
-
+    /**
+     * test for exception situation.
+     * @return
+     */
     @GetMapping("/throwException")
-    public BusinessResult throwException() {
+    public TopicResult throwException() {
         throw new TubeMQManagerException("exception for test");
     }
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicResult.java
similarity index 91%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicResult.java
index 88c39ae..98fb81e 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicResult.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tubemq.manager.controller.business;
+package org.apache.tubemq.manager.controller.topic;
 
 import lombok.Data;
 
@@ -22,7 +22,7 @@ import lombok.Data;
  * rest result for business controller
  */
 @Data
-public class BusinessResult {
+public class TopicResult {
     private String message;
     private int code = 0;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
similarity index 58%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
index 88c39ae..54c4236 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
@@ -14,15 +14,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tubemq.manager.controller.business;
 
+package org.apache.tubemq.manager.entry;
+
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
 import lombok.Data;
 
 /**
- * rest result for business controller
+ * node machine for tube cluster. broker/master/standby
  */
+@Entity
+@Table(name = "node")
 @Data
-public class BusinessResult {
-    private String message;
-    private int code = 0;
+public class NodeEntry {
+    @Id
+    @GeneratedValue(strategy= GenerationType.AUTO)
+    private long brokerId;
+
+    private boolean master;
+
+    private boolean standby;
+
+    private boolean broker;
+
+    private String ip;
+
+    private int port;
+
+    private int webPort;
+
+    private int clusterId;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicEntry.java
similarity index 95%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicEntry.java
index 88e8e1e..17b7711 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicEntry.java
@@ -32,10 +32,10 @@ import org.hibernate.annotations.CreationTimestamp;
 import org.springframework.data.jpa.domain.support.AuditingEntityListener;
 
 @Entity
-@Table(name = "business")
+@Table(name = "topic")
 @Data
 @EntityListeners(AuditingEntityListener.class) // support CreationTimestamp annotation
-public class BusinessEntry {
+public class TopicEntry {
     @Id
     @GeneratedValue(strategy=GenerationType.AUTO)
     private long businessId;
@@ -121,7 +121,7 @@ public class BusinessEntry {
     private String issueMethod;
 
 
-    public BusinessEntry(String businessName, String schemaName,
+    public TopicEntry(String businessName, String schemaName,
             String username, String passwd, String topic, String encodingType) {
         this.businessName = businessName;
         this.schemaName = schemaName;
@@ -131,7 +131,7 @@ public class BusinessEntry {
         this.encodingType = encodingType;
     }
 
-    public BusinessEntry() {
+    public TopicEntry() {
 
     }
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicStatus.java
similarity index 74%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicStatus.java
index 88c39ae..e5796af 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicStatus.java
@@ -14,15 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tubemq.manager.controller.business;
 
-import lombok.Data;
+package org.apache.tubemq.manager.entry;
 
-/**
- * rest result for business controller
- */
-@Data
-public class BusinessResult {
-    private String message;
-    private int code = 0;
+public enum TopicStatus {
+
+    ADDING(0), SUCCESS(1), FAILED(2), RETRY(3);
+
+    private int value = 0;
+
+    private TopicStatus(int value) {
+        this.value = value;
+    }
+
+    public int value() {
+        return this.value;
+    }
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/BusinessRepository.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
similarity index 77%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/BusinessRepository.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
index fa4f3af..4bf6ec7 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/BusinessRepository.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
@@ -17,14 +17,12 @@
 
 package org.apache.tubemq.manager.repository;
 
-import java.util.List;
-import org.apache.tubemq.manager.entry.BusinessEntry;
+import org.apache.tubemq.manager.entry.NodeEntry;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.stereotype.Repository;
 
 @Repository
-public interface BusinessRepository extends JpaRepository<BusinessEntry, Long> {
-   List<BusinessEntry> findAllByBusinessName(String businessName);
-   BusinessEntry findByBusinessName(String businessName);
-}
+public interface NodeRepository extends JpaRepository<NodeEntry, Long> {
 
+    NodeEntry findNodeEntryByClusterIdIsAndMasterIsTrue(int clusterId);
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/BusinessRepository.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/TopicRepository.java
similarity index 69%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/BusinessRepository.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/TopicRepository.java
index fa4f3af..4c88949 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/BusinessRepository.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/TopicRepository.java
@@ -18,13 +18,26 @@
 package org.apache.tubemq.manager.repository;
 
 import java.util.List;
-import org.apache.tubemq.manager.entry.BusinessEntry;
+import org.apache.tubemq.manager.entry.TopicEntry;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.stereotype.Repository;
 
 @Repository
-public interface BusinessRepository extends JpaRepository<BusinessEntry, Long> {
-   List<BusinessEntry> findAllByBusinessName(String businessName);
-   BusinessEntry findByBusinessName(String businessName);
+public interface TopicRepository extends JpaRepository<TopicEntry, Long> {
+
+   /**
+    * get all topicEntry list by business name
+    * @param businessName
+    * @return
+    */
+   List<TopicEntry> findAllByBusinessName(String businessName);
+
+   /**
+    * get one topicEntry by business name
+    * @param businessName
+    * @return
+    */
+   TopicEntry findByBusinessName(String businessName);
+
 }
 
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
new file mode 100644
index 0000000..4e0db3e
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service;
+
+
+import static org.apache.tubemq.manager.service.TubeHttpConst.ADD_TUBE_TOPIC;
+import static org.apache.tubemq.manager.service.TubeHttpConst.BROKER_RUN_STATUS;
+import static org.apache.tubemq.manager.service.TubeHttpConst.RELOAD_BROKER;
+import static org.apache.tubemq.manager.service.TubeHttpConst.SCHEMA;
+import static org.apache.tubemq.manager.service.TubeHttpConst.TOPIC_CONFIG_INFO;
+
+import com.google.gson.Gson;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.service.tube.TubeHttpBrokerInfoList;
+import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
+import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+
+/**
+ * node service to query broker/master/standby status of tube cluster.
+ */
+@Slf4j
+public class NodeService {
+
+    private final CloseableHttpClient httpclient = HttpClients.createDefault();
+    private final Gson gson = new Gson();
+
+    @Value("${manager.max.configurable.broker.size:50}")
+    private int maxConfigurableBrokerSize;
+
+    @Value("${manager.max.retry.adding.topic:10}")
+    private int maxRetryAddingTopic;
+
+    private final TopicBackendWorker worker;
+
+    @Autowired
+    private NodeRepository nodeRepository;
+
+    public NodeService(TopicBackendWorker worker) {
+        this.worker = worker;
+    }
+
+    /**
+     * request the BROKER_RUN_STATUS url of the given node via http and parse the json reply.
+     *
+     * @param nodeEntry - node whose ip and webPort are queried
+     * @return the parsed list after divideBrokerListByState() when its code is 0, else null
+     * @throws IOException - declared only; exceptions from the request are caught and logged
+     */
+    private TubeHttpBrokerInfoList requestClusterNodeStatus(NodeEntry nodeEntry) throws IOException {
+        String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort() + BROKER_RUN_STATUS;
+        HttpGet httpget = new HttpGet(url);
+        try (CloseableHttpResponse response = httpclient.execute(httpget)) {
+            TubeHttpBrokerInfoList brokerInfoList =
+                    gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
+                            TubeHttpBrokerInfoList.class);
+            // code 0 is treated as a normal reply; anything else falls through to null.
+            if (brokerInfoList.getCode() == 0) {
+                // divide by state.
+                brokerInfoList.divideBrokerListByState();
+                return brokerInfoList;
+            }
+        } catch (Exception ex) {
+            log.error("exception caught while requesting broker status", ex);
+        }
+        return null;
+    }
+
+
+    // query the topic's config via the node's web api; null on errCode != 0 or any exception.
+    private TubeHttpTopicInfoList requestTopicConfigInfo(NodeEntry nodeEntry, String topic) {
+        String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+                + TOPIC_CONFIG_INFO + "&topicName=" + topic;
+        try (CloseableHttpResponse response = httpclient.execute(new HttpGet(url))) {
+            TubeHttpTopicInfoList topicInfoList =
+                    gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
+                            TubeHttpTopicInfoList.class);
+            if (topicInfoList.getErrCode() == 0) {
+                return topicInfoList;
+            }
+        } catch (Exception ex) {
+            log.error("exception caught while requesting topic config info", ex);
+        }
+        return null;
+    }
+
+
+    // add topics to at most maxBrokers ids of brokerList; true only if code and errCode are 0.
+    private boolean configBrokersForTopics(NodeEntry nodeEntry,
+            Set<String> topics, List<Integer> brokerList, int maxBrokers) {
+        // clamp to [0, size]: subList throws on a negative or oversized maxBrokers.
+        int brokerCount = Math.max(0, Math.min(maxBrokers, brokerList.size()));
+        String brokerStr = StringUtils.join(brokerList.subList(0, brokerCount), ",");
+        String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort() + ADD_TUBE_TOPIC
+                + "&topicName=" + StringUtils.join(topics, ",") + "&brokerId=" + brokerStr;
+        try (CloseableHttpResponse response = httpclient.execute(new HttpGet(url))) {
+            TubeHttpResponse result =
+                    gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
+                            TubeHttpResponse.class);
+            return result.getCode() == 0 && result.getErrCode() == 0;
+        } catch (Exception ex) {
+            log.error("exception caught while adding topics to brokers", ex);
+        }
+        return false;
+    }
+
+    /**
+     * settle the futures of the given topics: complete them on success; on failure increase
+     * the retry time and add the future back to the worker, or complete it exceptionally
+     * once the retry time exceeds maxRetryAddingTopic.
+     *
+     * @param isSuccess - whether the add request for these topics succeeded
+     * @param topics - topic names covered by that request
+     * @param pendingTopic - topic name to future; topics without a future are skipped
+     */
+    private void handleAddingResult(boolean isSuccess, Set<String> topics,
+            Map<String, TopicFuture> pendingTopic) {
+        for (String topic : topics) {
+            TopicFuture future = pendingTopic.get(topic);
+            if (future != null) {
+                if (isSuccess) {
+                    future.complete();
+                } else {
+                    future.increaseRetryTime();
+                    if (future.getRetryTime() > maxRetryAddingTopic) {
+                        future.completeExceptional();
+                    } else {
+                        // add back to queue.
+                        worker.addTopicFuture(future);
+                    }
+                }
+            }
+        }
+    }
+
+
+    /**
+     * for each pending topic, query its current broker ids from the node: topics without
+     * brokers are collected and added in one request at the end, topics that already have
+     * brokers get their own request for the configurable brokers they are not yet on.
+     * @param nodeEntry - node that receives the http requests
+     * @param brokerInfoList - source of the configurable broker id list
+     * @param pendingTopic - topic name to future; topics whose config query fails are skipped
+     */
+    private void handleAddingTopic(NodeEntry nodeEntry,
+            TubeHttpBrokerInfoList brokerInfoList,
+            Map<String, TopicFuture> pendingTopic) {
+        // 1. check tubemq cluster by topic name, remove pending topic if has added.
+        Set<String> brandNewTopics = new HashSet<>();
+        for (String topic : pendingTopic.keySet()) {
+            TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(nodeEntry, topic);
+            if (topicInfoList != null) {
+                // get broker list by topic request
+                List<Integer> topicBrokerList = topicInfoList.getTopicBrokerIdList();
+                if (topicBrokerList.isEmpty()) {
+                    brandNewTopics.add(topic);
+                } else {
+                    // NOTE(review): removeAll mutates the getter's list - confirm it is a copy.
+                    List<Integer> configurableBrokerIdList =
+                            brokerInfoList.getConfigurableBrokerIdList();
+                    configurableBrokerIdList.removeAll(topicBrokerList);
+                    // maxBrokers is the rest of maxConfigurableBrokerSize, not bounded here.
+                    Set<String> singleTopic = new HashSet<>();
+                    singleTopic.add(topic);
+                    int maxBrokers = maxConfigurableBrokerSize - topicBrokerList.size();
+                    boolean isSuccess = configBrokersForTopics(nodeEntry, singleTopic,
+                            configurableBrokerIdList, maxBrokers);
+                    handleAddingResult(isSuccess, singleTopic, pendingTopic);
+                }
+            }
+        }
+        // 2. add new topics to cluster; this request is sent even when brandNewTopics is empty.
+        List<Integer> configurableBrokerIdList = brokerInfoList.getConfigurableBrokerIdList();
+        int maxBrokers = Math.min(maxConfigurableBrokerSize, configurableBrokerIdList.size());
+        boolean isSuccess = configBrokersForTopics(nodeEntry, brandNewTopics,
+                configurableBrokerIdList, maxBrokers);
+        handleAddingResult(isSuccess, brandNewTopics, pendingTopic);
+    }
+
+    /**
+     * reload brokers via the node's web api, at most maxConfigurableBrokerSize ids per
+     * request, until the whole list has been sent; an empty list sends no request.
+     * @param nodeEntry - node whose ip and webPort receive the reload requests
+     * @param needReloadList - ids of the brokers to reload
+     */
+    private void handleReloadBroker(NodeEntry nodeEntry, List<Integer> needReloadList) {
+        // a batch size below 1 would never advance the loop, so clamp it.
+        int batchSize = Math.max(1, maxConfigurableBrokerSize);
+        int begin = 0;
+        // continue while ids remain; looping on "end >= size" stops early or never ends.
+        while (begin < needReloadList.size()) {
+            int end = Math.min(batchSize + begin, needReloadList.size());
+            String brokerStr = StringUtils.join(needReloadList.subList(begin, end), ",");
+            String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+                    + RELOAD_BROKER + "&brokerId=" + brokerStr;
+            HttpGet httpget = new HttpGet(url);
+            try (CloseableHttpResponse response = httpclient.execute(httpget)) {
+                TubeHttpResponse result =
+                        gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
+                                TubeHttpResponse.class);
+                if (result.getErrCode() == 0 && result.getCode() == 0) {
+                    log.info("reload tube broker cgi: " +
+                            url + " ; return value : " + result.getCode());
+                }
+            } catch (Exception ex) {
+                log.error("exception caught while reloading brokers", ex);
+            }
+            begin = end;
+        }
+    }
+
+
+
+    /**
+     * add pending topics through the cluster's master node, then reload its need-reload list.
+     */
+    public void updateBrokerStatus(int clusterId, Map<String, TopicFuture> pendingTopic) {
+        NodeEntry nodeEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
+        if (nodeEntry != null) {
+            try {
+                TubeHttpBrokerInfoList brokerInfoList = requestClusterNodeStatus(nodeEntry);
+                if (brokerInfoList != null) {
+                    handleAddingTopic(nodeEntry, brokerInfoList, pendingTopic);
+                }
+
+                // query the status again so the reload uses the list as it is after adding topics.
+                brokerInfoList = requestClusterNodeStatus(nodeEntry);
+                if (brokerInfoList != null) {
+                    handleReloadBroker(nodeEntry, brokerInfoList.getNeedReloadList());
+                }
+
+            } catch (Exception ex) {
+                log.error("exception caught while requesting broker status", ex);
+            }
+        } else {
+            log.error("cannot get master ip by clusterId {}, please check it", clusterId);
+        }
+    }
+
+    public void close() throws IOException {
+        httpclient.close();
+    }
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java
new file mode 100644
index 0000000..86b72d5
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.tubemq.manager.repository.TopicRepository;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * Topic backend thread worker.
+ *
+ * <p>Callers enqueue {@link TopicFuture}s per cluster with
+ * {@link #addTopicFuture(TopicFuture)}. A daemon thread started from the constructor
+ * loops while {@code runFlag} is set: on each pass it batches the queued futures,
+ * hands them to {@code NodeService.updateBrokerStatus} and then sleeps
+ * {@code queueThreadInterval} seconds.
+ */
+@Component
+@Slf4j
+public class TopicBackendWorker implements DisposableBean, Runnable {
+    private final AtomicBoolean runFlag = new AtomicBoolean(true);
+    // pending topic futures, keyed by cluster id
+    private final ConcurrentHashMap<Integer, BlockingQueue<TopicFuture>> pendingTopics =
+            new ConcurrentHashMap<>();
+    // NOTE(review): one counter is shared by every cluster queue; it is incremented once
+    // per unsatisfied cluster per pass and reset as soon as any cluster drains.
+    // Confirm whether a per-cluster counter was intended.
+    private final AtomicInteger notSatisfiedCount = new AtomicInteger(0);
+    private final NodeService nodeService;
+
+    @Autowired
+    private TopicRepository topicRepository;
+
+    @Value("${manager.topic.queue.warning.size:100}")
+    private int queueWarningSize;
+
+    // value in seconds
+    @Value("${manager.topic.queue.thread.interval:10}")
+    private int queueThreadInterval;
+
+    @Value("${manager.topic.queue.max.wait:3}")
+    private int queueMaxWait;
+
+    @Value("${manager.topic.queue.max.running.size:20}")
+    private int queueMaxRunningSize;
+
+    TopicBackendWorker() {
+        // Assign the final field before the thread starts: Thread.start() publishes
+        // everything written before it, so the worker can never observe a null service.
+        nodeService = new NodeService(this);
+        // NOTE(review): the @Value/@Autowired fields are injected by Spring only after
+        // this constructor returns, so the first passes of run() may still see their
+        // zero defaults. Consider starting the thread from a lifecycle callback.
+        Thread thread = new Thread(this);
+        // daemon thread
+        thread.setDaemon(true);
+        thread.start();
+    }
+
+    /**
+     * add topic future to pending executing queue.
+     * @param future - TopicFuture.
+     */
+    public void addTopicFuture(TopicFuture future) {
+        // atomically reuse the cluster's queue, creating it only when it is missing
+        BlockingQueue<TopicFuture> queue = pendingTopics.computeIfAbsent(
+                future.getEntry().getClusterId(), clusterId -> new LinkedBlockingQueue<>());
+        queue.add(future);
+        if (queue.size() > queueWarningSize) {
+            log.warn("queue size exceed {}, please check it", queueWarningSize);
+        }
+    }
+
+    /**
+     * batch executing adding topic, wait until max n rounds or max size satisfied.
+     *
+     * <p>For every cluster queue: when the wait counter exceeds {@code queueMaxWait} or
+     * the queue holds more than {@code queueMaxRunningSize} entries, up to
+     * {@code queueMaxRunningSize} futures are drained and keyed by topic name; otherwise
+     * the counter is increased and an empty map is used. The broker status update is
+     * invoked in both cases.
+     */
+    private void batchAddTopic() {
+        pendingTopics.forEach((clusterId, queue) -> {
+            Map<String, TopicFuture> pendingTopicList = new HashMap<>();
+            if (notSatisfiedCount.get() > queueMaxWait || queue.size() > queueMaxRunningSize) {
+                notSatisfiedCount.set(0);
+                List<TopicFuture> tmpTopicList = new ArrayList<>();
+                queue.drainTo(tmpTopicList, queueMaxRunningSize);
+                for (TopicFuture topicFuture : tmpTopicList) {
+                    // futures sharing a topic name overwrite each other in this map
+                    pendingTopicList.put(topicFuture.getEntry().getTopic(), topicFuture);
+                }
+            } else {
+                notSatisfiedCount.incrementAndGet();
+            }
+            // update broker status
+            nodeService.updateBrokerStatus(clusterId, pendingTopicList);
+        });
+
+    }
+
+    /**
+     * check topic from db
+     */
+    private void checkTopicFromDB() {
+    }
+
+    /**
+     * Worker loop: runs one batch pass, then sleeps {@code queueThreadInterval} seconds,
+     * until {@code runFlag} is cleared or the thread is interrupted.
+     */
+    @Override
+    public void run() {
+        log.info("TopicBackendWorker has started");
+        while (runFlag.get()) {
+            try {
+                batchAddTopic();
+                checkTopicFromDB();
+            } catch (Exception exception) {
+                log.warn("exception caught", exception);
+            }
+            // sleep outside the work try-block so a failing pass is still paced
+            try {
+                TimeUnit.SECONDS.sleep(queueThreadInterval);
+            } catch (InterruptedException interrupted) {
+                // restore the flag and stop instead of swallowing the interrupt
+                Thread.currentThread().interrupt();
+                log.warn("TopicBackendWorker interrupted, stop running", interrupted);
+                break;
+            }
+        }
+    }
+
+    /**
+     * Clears the run flag so the worker loop ends after its current pass and closes
+     * the node service.
+     */
+    @Override
+    public void destroy() throws Exception {
+        runFlag.set(false);
+        nodeService.close();
+    }
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicFuture.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicFuture.java
new file mode 100644
index 0000000..62b0e2d
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicFuture.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service;
+
+import java.util.concurrent.CompletableFuture;
+import lombok.Getter;
+import org.apache.tubemq.manager.entry.TopicEntry;
+
+/**
+ * topic business with future.
+ */
+public class TopicFuture {
+    @Getter
+    private int retryTime = 0;
+    @Getter
+    private final TopicEntry entry;
+    @Getter
+    private final CompletableFuture<TopicEntry> future;
+
+    public TopicFuture(TopicEntry entry, CompletableFuture<TopicEntry> future) {
+        this.entry = entry;
+        this.future = future;
+    }
+
+    /**
+     * record retry time.
+     */
+    public void increaseRetryTime() {
+        retryTime += 1;
+    }
+
+    /**
+     * when topic operation finished, complete it.
+     */
+    public void complete() {
+        this.future.complete(this.entry);
+    }
+
+    public void completeExceptional() {
+        this.future.completeExceptionally(new RuntimeException("exceed max retry "
+                + retryTime +" adding"));
+    }
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeHttpConst.java
similarity index 59%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeHttpConst.java
index 335f1d0..81a360e 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeHttpConst.java
@@ -17,15 +17,14 @@
 
 package org.apache.tubemq.manager.service;
 
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-
-/**
- * Service for running async tasks.
- * https://howtodoinjava.com/spring-boot2/rest/enableasync-async-controller/
- */
-@Service
-@Slf4j
-public class AsyncService {
-
+/**
+ * String constants for tube http requests: the url scheme prefix and the
+ * {@code /webapi.htm} paths with their {@code type} and {@code method} query parameters.
+ */
+public class TubeHttpConst {
+    // url scheme prefix
+    public static final String SCHEMA = "http://";
+    // op_query: admin_query_broker_run_status
+    public static final String BROKER_RUN_STATUS =
+            "/webapi.htm?type=op_query&method=admin_query_broker_run_status";
+    // op_query: admin_query_topic_info
+    public static final String TOPIC_CONFIG_INFO =
+            "/webapi.htm?type=op_query&method=admin_query_topic_info";
+    // op_modify: admin_add_new_topic_record
+    public static final String ADD_TUBE_TOPIC =
+            "/webapi.htm?type=op_modify&method=admin_add_new_topic_record";
+    // op_modify: admin_reload_broker_configure
+    public static final String RELOAD_BROKER =
+            "/webapi.htm?type=op_modify&method=admin_reload_broker_configure";
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
new file mode 100644
index 0000000..c768aa1
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service.tube;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Data;
+
+/**
+ * json class for broker info list from master http service.
+ */
+@Data
+public class TubeHttpBrokerInfoList {
+
+    /**
+     * json class for broker info.
+     */
+    @Data
+    public static class BrokerInfo {
+        private int brokerId;
+        private String brokerIp;
+        private int brokerPort;
+        private String manageStatus;
+        private String runStatus;
+        private String subStatus;
+        private int stepOp;
+        private boolean isConfChanged;
+        private boolean isConfLoaded;
+        private boolean isBrokerOnline;
+        private String brokerVersion;
+        private boolean acceptPublish;
+        private boolean acceptSubscribe;
+
+        public boolean isIdle() {
+            return subStatus != null && subStatus.equals("idle");
+        }
+
+        public boolean isWorking() {
+            if (runStatus != null && manageStatus != null) {
+                return runStatus.equals("running") && (
+                        manageStatus.equals("online") ||
+                        manageStatus.equals("only-read") ||
+                        manageStatus.equals("only-write"));
+            }
+            return false;
+        }
+
+        public boolean isConfigurable() {
+            return stepOp == -2 || stepOp == 31 || stepOp == 32;
+        }
+
+        @Override
+        public int hashCode() {
+            return brokerId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+
+            if (o == this) return true;
+            if (!(o instanceof BrokerInfo)) {
+                return false;
+            }
+
+            BrokerInfo brokerInfo = (BrokerInfo) o;
+
+            return brokerId == brokerInfo.brokerId;
+        }
+    }
+
+    private int code;
+    private String errMsg;
+    // total broker info list of brokers.
+    private List<BrokerInfo> data;
+    // configurable list of brokers.
+    private List<BrokerInfo> configurableList;
+    // working state list of brokers
+    private List<BrokerInfo> workingList;
+    // idle broker list
+    private List<BrokerInfo> idleList;
+    // need reload broker list
+    private List<Integer> needReloadList;
+
+    /**
+     * divide broker list into different list by broker state.
+     */
+    public void divideBrokerListByState() {
+        if (data != null) {
+            configurableList = new ArrayList<>();
+            workingList = new ArrayList<>();
+            idleList = new ArrayList<>();
+            needReloadList = new ArrayList<>();
+            for (BrokerInfo brokerInfo : data) {
+                if (brokerInfo.isConfigurable()) {
+                    configurableList.add(brokerInfo);
+                }
+                if (brokerInfo.isWorking()) {
+                    workingList.add(brokerInfo);
+                }
+                if (brokerInfo.isIdle()) {
+                    idleList.add(brokerInfo);
+                }
+                if (brokerInfo.isConfChanged) {
+                    needReloadList.add(brokerInfo.getBrokerId());
+                }
+            }
+        }
+    }
+
+    public List<Integer> getConfigurableBrokerIdList() {
+        List<Integer> tmpBrokerIdList = new ArrayList<>();
+        if (configurableList != null) {
+            for (BrokerInfo brokerInfo : configurableList) {
+                tmpBrokerIdList.add(brokerInfo.getBrokerId());
+            }
+        }
+        return tmpBrokerIdList;
+    }
+
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpResponse.java
similarity index 74%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpResponse.java
index 335f1d0..bc30025 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpResponse.java
@@ -15,17 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.service;
+package org.apache.tubemq.manager.service.tube;
 
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
+import lombok.Data;
 
 /**
- * Service for running async tasks.
- * https://howtodoinjava.com/spring-boot2/rest/enableasync-async-controller/
+ * common response json str for tube http request
  */
-@Service
-@Slf4j
-public class AsyncService {
-
+@Data
+public class TubeHttpResponse {
+    // NOTE(review): the broker reload handling in NodeService logs its result only when
+    // both code and errCode are 0 - presumably 0 means success for each; confirm
+    // against the tube web api.
+    private int code;
+    // message text of the response
+    private String errMsg;
+    private int errCode;
 }
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
new file mode 100644
index 0000000..7131b83
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service.tube;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Data;
+import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList.TopicInfoList.TopicInfo;
+
+/**
+ * JSON binding class for the topic info list answered by the master http service.
+ */
+@Data
+public class TubeHttpTopicInfoList {
+    private boolean result;
+
+    private String errMsg;
+
+    private int errCode;
+
+    private List<TopicInfoList> data;
+
+    /**
+     * One entry of {@code data}: a topic name with its list of {@link TopicInfo}.
+     */
+    @Data
+    public static class TopicInfoList {
+
+        private String topicName;
+        private List<TopicInfo> topicInfo;
+
+        /**
+         * Topic settings as reported for a single broker (see {@code brokerId}).
+         */
+        @Data
+        public static class TopicInfo {
+
+            private String topicName;
+            private int topicStatusId;
+            private int brokerId;
+            private String brokerIp;
+            private int brokerPort;
+            private int numPartitions;
+            private int unflushThreshold;
+            private int unflushInterval;
+            private int unFlushDataHold;
+            private String deleteWhen;
+            private String deletePolicy;
+            private boolean acceptPublish;
+            private boolean acceptSubscribe;
+            private int numTopicStores;
+            private int memCacheMsgSizeInMB;
+            private int memCacheFlushIntvl;
+            private int memCacheMsgCntInK;
+            private String createUser;
+            private String createDate;
+            private String modifyUser;
+            private String modifyDate;
+            private RunInfo runInfo;
+
+            /**
+             * The nested {@code runInfo} object of a topic info entry.
+             */
+            @Data
+            public static class RunInfo {
+                private boolean acceptPublish;
+                private boolean acceptSubscribe;
+                private int numPartitions;
+                private int numTopicStores;
+                private String brokerManageStatus;
+            }
+        }
+    }
+
+
+    /**
+     * Collects the broker id of every {@link TopicInfo} found under {@code data}.
+     *
+     * @return a new list of broker ids in encounter order (duplicates are kept);
+     *         empty when {@code data} is null
+     */
+    public List<Integer> getTopicBrokerIdList() {
+        List<Integer> brokerIds = new ArrayList<>();
+        if (data == null) {
+            return brokerIds;
+        }
+        for (TopicInfoList perTopic : data) {
+            List<TopicInfo> infos = perTopic.getTopicInfo();
+            if (infos == null) {
+                continue;
+            }
+            for (TopicInfo info : infos) {
+                brokerIds.add(info.getBrokerId());
+            }
+        }
+        return brokerIds;
+    }
+}
diff --git a/tubemq-manager/src/main/resources/application.properties b/tubemq-manager/src/main/resources/application.properties
new file mode 100644
index 0000000..dee51b7
--- /dev/null
+++ b/tubemq-manager/src/main/resources/application.properties
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# <p>
+# http://www.apache.org/licenses/LICENSE-2.0
+# <p>
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spring.jpa.hibernate.ddl-auto=update
+# configuration for manager
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
index 2ddfb67..0838203 100644
--- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
@@ -19,9 +19,9 @@ package org.apache.tubemq.manager.controller;
 import java.net.URI;
 import java.util.Objects;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.tubemq.manager.controller.business.BusinessController;
-import org.apache.tubemq.manager.controller.business.BusinessResult;
-import org.apache.tubemq.manager.entry.BusinessEntry;
+import org.apache.tubemq.manager.controller.topic.TopicController;
+import org.apache.tubemq.manager.controller.topic.TopicResult;
+import org.apache.tubemq.manager.entry.TopicEntry;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -59,7 +59,7 @@ public class TestBusinessController {
 
     @Before
     public void setUp() {
-        mvc = MockMvcBuilders.standaloneSetup(new BusinessController()).build();
+        mvc = MockMvcBuilders.standaloneSetup(new TopicController()).build();
     }
 
     @Test
@@ -76,14 +76,14 @@ public class TestBusinessController {
         final String baseUrl = "http://localhost:" + randomServerPort + "/business/add";
         URI uri = new URI(baseUrl);
         String demoName = "test";
-        BusinessEntry entry = new BusinessEntry(demoName, demoName, demoName,
+        TopicEntry entry = new TopicEntry(demoName, demoName, demoName,
                 demoName, demoName, demoName);
 
         HttpHeaders headers = new HttpHeaders();
-        HttpEntity<BusinessEntry> request = new HttpEntity<>(entry, headers);
+        HttpEntity<TopicEntry> request = new HttpEntity<>(entry, headers);
 
-        ResponseEntity<BusinessResult> responseEntity =
-                client.postForEntity(uri, request, BusinessResult.class);
+        ResponseEntity<TopicResult> responseEntity =
+                client.postForEntity(uri, request, TopicResult.class);
         assertThat(responseEntity.getStatusCode().is2xxSuccessful()).isEqualTo(true);
     }
 
@@ -91,8 +91,8 @@ public class TestBusinessController {
     public void testControllerException() throws Exception {
         final String baseUrl = "http://localhost:" + randomServerPort + "/business/throwException";
         URI uri = new URI(baseUrl);
-        ResponseEntity<BusinessResult> responseEntity =
-                client.getForEntity(uri, BusinessResult.class);
+        ResponseEntity<TopicResult> responseEntity =
+                client.getForEntity(uri, TopicResult.class);
         assertThat(Objects.requireNonNull(responseEntity.getBody()).getCode()).isEqualTo(-1);
         assertThat(responseEntity.getBody().getMessage()).isEqualTo("exception for test");
     }
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/repository/TestBusinessRepository.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/repository/TestBusinessRepository.java
index d51a9ed..7bd8c73 100644
--- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/repository/TestBusinessRepository.java
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/repository/TestBusinessRepository.java
@@ -17,7 +17,7 @@
 package org.apache.tubemq.manager.repository;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import org.apache.tubemq.manager.entry.BusinessEntry;
+import org.apache.tubemq.manager.entry.TopicEntry;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -32,24 +32,24 @@ public class TestBusinessRepository {
     private TestEntityManager entityManager;
 
     @Autowired
-    private BusinessRepository businessRepository;
+    private TopicRepository businessRepository;
 
     @Test
     public void whenFindByNameThenReturnBusiness() {
         String demoName = "alex";
-        BusinessEntry businessEntry = new BusinessEntry(demoName, demoName,
+        TopicEntry businessEntry = new TopicEntry(demoName, demoName,
                 demoName, demoName, demoName, demoName);
         entityManager.persist(businessEntry);
         entityManager.flush();
 
-        BusinessEntry businessEntry1 = businessRepository.findByBusinessName("alex");
+        TopicEntry businessEntry1 = businessRepository.findByBusinessName("alex");
         assertThat(businessEntry1.getBusinessName()).isEqualTo(businessEntry.getBusinessName());
     }
 
     @Test
     public void checkValidation() throws Exception {
         String demoName = "a";
-        BusinessEntry businessEntry = new BusinessEntry(demoName, demoName, demoName,
+        TopicEntry businessEntry = new TopicEntry(demoName, demoName, demoName,
                 demoName, demoName, demoName);
         StringBuilder builder = new StringBuilder();
 
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/tube/TestTubeHttpBrokerResponse.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/tube/TestTubeHttpBrokerResponse.java
new file mode 100644
index 0000000..2d79f69
--- /dev/null
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/tube/TestTubeHttpBrokerResponse.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service.tube;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Assert;
+import org.junit.Test;
+
+@Slf4j
+public class TestTubeHttpBrokerResponse {
+
+    private final Gson gson = new Gson();
+
+    /**
+     * Parses a broker status answer holding one broker and checks the top-level
+     * fields as well as three boolean flags of that broker.
+     */
+    @Test
+    public void testJsonStr() {
+        String jsonStr = "{\"code\":0,\"errMsg\":\"OK\",\"data\":"
+                + "[{\"brokerId\":136,\"brokerIp\":\"127.0.0.1\","
+                + "\"brokerPort\":8123,\"manageStatus\":\"online\","
+                + "\"runStatus\":\"notRegister\",\"subStatus\":\"processing_event\","
+                + "\"stepOp\":32,\"isConfChanged\":\"true\",\"isConfLoaded\":\"false\","
+                + "\"isBrokerOnline\":\"false\",\"brokerVersion\":\"-\","
+                + "\"acceptPublish\":\"false\",\"acceptSubscribe\":\"false\"}]}";
+
+        TubeHttpBrokerInfoList parsed = gson.fromJson(jsonStr, TubeHttpBrokerInfoList.class);
+
+        Assert.assertEquals(1, parsed.getData().size());
+        Assert.assertEquals(0, parsed.getCode());
+        Assert.assertEquals("OK", parsed.getErrMsg());
+
+        TubeHttpBrokerInfoList.BrokerInfo onlyBroker = parsed.getData().get(0);
+        Assert.assertTrue(onlyBroker.isConfChanged());
+        Assert.assertFalse(onlyBroker.isAcceptPublish());
+        Assert.assertFalse(onlyBroker.isBrokerOnline());
+    }
+}
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/tube/TestTubeHttpTopicInfoList.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/tube/TestTubeHttpTopicInfoList.java
new file mode 100644
index 0000000..82a9fda
--- /dev/null
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/tube/TestTubeHttpTopicInfoList.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service.tube;
+
+import com.google.gson.Gson;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTubeHttpTopicInfoList {
+
+    private final Gson gson = new Gson();
+
+    /**
+     * Parses a topic info answer holding one topic on one broker and checks the
+     * top-level fields plus values taken from the nested topic and run info.
+     */
+    @Test
+    public void testJsonStr() {
+        String jsonStr = "{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\""
+                + "data\":[{\"topicName\":\"test1\",\"topicInfo\":[{\"topicName\":\"test1\",\""
+                + "topicStatusId\":0,\"brokerId\":152509201,\"brokerIp\":\"127.0.0.1\",\""
+                + "brokerPort\":8123,\"numPartitions\":1,\"unflushThreshold\":1000,\""
+                + "unflushInterval\":10000,\"unFlushDataHold\":1000,\"deleteWhen\":\"\",\""
+                + "deletePolicy\":\"delete,32h\",\"acceptPublish\":true,"
+                + "\"acceptSubscribe\":true,\"numTopicStores\":1,\"memCacheMsgSizeInMB\":2,\""
+                + "memCacheFlushIntvl\":20000,\"memCacheMsgCntInK\":10,"
+                + "\"createUser\":\"Alice\",\"createDate\":\"20200917122645\","
+                + "\"modifyUser\":\"Alice\",\"modifyDate\":\"20200917122645\","
+                + "\"runInfo\":{\"acceptPublish\":true,\"acceptSubscribe\":true,"
+                + "\"numPartitions\":1,\"numTopicStores\":1,"
+                + "\"brokerManageStatus\":\"online\"}}]}]}";
+
+        TubeHttpTopicInfoList parsed = gson.fromJson(jsonStr, TubeHttpTopicInfoList.class);
+
+        Assert.assertTrue(parsed.isResult());
+        Assert.assertEquals(0, parsed.getErrCode());
+        Assert.assertEquals(1, parsed.getData().size());
+
+        TubeHttpTopicInfoList.TopicInfoList.TopicInfo firstInfo =
+                parsed.getData().get(0).getTopicInfo().get(0);
+        Assert.assertEquals("Alice", firstInfo.getCreateUser());
+        Assert.assertEquals("online", firstInfo.getRunInfo().getBrokerManageStatus());
+    }
+}