You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/10/30 06:05:16 UTC
[incubator-tubemq] branch TUBEMQ-336 updated: [TUBEMQ-361] create
topic when getting request (#292)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-336
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-336 by this push:
new 714ea2b [TUBEMQ-361] create topic when getting request (#292)
714ea2b is described below
commit 714ea2b7e781ca612fe61d585d1eb26aeba829f2
Author: Yuanbo Liu <yu...@apache.org>
AuthorDate: Fri Oct 30 14:05:05 2020 +0800
[TUBEMQ-361] create topic when getting request (#292)
---
tubemq-manager/pom.xml | 14 ++
.../controller/ManagerControllerAdvice.java | 6 +-
.../TopicController.java} | 67 +++--
.../BusinessResult.java => topic/TopicResult.java} | 4 +-
.../BusinessResult.java => entry/NodeEntry.java} | 33 ++-
.../entry/{BusinessEntry.java => TopicEntry.java} | 8 +-
.../BusinessResult.java => entry/TopicStatus.java} | 23 +-
...BusinessRepository.java => NodeRepository.java} | 10 +-
...usinessRepository.java => TopicRepository.java} | 21 +-
.../apache/tubemq/manager/service/NodeService.java | 272 +++++++++++++++++++++
.../tubemq/manager/service/TopicBackendWorker.java | 137 +++++++++++
.../apache/tubemq/manager/service/TopicFuture.java | 58 +++++
.../{AsyncService.java => TubeHttpConst.java} | 21 +-
.../service/tube/TubeHttpBrokerInfoList.java | 135 ++++++++++
.../TubeHttpResponse.java} | 17 +-
.../service/tube/TubeHttpTopicInfoList.java | 97 ++++++++
.../src/main/resources/application.properties | 17 ++
.../manager/controller/TestBusinessController.java | 20 +-
.../manager/repository/TestBusinessRepository.java | 10 +-
.../service/tube/TestTubeHttpBrokerResponse.java | 48 ++++
.../service/tube/TestTubeHttpTopicInfoList.java | 52 ++++
21 files changed, 978 insertions(+), 92 deletions(-)
diff --git a/tubemq-manager/pom.xml b/tubemq-manager/pom.xml
index 0d33d82..18e692c 100644
--- a/tubemq-manager/pom.xml
+++ b/tubemq-manager/pom.xml
@@ -33,6 +33,15 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -45,6 +54,11 @@
</dependency>
<dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java
index 33369ca..09a72cd 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java
@@ -17,7 +17,7 @@
package org.apache.tubemq.manager.controller;
import javax.servlet.http.HttpServletRequest;
-import org.apache.tubemq.manager.controller.business.BusinessResult;
+import org.apache.tubemq.manager.controller.topic.TopicResult;
import org.apache.tubemq.manager.exceptions.TubeMQManagerException;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
@@ -36,9 +36,9 @@ public class ManagerControllerAdvice {
* @return entity
*/
@ExceptionHandler(TubeMQManagerException.class)
- public BusinessResult handlingBusinessException(HttpServletRequest request,
+ public TopicResult handlingBusinessException(HttpServletRequest request,
TubeMQManagerException ex) {
- BusinessResult result = new BusinessResult();
+ TopicResult result = new TopicResult();
result.setMessage(ex.getMessage());
result.setCode(-1);
return result;
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java
similarity index 54%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessController.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java
index c8190a8..314d079 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java
@@ -14,15 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.business;
+package org.apache.tubemq.manager.controller.topic;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
-import org.apache.tubemq.manager.entry.BusinessEntry;
+import org.apache.tubemq.manager.entry.TopicEntry;
+import org.apache.tubemq.manager.entry.TopicStatus;
import org.apache.tubemq.manager.exceptions.TubeMQManagerException;
-import org.apache.tubemq.manager.repository.BusinessRepository;
-import org.apache.tubemq.manager.service.AsyncService;
+import org.apache.tubemq.manager.repository.TopicRepository;
+import org.apache.tubemq.manager.service.TopicBackendWorker;
+import org.apache.tubemq.manager.service.TopicFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
@@ -35,62 +38,75 @@ import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(path = "/business")
@Slf4j
-public class BusinessController {
+public class TopicController {
@Autowired
- private BusinessRepository businessRepository;
+ private TopicRepository topicRepository;
@Autowired
- private AsyncService asyncService;
+ private TopicBackendWorker topicBackendWorker;
/**
- * add new business.
+ * add new topic.
*
* @return - businessResult
* @throws Exception - exception
*/
@PostMapping("/add")
- public BusinessResult addBusiness(@RequestBody BusinessEntry entry) {
- businessRepository.saveAndFlush(entry);
- return new BusinessResult();
+    public TopicResult addTopic(@RequestBody TopicEntry entry) {
+        // persist the entry in ADDING status before handing it to the backend worker.
+        entry.setStatus(TopicStatus.ADDING.value());
+        topicRepository.saveAndFlush(entry);
+        CompletableFuture<TopicEntry> future = new CompletableFuture<>();
+        topicBackendWorker.addTopicFuture(new TopicFuture(entry, future));
+        future.whenComplete((completedEntry, throwable) -> {
+            // when the future completes exceptionally the result argument is null,
+            // so always update the entry captured from the request instead of the
+            // callback argument.
+            if (throwable != null) {
+                // adding the topic failed, mark it as failed.
+                entry.setStatus(TopicStatus.FAILED.value());
+                log.error("exception caught", throwable);
+            } else {
+                entry.setStatus(TopicStatus.SUCCESS.value());
+            }
+            topicRepository.saveAndFlush(entry);
+        });
+        return new TopicResult();
}
/**
- * update business
+ * update topic
*
* @return
* @throws Exception
*/
@PostMapping("/update")
- public BusinessResult updateBusiness(@RequestBody BusinessEntry entry) {
- return new BusinessResult();
+ public TopicResult updateTopic(@RequestBody TopicEntry entry) {
+ return new TopicResult();
}
/**
- * Check business status by business name.
+ * Check topic status by business name.
*
* @return
* @throws Exception
*/
@GetMapping("/check")
- public BusinessResult checkBusinessByName(
+ public TopicResult checkTopicByBusinessName(
@RequestParam String businessName) {
- List<BusinessEntry> result = businessRepository.findAllByBusinessName(businessName);
- return new BusinessResult();
+ List<TopicEntry> result = topicRepository.findAllByBusinessName(businessName);
+ return new TopicResult();
}
/**
- * get business by id.
+ * get topic by id.
*
* @param id business id
* @return BusinessResult
* @throws Exception
*/
@GetMapping("/get/{id}")
- public BusinessResult getBusinessByID(
+ public TopicResult getBusinessByID(
@PathVariable Long id) {
- Optional<BusinessEntry> businessEntry = businessRepository.findById(id);
- BusinessResult result = new BusinessResult();
+ Optional<TopicEntry> businessEntry = topicRepository.findById(id);
+ TopicResult result = new TopicResult();
if (!businessEntry.isPresent()) {
result.setCode(-1);
result.setMessage("business not found");
@@ -98,9 +114,12 @@ public class BusinessController {
return result;
}
-
+ /**
+ * test for exception situation.
+ * @return
+ */
@GetMapping("/throwException")
- public BusinessResult throwException() {
+ public TopicResult throwException() {
throw new TubeMQManagerException("exception for test");
}
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicResult.java
similarity index 91%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicResult.java
index 88c39ae..98fb81e 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicResult.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.business;
+package org.apache.tubemq.manager.controller.topic;
import lombok.Data;
@@ -22,7 +22,7 @@ import lombok.Data;
* rest result for business controller
*/
@Data
-public class BusinessResult {
+public class TopicResult {
private String message;
private int code = 0;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
similarity index 58%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
index 88c39ae..54c4236 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
@@ -14,15 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.business;
+package org.apache.tubemq.manager.entry;
+
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
import lombok.Data;
/**
- * rest result for business controller
+ * node machine for tube cluster. broker/master/standby
*/
+@Entity
+@Table(name = "node")
@Data
-public class BusinessResult {
- private String message;
- private int code = 0;
+public class NodeEntry {
+ @Id
+ @GeneratedValue(strategy= GenerationType.AUTO)
+ private long brokerId;
+
+ private boolean master;
+
+ private boolean standby;
+
+ private boolean broker;
+
+ private String ip;
+
+ private int port;
+
+ private int webPort;
+
+ private int clusterId;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicEntry.java
similarity index 95%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicEntry.java
index 88e8e1e..17b7711 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicEntry.java
@@ -32,10 +32,10 @@ import org.hibernate.annotations.CreationTimestamp;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;
@Entity
-@Table(name = "business")
+@Table(name = "topic")
@Data
@EntityListeners(AuditingEntityListener.class) // support CreationTimestamp annotation
-public class BusinessEntry {
+public class TopicEntry {
@Id
@GeneratedValue(strategy=GenerationType.AUTO)
private long businessId;
@@ -121,7 +121,7 @@ public class BusinessEntry {
private String issueMethod;
- public BusinessEntry(String businessName, String schemaName,
+ public TopicEntry(String businessName, String schemaName,
String username, String passwd, String topic, String encodingType) {
this.businessName = businessName;
this.schemaName = schemaName;
@@ -131,7 +131,7 @@ public class BusinessEntry {
this.encodingType = encodingType;
}
- public BusinessEntry() {
+ public TopicEntry() {
}
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicStatus.java
similarity index 74%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicStatus.java
index 88c39ae..e5796af 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicStatus.java
@@ -14,15 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.business;
-import lombok.Data;
+package org.apache.tubemq.manager.entry;
-/**
- * rest result for business controller
- */
-@Data
-public class BusinessResult {
- private String message;
- private int code = 0;
+/**
+ * Status of a topic entry; {@link #value()} is the integer code passed to
+ * the entry's status setter.
+ */
+public enum TopicStatus {
+
+    ADDING(0), SUCCESS(1), FAILED(2), RETRY(3);
+
+    // integer code of this status; immutable once the constant is created.
+    private final int value;
+
+    TopicStatus(int value) {
+        this.value = value;
+    }
+
+    /**
+     * get the integer code of this status.
+     *
+     * @return integer code
+     */
+    public int value() {
+        return this.value;
+    }
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/BusinessRepository.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
similarity index 77%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/BusinessRepository.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
index fa4f3af..4bf6ec7 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/BusinessRepository.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
@@ -17,14 +17,12 @@
package org.apache.tubemq.manager.repository;
-import java.util.List;
-import org.apache.tubemq.manager.entry.BusinessEntry;
+import org.apache.tubemq.manager.entry.NodeEntry;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
-public interface BusinessRepository extends JpaRepository<BusinessEntry, Long> {
- List<BusinessEntry> findAllByBusinessName(String businessName);
- BusinessEntry findByBusinessName(String businessName);
-}
+public interface NodeRepository extends JpaRepository<NodeEntry, Long> {
+ NodeEntry findNodeEntryByClusterIdIsAndMasterIsTrue(int clusterId);
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/BusinessRepository.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/TopicRepository.java
similarity index 69%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/BusinessRepository.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/TopicRepository.java
index fa4f3af..4c88949 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/BusinessRepository.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/TopicRepository.java
@@ -18,13 +18,26 @@
package org.apache.tubemq.manager.repository;
import java.util.List;
-import org.apache.tubemq.manager.entry.BusinessEntry;
+import org.apache.tubemq.manager.entry.TopicEntry;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
-public interface BusinessRepository extends JpaRepository<BusinessEntry, Long> {
- List<BusinessEntry> findAllByBusinessName(String businessName);
- BusinessEntry findByBusinessName(String businessName);
+public interface TopicRepository extends JpaRepository<TopicEntry, Long> {
+
+ /**
+ * get all topicEntry list by business name
+ * @param businessName
+ * @return
+ */
+ List<TopicEntry> findAllByBusinessName(String businessName);
+
+ /**
+ * get one topicEntry by business name
+ * @param businessName
+ * @return
+ */
+ TopicEntry findByBusinessName(String businessName);
+
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
new file mode 100644
index 0000000..4e0db3e
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service;
+
+
+import static org.apache.tubemq.manager.service.TubeHttpConst.ADD_TUBE_TOPIC;
+import static org.apache.tubemq.manager.service.TubeHttpConst.BROKER_RUN_STATUS;
+import static org.apache.tubemq.manager.service.TubeHttpConst.RELOAD_BROKER;
+import static org.apache.tubemq.manager.service.TubeHttpConst.SCHEMA;
+import static org.apache.tubemq.manager.service.TubeHttpConst.TOPIC_CONFIG_INFO;
+
+import com.google.gson.Gson;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.service.tube.TubeHttpBrokerInfoList;
+import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
+import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+
+/**
+ * node service to query broker/master/standby status of tube cluster.
+ */
+@Slf4j
+public class NodeService {
+
+ private final CloseableHttpClient httpclient = HttpClients.createDefault();
+ private final Gson gson = new Gson();
+
+ @Value("${manager.max.configurable.broker.size:50}")
+ private int maxConfigurableBrokerSize;
+
+ @Value("${manager.max.retry.adding.topic:10}")
+ private int maxRetryAddingTopic;
+
+ private final TopicBackendWorker worker;
+
+ @Autowired
+ private NodeRepository nodeRepository;
+
+ public NodeService(TopicBackendWorker worker) {
+ this.worker = worker;
+ }
+
+ /**
+ * request node status via http.
+ *
+ * @param nodeEntry - node entry
+ * @return
+ * @throws IOException
+ */
+ private TubeHttpBrokerInfoList requestClusterNodeStatus(NodeEntry nodeEntry) throws IOException {
+ String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort() + BROKER_RUN_STATUS;
+ HttpGet httpget = new HttpGet(url);
+ try (CloseableHttpResponse response = httpclient.execute(httpget)) {
+ TubeHttpBrokerInfoList brokerInfoList =
+ gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
+ TubeHttpBrokerInfoList.class);
+ // request return normal.
+ if (brokerInfoList.getCode() == 0) {
+ // divide by state.
+ brokerInfoList.divideBrokerListByState();
+ return brokerInfoList;
+ }
+ } catch (Exception ex) {
+ log.error("exception caught while requesting broker status", ex);
+ }
+ return null;
+ }
+
+
+ private TubeHttpTopicInfoList requestTopicConfigInfo(NodeEntry nodeEntry, String topic) {
+ String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+ + TOPIC_CONFIG_INFO + "&topicName=" + topic;
+ HttpGet httpget = new HttpGet(url);
+ try (CloseableHttpResponse response = httpclient.execute(httpget)) {
+ TubeHttpTopicInfoList topicInfoList =
+ gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
+ TubeHttpTopicInfoList.class);
+ if (topicInfoList.getErrCode() == 0) {
+ return topicInfoList;
+ }
+ } catch (Exception ex) {
+ log.error("exception caught while requesting broker status", ex);
+ }
+ return null;
+ }
+
+
+ private boolean configBrokersForTopics(NodeEntry nodeEntry,
+ Set<String> topics, List<Integer> brokerList, int maxBrokers) {
+ List<Integer> finalBrokerList = brokerList.subList(0, maxBrokers);
+ String brokerStr = StringUtils.join(finalBrokerList, ",");
+ String topicStr = StringUtils.join(topics, ",");
+ String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+ + ADD_TUBE_TOPIC + "&topicName=" + topicStr + "&brokerId=" + brokerStr;
+ HttpGet httpget = new HttpGet(url);
+ try (CloseableHttpResponse response = httpclient.execute(httpget)) {
+ TubeHttpResponse result =
+ gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
+ TubeHttpResponse.class);
+ return result.getCode() == 0 && result.getErrCode() == 0;
+ } catch (Exception ex) {
+ log.error("exception caught while requesting broker status", ex);
+ }
+ return false;
+ }
+
+ /**
+ * handle result, if success, complete it,
+ * if not success, add back to queue without exceeding max retry,
+ * otherwise complete it with exception.
+ *
+ * @param isSuccess
+ * @param topics
+ * @param pendingTopic
+ */
+ private void handleAddingResult(boolean isSuccess, Set<String> topics,
+ Map<String, TopicFuture> pendingTopic) {
+ for (String topic : topics) {
+ TopicFuture future = pendingTopic.get(topic);
+ if (future != null) {
+ if (isSuccess) {
+ future.complete();
+ } else {
+ future.increaseRetryTime();
+ if (future.getRetryTime() > maxRetryAddingTopic) {
+ future.completeExceptional();
+ } else {
+ // add back to queue.
+ worker.addTopicFuture(future);
+ }
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Adding topic is an async operation, so this method should
+ * 1. check whether pendingTopic contains topic that has failed/succeeded to be added.
+ * 2. async add topic to tubemq cluster
+ *
+ * @param brokerInfoList - broker list
+ * @param pendingTopic - topicMap
+ */
+ private void handleAddingTopic(NodeEntry nodeEntry,
+ TubeHttpBrokerInfoList brokerInfoList,
+ Map<String, TopicFuture> pendingTopic) {
+ // 1. check tubemq cluster by topic name, remove pending topic if has added.
+ Set<String> brandNewTopics = new HashSet<>();
+ for (String topic : pendingTopic.keySet()) {
+ TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(nodeEntry, topic);
+ if (topicInfoList != null) {
+ // get broker list by topic request
+ List<Integer> topicBrokerList = topicInfoList.getTopicBrokerIdList();
+ if (topicBrokerList.isEmpty()) {
+ brandNewTopics.add(topic);
+ } else {
+ // remove brokers which have been added.
+ List<Integer> configurableBrokerIdList =
+ brokerInfoList.getConfigurableBrokerIdList();
+ configurableBrokerIdList.removeAll(topicBrokerList);
+ // add topic to satisfy max broker number.
+ Set<String> singleTopic = new HashSet<>();
+ singleTopic.add(topic);
+ int maxBrokers = maxConfigurableBrokerSize - topicBrokerList.size();
+ boolean isSuccess = configBrokersForTopics(nodeEntry, singleTopic,
+ configurableBrokerIdList, maxBrokers);
+ handleAddingResult(isSuccess, singleTopic, pendingTopic);
+ }
+ }
+ }
+ // 2. add new topics to cluster
+ List<Integer> configurableBrokerIdList = brokerInfoList.getConfigurableBrokerIdList();
+ int maxBrokers = Math.min(maxConfigurableBrokerSize, configurableBrokerIdList.size());
+ boolean isSuccess = configBrokersForTopics(nodeEntry, brandNewTopics,
+ configurableBrokerIdList, maxBrokers);
+ handleAddingResult(isSuccess, brandNewTopics, pendingTopic);
+ }
+
+ /**
+ * reload broker list, cannot exceed maxConfigurableBrokerSize each time.
+ *
+ * @param nodeEntry
+ * @param needReloadList
+ */
+ private void handleReloadBroker(NodeEntry nodeEntry, List<Integer> needReloadList) {
+ // reload without exceed max broker.
+ int begin = 0;
+ int end = 0;
+ do {
+ end = Math.min(maxConfigurableBrokerSize + begin, needReloadList.size());
+ List<Integer> brokerIdList = needReloadList.subList(begin, end);
+ String brokerStr = StringUtils.join(brokerIdList, ",");
+ String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+ + RELOAD_BROKER + "&brokerId=" + brokerStr;
+ HttpGet httpget = new HttpGet(url);
+ try (CloseableHttpResponse response = httpclient.execute(httpget)) {
+ TubeHttpResponse result =
+ gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
+ TubeHttpResponse.class);
+ if (result.getErrCode() == 0 && result.getCode() == 0) {
+ log.info("reload tube broker cgi: " +
+ url + " ; return value : " + result.getCode());
+ }
+ } catch (Exception ex) {
+ log.error("exception caught while requesting broker status", ex);
+ }
+ begin = end;
+ } while (end >= needReloadList.size());
+ }
+
+
+
+ /**
+ * update broker status
+ */
+ public void updateBrokerStatus(int clusterId, Map<String, TopicFuture> pendingTopic) {
+ NodeEntry nodeEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
+ if (nodeEntry != null) {
+ try {
+ TubeHttpBrokerInfoList brokerInfoList = requestClusterNodeStatus(nodeEntry);
+ if (brokerInfoList != null) {
+ handleAddingTopic(nodeEntry, brokerInfoList, pendingTopic);
+ }
+
+ // refresh broker list
+ brokerInfoList = requestClusterNodeStatus(nodeEntry);
+ if (brokerInfoList != null) {
+ handleReloadBroker(nodeEntry, brokerInfoList.getNeedReloadList());
+ }
+
+ } catch (Exception ex) {
+ log.error("exception caught while requesting broker status", ex);
+ }
+ } else {
+ log.error("cannot get master ip by clusterId {}, please check it", clusterId);
+ }
+ }
+
+ public void close() throws IOException {
+ httpclient.close();
+ }
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java
new file mode 100644
index 0000000..86b72d5
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.tubemq.manager.repository.TopicRepository;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * Topic backend thread worker. Topic futures are queued per cluster id and a
+ * single daemon thread periodically drains the queues and hands the batch to
+ * {@link NodeService}.
+ */
+@Component
+@Slf4j
+public class TopicBackendWorker implements DisposableBean, Runnable {
+    private final AtomicBoolean runFlag = new AtomicBoolean(true);
+    // cluster id -> topic futures waiting to be added to that cluster.
+    private final ConcurrentHashMap<Integer, BlockingQueue<TopicFuture>> pendingTopics =
+            new ConcurrentHashMap<>();
+    // number of rounds in which a queue was not drained.
+    private final AtomicInteger notSatisfiedCount = new AtomicInteger(0);
+    private final NodeService nodeService;
+    // kept so that destroy() can wake the thread up while it sleeps.
+    private final Thread workerThread;
+
+    @Autowired
+    private TopicRepository topicRepository;
+
+    @Value("${manager.topic.queue.warning.size:100}")
+    private int queueWarningSize;
+
+    // value in seconds
+    @Value("${manager.topic.queue.thread.interval:10}")
+    private int queueThreadInterval;
+
+    @Value("${manager.topic.queue.max.wait:3}")
+    private int queueMaxWait;
+
+    @Value("${manager.topic.queue.max.running.size:20}")
+    private int queueMaxRunningSize;
+
+    TopicBackendWorker() {
+        // assign every final field before the thread starts, otherwise the
+        // thread could observe a null nodeService.
+        nodeService = new NodeService(this);
+        workerThread = new Thread(this);
+        // daemon thread
+        workerThread.setDaemon(true);
+        // NOTE(review): the thread starts inside the constructor, i.e. before the
+        // @Value fields above can have been injected - confirm this is acceptable.
+        workerThread.start();
+    }
+
+    /**
+     * add topic future to pending executing queue.
+     * @param future - TopicFuture.
+     */
+    public void addTopicFuture(TopicFuture future) {
+        // create the queue of a cluster only the first time the cluster is seen.
+        BlockingQueue<TopicFuture> queue = pendingTopics.computeIfAbsent(
+                future.getEntry().getClusterId(), clusterId -> new LinkedBlockingQueue<>());
+        queue.add(future);
+        if (queue.size() > queueWarningSize) {
+            log.warn("queue size exceed {}, please check it", queueWarningSize);
+        }
+    }
+
+    /**
+     * batch executing adding topic, wait util max n seconds or max size satisfied.
+     * The broker status of every known cluster is updated in each round, with
+     * an empty batch if the queue was not drained.
+     */
+    private void batchAddTopic() {
+        pendingTopics.forEach((clusterId, queue) -> {
+            Map<String, TopicFuture> pendingTopicList = new HashMap<>();
+            if (notSatisfiedCount.get() > queueMaxWait || queue.size() > queueMaxRunningSize) {
+                notSatisfiedCount.set(0);
+                List<TopicFuture> tmpTopicList = new ArrayList<>();
+                queue.drainTo(tmpTopicList, queueMaxRunningSize);
+                for (TopicFuture topicFuture : tmpTopicList) {
+                    TopicFuture sameTopic = pendingTopicList.putIfAbsent(
+                            topicFuture.getEntry().getTopic(), topicFuture);
+                    if (sameTopic != null) {
+                        // the batch is keyed by topic name: a second request for the
+                        // same topic goes back to the queue for a later round so
+                        // that its future is not overwritten and lost.
+                        queue.add(topicFuture);
+                    }
+                }
+            } else {
+                notSatisfiedCount.incrementAndGet();
+            }
+            // update broker status
+            nodeService.updateBrokerStatus(clusterId, pendingTopicList);
+        });
+
+    }
+
+    /**
+     * check topic from db
+     */
+    private void checkTopicFromDB() {
+    }
+
+    /**
+     * worker loop: run one round, then sleep queueThreadInterval seconds, until
+     * the run flag is cleared or the thread is interrupted.
+     */
+    @Override
+    public void run() {
+        log.info("TopicBackendWorker has started");
+        while (runFlag.get()) {
+            try {
+                batchAddTopic();
+                checkTopicFromDB();
+            } catch (Exception exception) {
+                log.warn("exception caught", exception);
+            }
+            // sleep outside of the try block above, so that a failing round
+            // cannot turn into a busy loop.
+            try {
+                TimeUnit.SECONDS.sleep(queueThreadInterval);
+            } catch (InterruptedException interrupted) {
+                // keep the interrupt status and stop the worker.
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+    }
+
+    /**
+     * stop the worker thread and close the node service.
+     *
+     * @throws Exception - if closing the node service fails
+     */
+    @Override
+    public void destroy() throws Exception {
+        runFlag.set(false);
+        // wake the thread up if it is sleeping between two rounds.
+        workerThread.interrupt();
+        nodeService.close();
+    }
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicFuture.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicFuture.java
new file mode 100644
index 0000000..62b0e2d
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicFuture.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service;
+
+import java.util.concurrent.CompletableFuture;
+import lombok.Getter;
+import org.apache.tubemq.manager.entry.TopicEntry;
+
+/**
+ * topic business with future.
+ */
+public class TopicFuture {
+ @Getter
+ private int retryTime = 0;
+ @Getter
+ private final TopicEntry entry;
+ @Getter
+ private final CompletableFuture<TopicEntry> future;
+
+ public TopicFuture(TopicEntry entry, CompletableFuture<TopicEntry> future) {
+ this.entry = entry;
+ this.future = future;
+ }
+
+ /**
+ * record retry time.
+ */
+ public void increaseRetryTime() {
+ retryTime += 1;
+ }
+
+ /**
+ * when topic operation finished, complete it.
+ */
+ public void complete() {
+ this.future.complete(this.entry);
+ }
+
+ public void completeExceptional() {
+ this.future.completeExceptionally(new RuntimeException("exceed max retry "
+ + retryTime +" adding"));
+ }
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeHttpConst.java
similarity index 59%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeHttpConst.java
index 335f1d0..81a360e 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeHttpConst.java
@@ -17,15 +17,14 @@
package org.apache.tubemq.manager.service;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-
-/**
- * Service for running async tasks.
- * https://howtodoinjava.com/spring-boot2/rest/enableasync-async-controller/
- */
-@Service
-@Slf4j
-public class AsyncService {
-
+public class TubeHttpConst {
+ public static final String SCHEMA = "http://";
+ public static final String BROKER_RUN_STATUS =
+ "/webapi.htm?type=op_query&method=admin_query_broker_run_status";
+ public static final String TOPIC_CONFIG_INFO =
+ "/webapi.htm?type=op_query&method=admin_query_topic_info";
+ public static final String ADD_TUBE_TOPIC =
+ "/webapi.htm?type=op_modify&method=admin_add_new_topic_record";
+ public static final String RELOAD_BROKER =
+ "/webapi.htm?type=op_modify&method=admin_reload_broker_configure";
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
new file mode 100644
index 0000000..c768aa1
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service.tube;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Data;
+
+/**
+ * json class for broker info list from master http service.
+ */
+@Data
+public class TubeHttpBrokerInfoList {
+
+ /**
+ * json class for broker info.
+ */
+ @Data
+ public static class BrokerInfo {
+ private int brokerId;
+ private String brokerIp;
+ private int brokerPort;
+ private String manageStatus;
+ private String runStatus;
+ private String subStatus;
+ private int stepOp;
+ private boolean isConfChanged;
+ private boolean isConfLoaded;
+ private boolean isBrokerOnline;
+ private String brokerVersion;
+ private boolean acceptPublish;
+ private boolean acceptSubscribe;
+
+ public boolean isIdle() {
+ return subStatus != null && subStatus.equals("idle");
+ }
+
+ public boolean isWorking() {
+ if (runStatus != null && manageStatus != null) {
+ return runStatus.equals("running") && (
+ manageStatus.equals("online") ||
+ manageStatus.equals("only-read") ||
+ manageStatus.equals("only-write"));
+ }
+ return false;
+ }
+
+ public boolean isConfigurable() {
+ return stepOp == -2 || stepOp == 31 || stepOp == 32;
+ }
+
+ @Override
+ public int hashCode() {
+ return brokerId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+
+ if (o == this) return true;
+ if (!(o instanceof BrokerInfo)) {
+ return false;
+ }
+
+ BrokerInfo brokerInfo = (BrokerInfo) o;
+
+ return brokerId == brokerInfo.brokerId;
+ }
+ }
+
+ private int code;
+ private String errMsg;
+ // total broker info list of brokers.
+ private List<BrokerInfo> data;
+ // configurable list of brokers.
+ private List<BrokerInfo> configurableList;
+ // working state list of brokers
+ private List<BrokerInfo> workingList;
+ // idle broker list
+ private List<BrokerInfo> idleList;
+ // need reload broker list
+ private List<Integer> needReloadList;
+
+ /**
+ * divide broker list into different list by broker state.
+ */
+ public void divideBrokerListByState() {
+ if (data != null) {
+ configurableList = new ArrayList<>();
+ workingList = new ArrayList<>();
+ idleList = new ArrayList<>();
+ needReloadList = new ArrayList<>();
+ for (BrokerInfo brokerInfo : data) {
+ if (brokerInfo.isConfigurable()) {
+ configurableList.add(brokerInfo);
+ }
+ if (brokerInfo.isWorking()) {
+ workingList.add(brokerInfo);
+ }
+ if (brokerInfo.isIdle()) {
+ idleList.add(brokerInfo);
+ }
+ if (brokerInfo.isConfChanged) {
+ needReloadList.add(brokerInfo.getBrokerId());
+ }
+ }
+ }
+ }
+
+ public List<Integer> getConfigurableBrokerIdList() {
+ List<Integer> tmpBrokerIdList = new ArrayList<>();
+ if (configurableList != null) {
+ for (BrokerInfo brokerInfo : configurableList) {
+ tmpBrokerIdList.add(brokerInfo.getBrokerId());
+ }
+ }
+ return tmpBrokerIdList;
+ }
+
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpResponse.java
similarity index 74%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpResponse.java
index 335f1d0..bc30025 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpResponse.java
@@ -15,17 +15,16 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.service;
+package org.apache.tubemq.manager.service.tube;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
+import lombok.Data;
/**
- * Service for running async tasks.
- * https://howtodoinjava.com/spring-boot2/rest/enableasync-async-controller/
+ * common response json str for tube http request
*/
-@Service
-@Slf4j
-public class AsyncService {
-
+@Data
+public class TubeHttpResponse {
+ // Plain data holder; accessors come from Lombok's @Data.
+ // NOTE(review): field names presumably mirror the keys of the response JSON - confirm
+ // against the code that parses it.
+ private int code;
+ private String errMsg;
+ private int errCode;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
new file mode 100644
index 0000000..7131b83
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service.tube;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Data;
+import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList.TopicInfoList.TopicInfo;
+
+/**
+ * JSON binding for the topic info list returned by the master http service.
+ */
+@Data
+public class TubeHttpTopicInfoList {
+    private boolean result;
+
+    private String errMsg;
+
+    private int errCode;
+
+    private List<TopicInfoList> data;
+
+    /**
+     * One entry of {@code data}: a topic name and its list of topic info records.
+     */
+    @Data
+    public static class TopicInfoList {
+
+        private String topicName;
+        private List<TopicInfo> topicInfo;
+
+        /**
+         * A single topic info record; each record carries a broker id, ip and port.
+         */
+        @Data
+        public static class TopicInfo {
+
+            private String topicName;
+            private int topicStatusId;
+            private int brokerId;
+            private String brokerIp;
+            private int brokerPort;
+            private int numPartitions;
+            private int unflushThreshold;
+            private int unflushInterval;
+            private int unFlushDataHold;
+            private String deleteWhen;
+            private String deletePolicy;
+            private boolean acceptPublish;
+            private boolean acceptSubscribe;
+            private int numTopicStores;
+            private int memCacheMsgSizeInMB;
+            private int memCacheFlushIntvl;
+            private int memCacheMsgCntInK;
+            private String createUser;
+            private String createDate;
+            private String modifyUser;
+            private String modifyDate;
+            private RunInfo runInfo;
+
+            /**
+             * The nested "runInfo" object of a topic info record.
+             */
+            @Data
+            public static class RunInfo {
+                private boolean acceptPublish;
+                private boolean acceptSubscribe;
+                private int numPartitions;
+                private int numTopicStores;
+                private String brokerManageStatus;
+            }
+        }
+    }
+
+    /**
+     * Collects the broker id of every topic info record found in {@code data}.
+     *
+     * @return a new list of broker ids in encounter order (duplicates kept); empty when
+     *         {@code data} is null. Entries whose topic info list is null are skipped.
+     */
+    public List<Integer> getTopicBrokerIdList() {
+        List<Integer> brokerIds = new ArrayList<>();
+        if (data == null) {
+            return brokerIds;
+        }
+        for (TopicInfoList perTopic : data) {
+            List<TopicInfo> records = perTopic.getTopicInfo();
+            if (records == null) {
+                continue;
+            }
+            for (TopicInfo record : records) {
+                brokerIds.add(record.getBrokerId());
+            }
+        }
+        return brokerIds;
+    }
+}
diff --git a/tubemq-manager/src/main/resources/application.properties b/tubemq-manager/src/main/resources/application.properties
new file mode 100644
index 0000000..dee51b7
--- /dev/null
+++ b/tubemq-manager/src/main/resources/application.properties
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# <p>
+# http://www.apache.org/licenses/LICENSE-2.0
+# <p>
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spring.jpa.hibernate.ddl-auto=update
+# configuration for manager
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
index 2ddfb67..0838203 100644
--- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
@@ -19,9 +19,9 @@ package org.apache.tubemq.manager.controller;
import java.net.URI;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
-import org.apache.tubemq.manager.controller.business.BusinessController;
-import org.apache.tubemq.manager.controller.business.BusinessResult;
-import org.apache.tubemq.manager.entry.BusinessEntry;
+import org.apache.tubemq.manager.controller.topic.TopicController;
+import org.apache.tubemq.manager.controller.topic.TopicResult;
+import org.apache.tubemq.manager.entry.TopicEntry;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -59,7 +59,7 @@ public class TestBusinessController {
@Before
public void setUp() {
- mvc = MockMvcBuilders.standaloneSetup(new BusinessController()).build();
+ mvc = MockMvcBuilders.standaloneSetup(new TopicController()).build();
}
@Test
@@ -76,14 +76,14 @@ public class TestBusinessController {
final String baseUrl = "http://localhost:" + randomServerPort + "/business/add";
URI uri = new URI(baseUrl);
String demoName = "test";
- BusinessEntry entry = new BusinessEntry(demoName, demoName, demoName,
+ TopicEntry entry = new TopicEntry(demoName, demoName, demoName,
demoName, demoName, demoName);
HttpHeaders headers = new HttpHeaders();
- HttpEntity<BusinessEntry> request = new HttpEntity<>(entry, headers);
+ HttpEntity<TopicEntry> request = new HttpEntity<>(entry, headers);
- ResponseEntity<BusinessResult> responseEntity =
- client.postForEntity(uri, request, BusinessResult.class);
+ ResponseEntity<TopicResult> responseEntity =
+ client.postForEntity(uri, request, TopicResult.class);
assertThat(responseEntity.getStatusCode().is2xxSuccessful()).isEqualTo(true);
}
@@ -91,8 +91,8 @@ public class TestBusinessController {
public void testControllerException() throws Exception {
final String baseUrl = "http://localhost:" + randomServerPort + "/business/throwException";
URI uri = new URI(baseUrl);
- ResponseEntity<BusinessResult> responseEntity =
- client.getForEntity(uri, BusinessResult.class);
+ ResponseEntity<TopicResult> responseEntity =
+ client.getForEntity(uri, TopicResult.class);
assertThat(Objects.requireNonNull(responseEntity.getBody()).getCode()).isEqualTo(-1);
assertThat(responseEntity.getBody().getMessage()).isEqualTo("exception for test");
}
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/repository/TestBusinessRepository.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/repository/TestBusinessRepository.java
index d51a9ed..7bd8c73 100644
--- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/repository/TestBusinessRepository.java
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/repository/TestBusinessRepository.java
@@ -17,7 +17,7 @@
package org.apache.tubemq.manager.repository;
import static org.assertj.core.api.Assertions.assertThat;
-import org.apache.tubemq.manager.entry.BusinessEntry;
+import org.apache.tubemq.manager.entry.TopicEntry;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
@@ -32,24 +32,24 @@ public class TestBusinessRepository {
private TestEntityManager entityManager;
@Autowired
- private BusinessRepository businessRepository;
+ private TopicRepository businessRepository;
@Test
public void whenFindByNameThenReturnBusiness() {
String demoName = "alex";
- BusinessEntry businessEntry = new BusinessEntry(demoName, demoName,
+ TopicEntry businessEntry = new TopicEntry(demoName, demoName,
demoName, demoName, demoName, demoName);
entityManager.persist(businessEntry);
entityManager.flush();
- BusinessEntry businessEntry1 = businessRepository.findByBusinessName("alex");
+ TopicEntry businessEntry1 = businessRepository.findByBusinessName("alex");
assertThat(businessEntry1.getBusinessName()).isEqualTo(businessEntry.getBusinessName());
}
@Test
public void checkValidation() throws Exception {
String demoName = "a";
- BusinessEntry businessEntry = new BusinessEntry(demoName, demoName, demoName,
+ TopicEntry businessEntry = new TopicEntry(demoName, demoName, demoName,
demoName, demoName, demoName);
StringBuilder builder = new StringBuilder();
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/tube/TestTubeHttpBrokerResponse.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/tube/TestTubeHttpBrokerResponse.java
new file mode 100644
index 0000000..2d79f69
--- /dev/null
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/tube/TestTubeHttpBrokerResponse.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service.tube;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Assert;
+import org.junit.Test;
+
+@Slf4j
+public class TestTubeHttpBrokerResponse {
+
+    private final Gson gson = new Gson();
+
+    /**
+     * Parses a sample broker-status response with one broker entry and checks the
+     * top-level fields and the boolean flags, which the sample encodes as JSON strings.
+     */
+    @Test
+    public void testJsonStr() {
+        String jsonStr = "{\"code\":0,\"errMsg\":\"OK\",\"data\":"
+                + "[{\"brokerId\":136,\"brokerIp\":\"127.0.0.1\","
+                + "\"brokerPort\":8123,\"manageStatus\":\"online\","
+                + "\"runStatus\":\"notRegister\",\"subStatus\":\"processing_event\","
+                + "\"stepOp\":32,\"isConfChanged\":\"true\",\"isConfLoaded\":\"false\","
+                + "\"isBrokerOnline\":\"false\",\"brokerVersion\":\"-\","
+                + "\"acceptPublish\":\"false\",\"acceptSubscribe\":\"false\"}]}";
+
+        TubeHttpBrokerInfoList parsed = gson.fromJson(jsonStr, TubeHttpBrokerInfoList.class);
+
+        // top-level fields
+        Assert.assertEquals(1, parsed.getData().size());
+        Assert.assertEquals(0, parsed.getCode());
+        Assert.assertEquals("OK", parsed.getErrMsg());
+
+        // flags of the only broker entry
+        TubeHttpBrokerInfoList.BrokerInfo onlyBroker = parsed.getData().get(0);
+        Assert.assertTrue(onlyBroker.isConfChanged());
+        Assert.assertFalse(onlyBroker.isAcceptPublish());
+        Assert.assertFalse(onlyBroker.isBrokerOnline());
+    }
+}
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/tube/TestTubeHttpTopicInfoList.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/tube/TestTubeHttpTopicInfoList.java
new file mode 100644
index 0000000..82a9fda
--- /dev/null
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/tube/TestTubeHttpTopicInfoList.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service.tube;
+
+import com.google.gson.Gson;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTubeHttpTopicInfoList {
+
+    private final Gson gson = new Gson();
+
+    /**
+     * Parses a sample topic-info response holding one topic with one record and checks
+     * the top-level fields, the record's create user and its nested run info.
+     */
+    @Test
+    public void testJsonStr() {
+        String jsonStr = "{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\",\""
+                + "data\":[{\"topicName\":\"test1\",\"topicInfo\":[{\"topicName\":\"test1\",\""
+                + "topicStatusId\":0,\"brokerId\":152509201,\"brokerIp\":\"127.0.0.1\",\""
+                + "brokerPort\":8123,\"numPartitions\":1,\"unflushThreshold\":1000,\""
+                + "unflushInterval\":10000,\"unFlushDataHold\":1000,\"deleteWhen\":\"\",\""
+                + "deletePolicy\":\"delete,32h\",\"acceptPublish\":true,"
+                + "\"acceptSubscribe\":true,\"numTopicStores\":1,\"memCacheMsgSizeInMB\":2,\""
+                + "memCacheFlushIntvl\":20000,\"memCacheMsgCntInK\":10,"
+                + "\"createUser\":\"Alice\",\"createDate\":\"20200917122645\","
+                + "\"modifyUser\":\"Alice\",\"modifyDate\":\"20200917122645\","
+                + "\"runInfo\":{\"acceptPublish\":true,\"acceptSubscribe\":true,"
+                + "\"numPartitions\":1,\"numTopicStores\":1,"
+                + "\"brokerManageStatus\":\"online\"}}]}]}";
+
+        TubeHttpTopicInfoList parsed = gson.fromJson(jsonStr, TubeHttpTopicInfoList.class);
+
+        // top-level fields
+        Assert.assertTrue(parsed.isResult());
+        Assert.assertEquals(0, parsed.getErrCode());
+        Assert.assertEquals(1, parsed.getData().size());
+
+        // first record of the first topic
+        TubeHttpTopicInfoList.TopicInfoList.TopicInfo firstRecord =
+                parsed.getData().get(0).getTopicInfo().get(0);
+        Assert.assertEquals("Alice", firstRecord.getCreateUser());
+        Assert.assertEquals("online", firstRecord.getRunInfo().getBrokerManageStatus());
+    }
+}