You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/22 11:07:13 UTC
[incubator-tubemq] branch TUBEMQ-421 updated: [TUBEMQ-530] create a
cluster in manager (#405)
This is an automated email from the ASF dual-hosted git repository.
yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-421 by this push:
new b8b323d [TUBEMQ-530] create a cluster in manager (#405)
b8b323d is described below
commit b8b323d4ca7cbbc7529a6d5743ac55bcff1d9246
Author: EMsnap <zp...@connect.ust.hk>
AuthorDate: Fri Jan 22 19:07:02 2021 +0800
[TUBEMQ-530] create a cluster in manager (#405)
* [TUBEMQ-530] create a cluster in manager
* [TUBEMQ-530] create a cluster in manager
* [TUBEMQ-530] 1. delete broker info since nodeEntry only stores master node
2. add version into properties in pom.xml
3. make cluster name unique
---
tubemq-manager/pom.xml | 13 ++-
.../controller/cluster/ClusterController.java | 44 ++++++-
.../cluster/request/AddClusterReq.java} | 23 ++--
.../manager/controller/group/GroupController.java | 7 +-
.../manager/controller/node/NodeController.java | 27 +----
.../controller/topic/TopicWebController.java | 9 +-
.../entry/{NodeEntry.java => ClusterEntry.java} | 30 +++--
.../org/apache/tubemq/manager/entry/NodeEntry.java | 4 +-
.../apache/tubemq/manager/entry/TopicEntry.java | 2 +-
...{NodeRepository.java => ClusterRepository.java} | 19 ++--
.../tubemq/manager/repository/NodeRepository.java | 14 +++
.../tubemq/manager/service/ClusterServiceImpl.java | 74 ++++++++++++
.../{MasterService.java => MasterServiceImpl.java} | 38 +++----
.../tubemq/manager/service/NodeServiceImpl.java | 45 +++-----
.../tubemq/manager/service/TopicBackendWorker.java | 1 +
.../tubemq/manager/service/TopicServiceImpl.java | 14 +--
.../interfaces/ClusterService.java} | 23 ++--
.../manager/service/interfaces/MasterService.java | 73 ++++++++++++
.../service/{ => interfaces}/NodeService.java | 17 +--
.../service/{ => interfaces}/TopicService.java | 2 +-
.../service/tube/TubeHttpBrokerInfoList.java | 2 +-
.../service/tube/TubeHttpClusterInfoList.java | 97 ----------------
.../service/tube/TubeHttpGroupDetailInfo.java | 3 -
.../apache/tubemq/manager/utils/ConvertUtils.java | 14 +++
.../manager/controller/TestClusterController.java | 51 ++++++++-
.../manager/controller/TestNodeController.java | 126 ---------------------
26 files changed, 383 insertions(+), 389 deletions(-)
diff --git a/tubemq-manager/pom.xml b/tubemq-manager/pom.xml
index eaf220a..b5bf2d7 100644
--- a/tubemq-manager/pom.xml
+++ b/tubemq-manager/pom.xml
@@ -28,6 +28,11 @@
<name>Apache TubeMQ - Manager</name>
+ <properties>
+ <guava.version>21.0</guava.version>
+ <commons.version>4.3</commons.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -51,7 +56,13 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
- <version>4.3</version>
+ <version>${commons.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
</dependency>
<dependency>
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
index 6b2cf9f..d2fcc7a 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
@@ -17,17 +17,21 @@
package org.apache.tubemq.manager.controller.cluster;
+import static org.apache.tubemq.manager.service.MasterServiceImpl.TUBE_REQUEST_PATH;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
-import static org.apache.tubemq.manager.service.MasterService.*;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
+import static org.apache.tubemq.manager.utils.ConvertUtils.covertMapToQueryString;
import com.google.gson.Gson;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.controller.cluster.request.AddClusterReq;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
-import org.apache.tubemq.manager.service.MasterService;
+import org.apache.tubemq.manager.service.interfaces.ClusterService;
+import org.apache.tubemq.manager.service.interfaces.MasterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestBody;
@@ -48,9 +52,39 @@ public class ClusterController {
private NodeRepository nodeRepository;
@Autowired
+ private ClusterService clusterService;
+
+ @Autowired
private MasterService masterService;
/**
+ * add a new cluster; the request must provide the master node's ip and web port
+ */
+ @RequestMapping(value = "", method = RequestMethod.POST,
+ produces = MediaType.APPLICATION_JSON_VALUE)
+ public @ResponseBody TubeMQResult addNewCluster(
+ @RequestBody AddClusterReq req) {
+
+ // 1. validate params
+ if (req.getMasterIp() == null || req.getMasterWebPort() == null) {
+ return TubeMQResult.getErrorResult("please input master ip and webPort");
+ }
+ TubeMQResult checkResult = masterService.checkMasterNodeStatus(req.getMasterIp(), req.getMasterWebPort());
+ if (checkResult.getErrCode() != SUCCESS_CODE) {
+ return TubeMQResult.getErrorResult("please check master ip and webPort");
+ }
+
+ // 2. add cluster and master node
+ Boolean addSuccess = clusterService.addClusterAndMasterNode(req);
+
+ if (!addSuccess) {
+ return TubeMQResult.getErrorResult("add cluster and master fail");
+ }
+
+ return new TubeMQResult();
+ }
+
+ /**
* query cluster info
*/
@RequestMapping(value = "/query", method = RequestMethod.GET,
@@ -58,7 +92,7 @@ public class ClusterController {
public @ResponseBody String queryInfo(
@RequestParam Map<String, String> queryBody) throws Exception {
String url = masterService.getQueryUrl(queryBody);
- return queryMaster(url);
+ return masterService.queryMaster(url);
}
/**
@@ -78,7 +112,7 @@ public class ClusterController {
clusterId);
String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+ "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(requestBody);
- return gson.toJson(requestMaster(url));
+ return gson.toJson(masterService.requestMaster(url));
} else {
TubeMQResult result = new TubeMQResult();
result.setErrCode(-1);
@@ -89,4 +123,6 @@ public class ClusterController {
}
+
+
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/request/AddClusterReq.java
similarity index 62%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/request/AddClusterReq.java
index 7a0cbb0..0ca052a 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/request/AddClusterReq.java
@@ -15,20 +15,15 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.repository;
+package org.apache.tubemq.manager.controller.cluster.request;
-import org.apache.tubemq.manager.entry.NodeEntry;
-import org.springframework.data.jpa.repository.JpaRepository;
-import org.springframework.stereotype.Repository;
+import lombok.Data;
-import java.util.List;
-
-@Repository
-public interface NodeRepository extends JpaRepository<NodeEntry, Long> {
-
- NodeEntry findNodeEntryByClusterIdIsAndMasterIsTrue(int clusterId);
-
- List<NodeEntry> findNodeEntriesByClusterIdIs(int clusterId);
-
- List<NodeEntry> findAll();
+@Data
+public class AddClusterReq {
+ private String masterIp;
+ private String clusterName;
+ private Integer masterPort;
+ private Integer masterWebPort;
+ private String createUser;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
index 3f56342..7c588ee 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
@@ -22,7 +22,6 @@ package org.apache.tubemq.manager.controller.group;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.CLONE;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
-import static org.apache.tubemq.manager.service.MasterService.queryMaster;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.QUERY;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_CONSUMER_GROUP;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_CONSUMER;
@@ -40,8 +39,8 @@ import org.apache.tubemq.manager.controller.topic.request.BatchAddGroupAuthReq;
import org.apache.tubemq.manager.controller.topic.request.DeleteGroupReq;
import org.apache.tubemq.manager.controller.topic.request.RebalanceConsumerReq;
import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
+import org.apache.tubemq.manager.service.interfaces.MasterService;
import org.apache.tubemq.manager.service.TopicServiceImpl;
-import org.apache.tubemq.manager.service.MasterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
@@ -93,7 +92,7 @@ public class GroupController {
public @ResponseBody String queryConsumer(
@RequestParam Map<String, String> req) throws Exception {
String url = masterService.getQueryUrl(req);
- return queryMaster(url);
+ return masterService.queryMaster(url);
}
@@ -137,7 +136,7 @@ public class GroupController {
public @ResponseBody String queryBlackGroup(
@RequestParam Map<String, String> req) throws Exception {
String url = masterService.getQueryUrl(req);
- return queryMaster(url);
+ return masterService.queryMaster(url);
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
index 26411fc..f26da51 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
@@ -27,8 +27,8 @@ import org.apache.tubemq.manager.controller.node.request.DeleteBrokerReq;
import org.apache.tubemq.manager.controller.node.request.OnlineOfflineBrokerReq;
import org.apache.tubemq.manager.controller.node.request.ReloadBrokerReq;
import org.apache.tubemq.manager.repository.NodeRepository;
-import org.apache.tubemq.manager.service.NodeService;
-import org.apache.tubemq.manager.service.MasterService;
+import org.apache.tubemq.manager.service.interfaces.MasterService;
+import org.apache.tubemq.manager.service.interfaces.NodeService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
@@ -46,7 +46,6 @@ import static org.apache.tubemq.manager.service.TubeMQHttpConst.ONLINE;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.OP_QUERY;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.RELOAD;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SET_READ_OR_WRITE;
-import static org.apache.tubemq.manager.service.MasterService.*;
@RestController
@RequestMapping(path = "/v1/node")
@@ -66,24 +65,6 @@ public class NodeController {
MasterService masterService;
/**
- * query brokers in certain cluster
- * @param type
- * @param method
- * @param clusterId
- * @return
- */
- @RequestMapping(value = "/query/clusterInfo", method = RequestMethod.GET,
- produces = MediaType.APPLICATION_JSON_VALUE)
- public @ResponseBody String queryInfo(@RequestParam String type, @RequestParam String method,
- @RequestParam(required = false) Integer clusterId) {
- if (method.equals(ADMIN_QUERY_CLUSTER_INFO) && type.equals(OP_QUERY)) {
- return nodeService.queryClusterInfo(clusterId);
- }
- return gson.toJson(getErrorResult(NO_SUCH_METHOD));
- }
-
-
- /**
* query brokers' run status
* this method supports batch operation
*/
@@ -92,7 +73,7 @@ public class NodeController {
public @ResponseBody String queryBrokerDetail(
@RequestParam Map<String, String> queryBody) throws Exception {
String url = masterService.getQueryUrl(queryBody);
- return queryMaster(url);
+ return masterService.queryMaster(url);
}
@@ -105,7 +86,7 @@ public class NodeController {
public @ResponseBody String queryBrokerConfig(
@RequestParam Map<String, String> queryBody) throws Exception {
String url = masterService.getQueryUrl(queryBody);
- return queryMaster(url);
+ return masterService.queryMaster(url);
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
index b1f6341..7890846 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
@@ -24,7 +24,6 @@ import static org.apache.tubemq.manager.service.TubeMQHttpConst.CLONE;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.MODIFY;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.REMOVE;
-import static org.apache.tubemq.manager.service.MasterService.queryMaster;
import com.google.gson.Gson;
@@ -36,8 +35,8 @@ import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
import org.apache.tubemq.manager.controller.topic.request.DeleteTopicReq;
import org.apache.tubemq.manager.controller.topic.request.ModifyTopicReq;
import org.apache.tubemq.manager.controller.topic.request.SetAuthControlReq;
-import org.apache.tubemq.manager.service.NodeService;
-import org.apache.tubemq.manager.service.MasterService;
+import org.apache.tubemq.manager.service.interfaces.MasterService;
+import org.apache.tubemq.manager.service.interfaces.NodeService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestBody;
@@ -93,7 +92,7 @@ public class TopicWebController {
public @ResponseBody String queryConsumerAuth(
@RequestParam Map<String, String> req) throws Exception {
String url = masterService.getQueryUrl(req);
- return queryMaster(url);
+ return masterService.queryMaster(url);
}
/**
@@ -106,7 +105,7 @@ public class TopicWebController {
public @ResponseBody String queryTopicConfig(
@RequestParam Map<String, String> req) throws Exception {
String url = masterService.getQueryUrl(req);
- return queryMaster(url);
+ return masterService.queryMaster(url);
}
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/ClusterEntry.java
similarity index 74%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/ClusterEntry.java
index fb19232..0464ed7 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/ClusterEntry.java
@@ -17,37 +17,33 @@
package org.apache.tubemq.manager.entry;
+
+import java.util.Date;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
+import javax.persistence.UniqueConstraint;
import lombok.Data;
/**
- * node machine for tube cluster. broker/master/standby
+ * cluster record for a tube cluster: id, unique name, create/modify time and creator
*/
@Entity
-@Table(name = "node")
+@Table(name = "cluster", uniqueConstraints=
+ @UniqueConstraint(columnNames={"clusterName"}))
@Data
-public class NodeEntry {
+public class ClusterEntry {
@Id
- @GeneratedValue(strategy= GenerationType.AUTO)
- private long brokerId;
-
- private boolean master;
-
- private boolean standby;
-
- private boolean broker;
-
- private String ip;
+ @GeneratedValue(strategy= GenerationType.IDENTITY)
+ private int clusterId;
- private int port;
+ private String clusterName;
- private int webPort;
+ private Date createTime;
- private int clusterId;
+ private Date modifyTime;
- private String clusterName;
+ private String createUser;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
index fb19232..9be4493 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
@@ -32,8 +32,8 @@ import lombok.Data;
@Data
public class NodeEntry {
@Id
- @GeneratedValue(strategy= GenerationType.AUTO)
- private long brokerId;
+ @GeneratedValue(strategy= GenerationType.IDENTITY)
+ private long id;
private boolean master;
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicEntry.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicEntry.java
index 17b7711..767fbe9 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicEntry.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/TopicEntry.java
@@ -37,7 +37,7 @@ import org.springframework.data.jpa.domain.support.AuditingEntityListener;
@EntityListeners(AuditingEntityListener.class) // support CreationTimestamp annotation
public class TopicEntry {
@Id
- @GeneratedValue(strategy=GenerationType.AUTO)
+ @GeneratedValue(strategy= GenerationType.IDENTITY)
private long businessId;
@Size(max = 30)
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/ClusterRepository.java
similarity index 76%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/ClusterRepository.java
index 7a0cbb0..6959052 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/ClusterRepository.java
@@ -17,18 +17,17 @@
package org.apache.tubemq.manager.repository;
+import java.util.List;
+import org.apache.tubemq.manager.entry.ClusterEntry;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.springframework.data.jpa.repository.JpaRepository;
-import org.springframework.stereotype.Repository;
-
-import java.util.List;
-
-@Repository
-public interface NodeRepository extends JpaRepository<NodeEntry, Long> {
-
- NodeEntry findNodeEntryByClusterIdIsAndMasterIsTrue(int clusterId);
- List<NodeEntry> findNodeEntriesByClusterIdIs(int clusterId);
+public interface ClusterRepository extends JpaRepository<ClusterEntry, Long> {
- List<NodeEntry> findAll();
+ /**
+ * find clusterEntry by clusterId
+ * @param clusterId
+ * @return
+ */
+ ClusterEntry findClusterEntryByClusterId(Integer clusterId);
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
index 7a0cbb0..c8b2eb9 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
@@ -26,9 +26,23 @@ import java.util.List;
@Repository
public interface NodeRepository extends JpaRepository<NodeEntry, Long> {
+ /**
+ * find the master node by clusterId
+ * @param clusterId
+ * @return
+ */
NodeEntry findNodeEntryByClusterIdIsAndMasterIsTrue(int clusterId);
+ /**
+ * find all nodes in cluster
+ * @param clusterId
+ * @return
+ */
List<NodeEntry> findNodeEntriesByClusterIdIs(int clusterId);
+ /**
+ * find all nodes
+ * @return
+ */
List<NodeEntry> findAll();
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/ClusterServiceImpl.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/ClusterServiceImpl.java
new file mode 100644
index 0000000..a16aea2
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/ClusterServiceImpl.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tubemq.manager.service;
+
+import java.util.Date;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.tubemq.manager.controller.cluster.request.AddClusterReq;
+import org.apache.tubemq.manager.entry.ClusterEntry;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.repository.ClusterRepository;
+import org.apache.tubemq.manager.service.interfaces.ClusterService;
+import org.apache.tubemq.manager.service.interfaces.NodeService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+
+@Component
+@Slf4j
+public class ClusterServiceImpl implements ClusterService {
+
+ @Autowired
+ ClusterRepository clusterRepository;
+
+ @Autowired
+ NodeService nodeService;
+
+ @Override
+ public Boolean addClusterAndMasterNode(AddClusterReq req) {
+ ClusterEntry entry = new ClusterEntry();
+ entry.setCreateTime(new Date());
+ entry.setCreateUser(req.getCreateUser());
+ entry.setClusterName(req.getClusterName());
+ ClusterEntry retEntry = null;
+ try {
+ retEntry = clusterRepository.saveAndFlush(entry);
+ } catch (Exception e) {
+ log.error("create cluster fail with exception", e);
+ return false;
+ }
+ // add master node
+ return addMasterNode(req, retEntry);
+ }
+
+ private boolean addMasterNode(AddClusterReq req, ClusterEntry clusterEntry) {
+ if (clusterEntry == null) {
+ return false;
+ }
+ NodeEntry nodeEntry = new NodeEntry();
+ nodeEntry.setPort(req.getMasterPort());
+ nodeEntry.setMaster(true);
+ nodeEntry.setClusterId(clusterEntry.getClusterId());
+ nodeEntry.setWebPort(req.getMasterWebPort());
+ nodeEntry.setIp(req.getMasterIp());
+ nodeEntry.setBroker(false);
+ return nodeService.addNode(nodeEntry);
+ }
+
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterServiceImpl.java
similarity index 85%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterService.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterServiceImpl.java
index 8f726e9..71758ed 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterServiceImpl.java
@@ -20,38 +20,34 @@ package org.apache.tubemq.manager.service;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.tubemq.manager.controller.TubeMQResult;
-import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
import org.apache.tubemq.manager.controller.node.request.BaseReq;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.service.interfaces.MasterService;
import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import org.springframework.web.bind.annotation.RequestBody;
import java.io.InputStreamReader;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.BROKER_RUN_STATUS;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
+import static org.apache.tubemq.manager.utils.ConvertUtils.covertMapToQueryString;
@Slf4j
@Component
-public class MasterService {
+public class MasterServiceImpl implements MasterService {
private static CloseableHttpClient httpclient = HttpClients.createDefault();
private static Gson gson = new Gson();
@@ -60,19 +56,8 @@ public class MasterService {
@Autowired
NodeRepository nodeRepository;
- public static String covertMapToQueryString(Map<String, String> requestMap) throws Exception {
- List<String> queryList = new ArrayList<>();
-
- for (Map.Entry<String, String> entry : requestMap.entrySet()) {
- queryList.add(entry.getKey() + "=" + URLEncoder.encode(
- entry.getValue(), UTF_8.toString()));
- }
- return StringUtils.join(queryList, "&");
- }
-
-
-
- public static TubeMQResult requestMaster(String url) {
+ @Override
+ public TubeMQResult requestMaster(String url) {
log.info("start to request {}", url);
HttpGet httpGet = new HttpGet(url);
@@ -99,7 +84,8 @@ public class MasterService {
* @param url
* @return query info
*/
- public static String queryMaster(String url) {
+ @Override
+ public String queryMaster(String url) {
log.info("start to request {}", url);
HttpGet httpGet = new HttpGet(url);
TubeMQResult defaultResult = new TubeMQResult();
@@ -116,6 +102,7 @@ public class MasterService {
}
+ @Override
public TubeMQResult baseRequestMaster(BaseReq req) {
if (req.getClusterId() == null) {
return TubeMQResult.getErrorResult("please input clusterId");
@@ -131,6 +118,7 @@ public class MasterService {
}
+ @Override
public NodeEntry getMasterNode(BaseReq req) {
if (req.getClusterId() == null) {
return null;
@@ -140,6 +128,7 @@ public class MasterService {
}
+ @Override
public String getQueryUrl(Map<String, String> queryBody) throws Exception {
int clusterId = Integer.parseInt(queryBody.get("clusterId"));
queryBody.remove("clusterId");
@@ -149,4 +138,9 @@ public class MasterService {
+ "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody);
}
+ @Override
+ public TubeMQResult checkMasterNodeStatus(String masterIp, Integer masterWebPort) {
+ String url = SCHEMA + masterIp + ":" + masterWebPort + BROKER_RUN_STATUS;
+ return requestMaster(url);
+ }
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java
index 30b5adf..cb9536e 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java
@@ -19,9 +19,7 @@ package org.apache.tubemq.manager.service;
import static org.apache.tubemq.manager.controller.node.request.AddBrokersReq.getAddBrokerReq;
-import static org.apache.tubemq.manager.service.MasterService.TUBE_REQUEST_PATH;
-import static org.apache.tubemq.manager.service.MasterService.queryMaster;
-import static org.apache.tubemq.manager.service.MasterService.requestMaster;
+import static org.apache.tubemq.manager.service.MasterServiceImpl.TUBE_REQUEST_PATH;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD_TUBE_TOPIC;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.BROKER_RUN_STATUS;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.NO_SUCH_CLUSTER;
@@ -53,6 +51,9 @@ import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.service.interfaces.MasterService;
+import org.apache.tubemq.manager.service.interfaces.NodeService;
+import org.apache.tubemq.manager.service.interfaces.TopicService;
import org.apache.tubemq.manager.service.tube.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -208,7 +209,7 @@ public class NodeServiceImpl implements NodeService {
private BrokerStatusInfo getBrokerStatusInfo(QueryBrokerCfgReq queryReq, NodeEntry masterEntry) throws Exception {
String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
+ "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(queryReq);
- BrokerStatusInfo brokerStatusInfo = gson.fromJson(queryMaster(url),
+ BrokerStatusInfo brokerStatusInfo = gson.fromJson(masterService.queryMaster(url),
BrokerStatusInfo.class);
return brokerStatusInfo;
}
@@ -217,7 +218,7 @@ public class NodeServiceImpl implements NodeService {
public TubeMQResult addTopicToBrokers(AddTopicReq req, NodeEntry masterEntry) throws Exception {
String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
+ "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
- return requestMaster(url);
+ return masterService.requestMaster(url);
}
@@ -375,28 +376,6 @@ public class NodeServiceImpl implements NodeService {
}
@Override
- public String queryClusterInfo(Integer clusterId) {
- TubeHttpClusterInfoList clusterInfoList;
- try {
- // find all nodes by given clusterIds, show all nodes if clusterIds not provided
- List<NodeEntry> nodeEntries = clusterId == null ?
- nodeRepository.findAll() : nodeRepository.findNodeEntriesByClusterIdIs(clusterId);
- // divide all entries by clusterId
- Map<Integer, List<NodeEntry>> nodeEntriesPerCluster =
- nodeEntries.parallelStream().collect(Collectors.groupingBy(NodeEntry::getClusterId));
-
- clusterInfoList = TubeHttpClusterInfoList.getClusterInfoList(nodeEntriesPerCluster);
- } catch (Exception e) {
- log.error("query cluster info error", e);
- return gson.toJson(TubeMQResult.getErrorResult(""));
- }
-
- return gson.toJson(clusterInfoList);
- }
-
-
-
- @Override
public void close() throws IOException {
httpclient.close();
}
@@ -469,4 +448,16 @@ public class NodeServiceImpl implements NodeService {
}
return addTopicsToBrokers(masterEntry, req.getBrokerIds(), req.getAddTopicReqs());
}
+
+
+    /**
+     * Saves the given node through the node repository and reports whether it worked.
+     *
+     * @param nodeEntry the node to save
+     * @return true if the save completed, false if it threw (the exception is logged)
+     */
+    @Override
+    public boolean addNode(NodeEntry nodeEntry) {
+        boolean persisted;
+        try {
+            nodeRepository.saveAndFlush(nodeEntry);
+            persisted = true;
+        } catch (Exception saveFailure) {
+            // best effort: callers only get a success flag, so record the cause here
+            log.error("create node error with exception", saveFailure);
+            persisted = false;
+        }
+        return persisted;
+    }
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java
index 7f2f910..7341a89 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.tubemq.manager.repository.TopicRepository;
+import org.apache.tubemq.manager.service.interfaces.NodeService;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java
index e8af636..f0dba76 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java
@@ -18,15 +18,13 @@
package org.apache.tubemq.manager.service;
-import static org.apache.tubemq.manager.service.MasterService.queryMaster;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.QUERY_GROUP_DETAIL_INFO;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.TOPIC_CONFIG_INFO;
import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
import static org.apache.tubemq.manager.utils.ConvertUtils.convertToRebalanceConsumerReq;
-import static org.apache.tubemq.manager.service.MasterService.TUBE_REQUEST_PATH;
-import static org.apache.tubemq.manager.service.MasterService.requestMaster;
+import static org.apache.tubemq.manager.service.MasterServiceImpl.TUBE_REQUEST_PATH;
import com.google.gson.Gson;
import java.io.InputStreamReader;
@@ -47,6 +45,8 @@ import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
import org.apache.tubemq.manager.controller.topic.request.RebalanceConsumerReq;
import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.service.interfaces.MasterService;
+import org.apache.tubemq.manager.service.interfaces.TopicService;
import org.apache.tubemq.manager.service.tube.CleanOffsetResult;
import org.apache.tubemq.manager.service.tube.RebalanceGroupResult;
import org.apache.tubemq.manager.service.tube.TubeHttpGroupDetailInfo;
@@ -112,7 +112,7 @@ public class TopicServiceImpl implements TopicService {
String brokerIp = topicInfo.getBrokerIp();
String url = SCHEMA + brokerIp + ":" + brokerWebPort
+ "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
- result = requestMaster(url);
+ result = masterService.requestMaster(url);
if (result.getErrCode() != SUCCESS_CODE) {
return result;
}
@@ -160,7 +160,7 @@ public class TopicServiceImpl implements TopicService {
consumerId);
String url = SCHEMA + master.getIp() + ":" + master.getWebPort()
+ "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(rebalanceConsumerReq);
- TubeMQResult result = requestMaster(url);
+ TubeMQResult result = masterService.requestMaster(url);
if (result.getErrCode() != 0) {
rebalanceGroupResult.getFailConsumers().add(consumerId);
}
@@ -196,7 +196,7 @@ public class TopicServiceImpl implements TopicService {
String brokerIp = topicInfo.getBrokerIp();
String url = SCHEMA + brokerIp + ":" + brokerWebPort
+ "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
- result = requestMaster(url);
+ result = masterService.requestMaster(url);
if (result.getErrCode() != SUCCESS_CODE) {
cleanOffsetResult.getFailBrokers().add(brokerIp);
} else {
@@ -234,7 +234,7 @@ public class TopicServiceImpl implements TopicService {
String brokerIp = topicInfo.getBrokerIp();
String url = SCHEMA + brokerIp + ":" + brokerWebPort
+ "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
- OffsetQueryRes res = gson.fromJson(queryMaster(url), OffsetQueryRes.class);
+ OffsetQueryRes res = gson.fromJson(masterService.queryMaster(url), OffsetQueryRes.class);
if (res.getErrCode() != SUCCESS_CODE) {
return TubeMQResult.getErrorResult("query broker id" + topicInfo.getBrokerId() + " fail");
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/ClusterService.java
similarity index 62%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/ClusterService.java
index 7a0cbb0..8bc1abd 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/ClusterService.java
@@ -15,20 +15,19 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.repository;
+package org.apache.tubemq.manager.service.interfaces;
-import org.apache.tubemq.manager.entry.NodeEntry;
-import org.springframework.data.jpa.repository.JpaRepository;
-import org.springframework.stereotype.Repository;
-import java.util.List;
+import org.apache.tubemq.manager.controller.cluster.request.AddClusterReq;
+import org.springframework.stereotype.Component;
-@Repository
-public interface NodeRepository extends JpaRepository<NodeEntry, Long> {
+@Component
+public interface ClusterService {
- NodeEntry findNodeEntryByClusterIdIsAndMasterIsTrue(int clusterId);
-
- List<NodeEntry> findNodeEntriesByClusterIdIs(int clusterId);
-
- List<NodeEntry> findAll();
+ /**
+ * add cluster and the master node in the cluster
+ * @param req request describing the cluster to add and its master node
+ * @return NOTE(review): presumably TRUE when both were added - confirm in ClusterServiceImpl
+ */
+ Boolean addClusterAndMasterNode(AddClusterReq req);
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/MasterService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/MasterService.java
new file mode 100644
index 0000000..340e6f0
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/MasterService.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tubemq.manager.service.interfaces;
+
+import java.util.Map;
+import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.springframework.stereotype.Component;
+
+/**
+ * Operations for talking to a master node: sending requests and queries
+ * by url, looking up the master node and checking whether it is alive.
+ */
+// NOTE(review): Spring component scanning presumably ignores interfaces, so this
+// annotation likely has no effect and MasterServiceImpl needs its own stereotype - confirm.
+@Component
+public interface MasterService {
+
+ /**
+ * request master with request url, return action result (success or fail)
+ * @param url complete request url to call
+ * @return result of the action; callers in this module compare its errCode with SUCCESS_CODE
+ */
+ TubeMQResult requestMaster(String url);
+
+ /**
+ * query master with query url, return the information returned by master
+ * @param url complete query url to call
+ * @return the returned information as text; callers in this module parse it as JSON with gson
+ */
+ String queryMaster(String url);
+
+ /**
+ * request master with baseReq, return action result (success or fail)
+ * @param req request to send (NOTE(review): how the target master is resolved from it is not visible here)
+ * @return result of the action (success or fail)
+ */
+ TubeMQResult baseRequestMaster(BaseReq req);
+
+ /**
+ * get the master node in the cluster
+ * @param req request identifying the cluster - presumably by its cluster id, TODO confirm
+ * @return node entry of the master node
+ */
+ NodeEntry getMasterNode(BaseReq req);
+
+ /**
+ * use queryBody to generate queryUrl for master query
+ * @param queryBody query parameters as name/value pairs
+ * @return the generated query url
+ * @throws Exception if the url cannot be generated (NOTE(review): presumably from url-encoding - confirm)
+ */
+ String getQueryUrl(Map<String, String> queryBody) throws Exception;
+
+ /**
+ * check whether the master node is alive
+ * @param masterIp ip of the master node to check
+ * @param masterPort port of the master node (NOTE(review): unclear whether web port or service port - confirm)
+ * @return result of the check
+ */
+ TubeMQResult checkMasterNodeStatus(String masterIp, Integer masterPort);
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/NodeService.java
similarity index 92%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/NodeService.java
index eebcf96..ddb1a59 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/NodeService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.service;
+package org.apache.tubemq.manager.service.interfaces;
import java.io.IOException;
import java.util.List;
@@ -27,6 +27,7 @@ import org.apache.tubemq.manager.controller.node.request.BatchAddTopicReq;
import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq;
import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.service.TopicFuture;
import org.apache.tubemq.manager.service.tube.AddBrokerResult;
public interface NodeService {
@@ -65,13 +66,6 @@ public interface NodeService {
*/
void updateBrokerStatus(int clusterId, Map<String, TopicFuture> pendingTopic);
- /**
- * query cluster info
- * @param clusterId
- * @return
- */
- String queryClusterInfo(Integer clusterId);
-
void close() throws IOException;
/**
@@ -88,4 +82,11 @@ public interface NodeService {
* @return
*/
TubeMQResult batchAddTopic(BatchAddTopicReq req);
+
+ /**
+ * add one node to node repository
+ * @param nodeEntry the node to store
+ * @return true if the node was stored; NodeServiceImpl returns false (and logs) when saving throws
+ */
+ boolean addNode(NodeEntry nodeEntry);
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/TopicService.java
similarity index 97%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/TopicService.java
index 7fd6f48..2b27945 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/TopicService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.service;
+package org.apache.tubemq.manager.service.interfaces;
import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.controller.group.request.DeleteOffsetReq;
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
index 61a5bd9..7cf3fce 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
@@ -37,7 +37,7 @@ public class TubeHttpBrokerInfoList {
* json class for broker info.
*/
@Data
- private static class BrokerInfo {
+ public static class BrokerInfo {
private int brokerId;
private String brokerIp;
private int brokerPort;
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpClusterInfoList.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpClusterInfoList.java
deleted file mode 100644
index 4da3f04..0000000
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpClusterInfoList.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tubemq.manager.service.tube;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import org.apache.tubemq.manager.controller.TubeMQResult;
-import org.apache.tubemq.manager.entry.NodeEntry;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-@Data
-public class TubeHttpClusterInfoList extends TubeMQResult {
-
- private List<ClusterData> clusterData = new ArrayList<>();
-
- @Data
- @AllArgsConstructor
- public static class ClusterData {
-
- @Data
- public static class ClusterInfo {
-
- @Data
- public static class BrokerInfo {
- private int brokerId;
- private String brokerIp;
- }
-
- private String master;
- private List<String> standby = new ArrayList<>();
- private List<BrokerInfo> broker = new ArrayList<>();
-
- }
-
- private int clusterId;
- private String clusterName;
- private ClusterInfo clusterInfo;
-
- }
-
-
- public static TubeHttpClusterInfoList getClusterInfoList(Map<Integer, List<NodeEntry>> nodeEntriesPerCluster) {
- // for each cluster provide cluster information
- TubeHttpClusterInfoList clusterInfoList = new TubeHttpClusterInfoList();
- nodeEntriesPerCluster.forEach((id, entries) -> {
- ClusterData.ClusterInfo singleClusterInfo = getSingleClusterInfo(entries);
- ClusterData clusterData =
- new ClusterData(id, entries.get(0).getClusterName(), singleClusterInfo);
- clusterInfoList.getClusterData().add(clusterData);
- }
- );
- return clusterInfoList;
- }
-
- private static ClusterData.ClusterInfo getSingleClusterInfo(List<NodeEntry> entries) {
-
- TubeHttpClusterInfoList.ClusterData.ClusterInfo clusterInfo =
- new TubeHttpClusterInfoList.ClusterData.ClusterInfo();
-
- entries.forEach(nodeEntry -> {
- if (nodeEntry.isMaster()) {
- clusterInfo.setMaster(nodeEntry.getIp());
- }
- if (nodeEntry.isBroker()) {
- ClusterData.ClusterInfo.BrokerInfo brokerInfo =
- new ClusterData.ClusterInfo.BrokerInfo();
- brokerInfo.setBrokerId((int) nodeEntry.getBrokerId());
- brokerInfo.setBrokerIp(nodeEntry.getIp());
- clusterInfo.getBroker().add(brokerInfo);
- }
- if (nodeEntry.isStandby()) {
- clusterInfo.getStandby().add(nodeEntry.getIp());
- }
- });
-
- return clusterInfo;
- }
-
-
-}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpGroupDetailInfo.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpGroupDetailInfo.java
index 9834d98..29de567 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpGroupDetailInfo.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpGroupDetailInfo.java
@@ -19,10 +19,7 @@ package org.apache.tubemq.manager.service.tube;
import com.google.common.collect.Lists;
import java.util.List;
-import java.util.Map;
import lombok.Data;
-import org.apache.tubemq.manager.entry.NodeEntry;
-import org.apache.tubemq.manager.service.tube.TubeHttpClusterInfoList.ClusterData;
@Data
public class TubeHttpGroupDetailInfo {
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
index f36c98d..9ed7c28 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
@@ -18,6 +18,7 @@
package org.apache.tubemq.manager.utils;
import com.google.gson.Gson;
+import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -89,4 +90,17 @@ public class ConvertUtils {
consumerReq.setMethod(REBALANCE_GROUP);
return consumerReq;
}
+
+
+    /**
+     * Builds a URL query string of the form {@code k1=v1&k2=v2} from the given parameters.
+     *
+     * <p>Both names and values are URL-encoded, so a name containing reserved characters
+     * such as {@code &}, {@code =} or a space cannot corrupt the query. Pairs are emitted
+     * in the iteration order of {@code requestMap}.
+     *
+     * @param requestMap parameter names mapped to their values; values must not be null
+     * @return the joined query string without a leading {@code ?}; empty if the map is empty
+     * @throws Exception if the charset name is rejected by {@link URLEncoder}
+     */
+    public static String covertMapToQueryString(Map<String, String> requestMap) throws Exception {
+        // loop-invariant, so resolve the charset name once
+        String charsetName = UTF_8.toString();
+        List<String> queryList = new ArrayList<>();
+
+        for (Map.Entry<String, String> entry : requestMap.entrySet()) {
+            // String.valueOf keeps the previous rendering ("null") of a null name
+            String name = URLEncoder.encode(String.valueOf(entry.getKey()), charsetName);
+            String value = URLEncoder.encode(entry.getValue(), charsetName);
+            queryList.add(name + "=" + value);
+        }
+        return String.join("&", queryList);
+    }
+
+
}
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestClusterController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestClusterController.java
index bd893a7..53e3306 100644
--- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestClusterController.java
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestClusterController.java
@@ -18,19 +18,24 @@
package org.apache.tubemq.manager.controller;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
-import org.apache.tubemq.manager.controller.cluster.ClusterController;
+import org.apache.tubemq.manager.controller.cluster.request.AddClusterReq;
+import org.apache.tubemq.manager.entry.ClusterEntry;
import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.repository.ClusterRepository;
import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.service.interfaces.MasterService;
+import org.apache.tubemq.manager.service.interfaces.NodeService;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
@@ -52,8 +57,14 @@ public class TestClusterController {
@MockBean
private NodeRepository nodeRepository;
- @InjectMocks
- private ClusterController clusterController;
+ @MockBean
+ private ClusterRepository clusterRepository;
+
+ @MockBean
+ private NodeService nodeService;
+
+ @MockBean
+ private MasterService masterService;
@Autowired
private MockMvc mockMvc;
@@ -147,4 +158,36 @@ public class TestClusterController {
log.info("result json string is {}, response type is {}", resultStr,
result.getResponse().getContentType());
}
+
+
+    /** Builds the cluster entry fixture used by the tests: id 1, name "test". */
+    private ClusterEntry getOneClusterEntry() {
+        final ClusterEntry fixture = new ClusterEntry();
+        fixture.setClusterId(1);
+        fixture.setClusterName("test");
+        return fixture;
+    }
+
+    /**
+     * Posts an AddClusterReq to /v1/cluster with the cluster repository, node service
+     * and master service mocked to succeed, and expects the success payload as the body.
+     */
+    @Test
+    public void testAddCluster() throws Exception {
+
+        AddClusterReq req = new AddClusterReq();
+        req.setClusterName("test");
+        req.setMasterIp("127.0.0.1");
+        req.setMasterWebPort(8080);
+        req.setMasterPort(8089);
+
+        ClusterEntry entry = getOneClusterEntry();
+        TubeMQResult successResult = new TubeMQResult();
+
+        when(clusterRepository.saveAndFlush(any(ClusterEntry.class))).thenReturn(entry);
+        when(nodeService.addNode(any(NodeEntry.class))).thenReturn(Boolean.TRUE);
+        when(masterService.checkMasterNodeStatus(anyString(), anyInt())).thenReturn(successResult);
+
+        RequestBuilder request = post("/v1/cluster")
+            .contentType(MediaType.APPLICATION_JSON).content(gson.toJson(req));
+        MvcResult result = mockMvc.perform(request).andReturn();
+        String resultStr = result.getResponse().getContentAsString();
+        String expectRes = "{\"errMsg\":\"\",\"errCode\":0,\"result\":true,\"data\":null}";
+        // assertEquals takes (expected, actual); in that order a failure message labels the values correctly
+        Assert.assertEquals(expectRes, resultStr);
+    }
}
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
deleted file mode 100644
index 8bad33b..0000000
--- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tubemq.manager.controller;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.tubemq.manager.entry.NodeEntry;
-import org.apache.tubemq.manager.repository.NodeRepository;
-import org.assertj.core.util.Lists;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.mock.mockito.MockBean;
-import org.springframework.test.context.junit4.SpringRunner;
-import org.springframework.test.web.servlet.MockMvc;
-import org.springframework.test.web.servlet.MvcResult;
-import org.springframework.test.web.servlet.RequestBuilder;
-
-import java.util.List;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.when;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
-
-@Slf4j
-@RunWith(SpringRunner.class)
-@SpringBootTest
-@AutoConfigureMockMvc
-public class TestNodeController {
-
- @MockBean
- private NodeRepository nodeRepository;
-
- @Autowired
- private MockMvc mockMvc;
-
- private List<NodeEntry> getOneNodeEntry() {
- NodeEntry nodeEntry = new NodeEntry();
- nodeEntry.setMaster(true);
- nodeEntry.setIp("127.0.0.1");
- nodeEntry.setWebPort(8014);
- nodeEntry.setClusterId(0);
- return Lists.newArrayList(nodeEntry);
- }
-
-
- private List<NodeEntry> getTwoNodeEntries() {
- NodeEntry nodeEntry1 = new NodeEntry();
- nodeEntry1.setMaster(true);
- nodeEntry1.setIp("127.0.0.1");
- nodeEntry1.setWebPort(8014);
- nodeEntry1.setClusterId(1);
-
- NodeEntry nodeEntry2 = new NodeEntry();
- nodeEntry2.setMaster(true);
- nodeEntry2.setIp("127.0.0.1");
- nodeEntry2.setWebPort(8014);
- nodeEntry2.setClusterId(2);
-
- return Lists.newArrayList(nodeEntry1, nodeEntry2);
- }
-
- private String expectedOneEntry =
- "{\"data\":[{\"clusterId\":0," +
- "\"clusterInfo\":{\"master\":\"127.0.0.1\"," +
- "\"standby\":[],\"broker\":[]}}],\"errMsg\":" +
- "\"\",\"errCode\":0,\"result\":true}";
-
-
- private String expectedTwoEntries =
- "{\"data\":[{\"clusterId\":1,\"clusterInfo\":" +
- "{\"master\":\"127.0.0.1\",\"standby\"" +
- ":[],\"broker\":[]}},{\"clusterId\":2," +
- "\"clusterInfo\":{\"master\":\"127.0.0.1\"," +
- "\"standby\":[],\"broker\":[]}}]," +
- "\"errMsg\":\"\",\"errCode\":0,\"result\":true}";
-
- @Test
- public void testClusterInfo() throws Exception {
- List<NodeEntry> nodeEntries = getOneNodeEntry();
- when(nodeRepository.findNodeEntriesByClusterIdIs(any(Integer.class)))
- .thenReturn(nodeEntries);
- RequestBuilder request = get("/v1/node/query?method=admin_query_cluster_info&" +
- "type=op_query&clusterId=1");
- MvcResult result = mockMvc.perform(request).andReturn();
- String resultStr = result.getResponse().getContentAsString();
- Assert.assertEquals(resultStr, expectedOneEntry);
- log.info("result json string is {}, response type is {}", resultStr,
- result.getResponse().getContentType());
- }
-
-
- @Test
- public void testClusterInfoTwoEntries() throws Exception {
- List<NodeEntry> nodeEntries = getTwoNodeEntries();
- when(nodeRepository.findNodeEntriesByClusterIdIs(any(Integer.class)))
- .thenReturn(nodeEntries);
- RequestBuilder request = get("/v1/node/query?method=admin_query_cluster_info&" +
- "type=op_query&clusterId=1");
- MvcResult result = mockMvc.perform(request).andReturn();
- String resultStr = result.getResponse().getContentAsString();
- Assert.assertEquals(resultStr, expectedTwoEntries);
- log.info("result json string is {}, response type is {}", resultStr,
- result.getResponse().getContentType());
- }
-}