You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/03/04 03:38:55 UTC
[incubator-tubemq] branch TUBEMQ-421 updated: [TUBEMQ-566] manage brokers in region (#438)
This is an automated email from the ASF dual-hosted git repository.
yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-421 by this push:
new d239d40 [TUBEMQ-566] manage brokers in region (#438)
d239d40 is described below
commit d239d40ae21239c4ace0a5d93fc27af7969602d4
Author: EMsnap <zp...@connect.ust.hk>
AuthorDate: Thu Mar 4 11:38:47 2021 +0800
[TUBEMQ-566] manage brokers in region (#438)
* [TUBEMQ-566] manage brokers in region
* [TUBEMQ-566] manage brokers in region
* [TUBEMQ-566] add unit test
---
.../tubemq/manager/controller/TubeMQResult.java | 16 +-
.../controller/cluster/ClusterController.java | 13 +-
.../manager/controller/group/GroupController.java | 8 +-
.../manager/controller/node/NodeController.java | 5 +-
.../region/CreateRegionReq.java} | 16 +-
.../region/DeleteRegionReq.java} | 14 +-
.../region/ModifyRegionReq.java} | 16 +-
.../region/QueryRegionReq.java} | 13 +-
.../controller/region/RegionController.java | 121 ++++++++++++++
.../controller/topic/TopicWebController.java | 3 +-
.../entry/{ClusterEntry.java => BrokerEntry.java} | 29 +++-
.../apache/tubemq/manager/entry/ClusterEntry.java | 2 +-
.../org/apache/tubemq/manager/entry/NodeEntry.java | 2 +-
.../entry/{ClusterEntry.java => RegionEntry.java} | 40 +++--
...lusterRepository.java => BrokerRepository.java} | 22 +--
.../manager/repository/ClusterRepository.java | 2 +-
.../tubemq/manager/repository/NodeRepository.java | 4 +-
.../{NodeRepository.java => RegionRepository.java} | 34 ++--
.../tubemq/manager/service/BrokerServiceImpl.java | 74 +++++++++
.../tubemq/manager/service/ClusterServiceImpl.java | 2 +-
.../tubemq/manager/service/MasterServiceImpl.java | 10 +-
.../tubemq/manager/service/NodeServiceImpl.java | 9 +-
.../tubemq/manager/service/RegionServiceImpl.java | 184 +++++++++++++++++++++
.../tubemq/manager/service/TopicServiceImpl.java | 14 +-
...BrokerStatusInfo.java => TubeMQErrorConst.java} | 17 +-
.../tubemq/manager/service/TubeMQHttpConst.java | 2 +-
.../{ClusterService.java => BrokerService.java} | 35 ++--
.../manager/service/interfaces/ClusterService.java | 2 +-
.../{ClusterService.java => RegionService.java} | 39 +++--
.../manager/service/tube/BrokerStatusInfo.java | 4 +-
.../ValidateUtils.java} | 21 ++-
.../manager/service/broker/TestBrokerService.java | 70 ++++++++
.../manager/service/region/TestRegionService.java | 100 +++++++++++
33 files changed, 759 insertions(+), 184 deletions(-)
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
index d8e033e..e58f9aa 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
@@ -17,6 +17,7 @@
package org.apache.tubemq.manager.controller;
+import com.google.gson.Gson;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -32,8 +33,21 @@ public class TubeMQResult {
private boolean result = true;
private String data;
- public static TubeMQResult getErrorResult(String errorMsg) {
+ private static Gson json = new Gson();
+
+ public static TubeMQResult errorResult(String errorMsg) {
return TubeMQResult.builder().errCode(-1)
.errMsg(errorMsg).result(false).data("").build();
}
+
+ public static TubeMQResult successResult() {
+ return TubeMQResult.builder().errCode(0)
+ .result(true).data("").build();
+ }
+
+ public static TubeMQResult successResult(Object data) {
+ return TubeMQResult.builder().errCode(0)
+ .result(true).data(json.toJson(data)).build();
+ }
+
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
index 5d1ea75..49004ac 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
@@ -20,6 +20,7 @@ package org.apache.tubemq.manager.controller.cluster;
import static org.apache.tubemq.manager.service.MasterServiceImpl.TUBE_REQUEST_PATH;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.NO_SUCH_METHOD;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
import static org.apache.tubemq.manager.utils.ConvertUtils.covertMapToQueryString;
@@ -73,7 +74,7 @@ public class ClusterController {
case DELETE:
return deleteCluster(gson.fromJson(req, DeleteClusterReq.class));
default:
- return TubeMQResult.getErrorResult("no such method");
+ return TubeMQResult.errorResult(NO_SUCH_METHOD);
}
}
@@ -83,16 +84,16 @@ public class ClusterController {
public TubeMQResult addNewCluster(AddClusterReq req) {
// 1. validate params
if (req.getMasterIp() == null || req.getMasterWebPort() == null) {
- return TubeMQResult.getErrorResult("please input master ip and webPort");
+ return TubeMQResult.errorResult("please input master ip and webPort");
}
TubeMQResult checkResult = masterService.checkMasterNodeStatus(req.getMasterIp(), req.getMasterWebPort());
if (checkResult.getErrCode() != SUCCESS_CODE) {
- return TubeMQResult.getErrorResult("please check master ip and webPort");
+ return TubeMQResult.errorResult("please check master ip and webPort");
}
// 2. add cluster and master node
Boolean addSuccess = clusterService.addClusterAndMasterNode(req);
if (!addSuccess) {
- return TubeMQResult.getErrorResult("add cluster and master fail");
+ return TubeMQResult.errorResult("add cluster and master fail");
}
return new TubeMQResult();
}
@@ -114,7 +115,7 @@ public class ClusterController {
}
ClusterEntry clusterEntry = clusterService.getOneCluster(clusterId);
if (clusterEntry == null) {
- return TubeMQResult.getErrorResult("no such cluster with id " + clusterId);
+ return TubeMQResult.errorResult("no such cluster with id " + clusterId);
}
result.setData(gson.toJson(clusterEntry));
return result;
@@ -126,7 +127,7 @@ public class ClusterController {
public TubeMQResult deleteCluster(DeleteClusterReq req) {
// 1. validate params
if (req.getClusterId() == null || StringUtils.isEmpty(req.getModifyUser())) {
- return TubeMQResult.getErrorResult("please input clusterId and modifyUser");
+ return TubeMQResult.errorResult("please input clusterId and modifyUser");
}
// 2. delete cluster
clusterService.deleteCluster(req.getClusterId());
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
index 7c588ee..26a492a 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
@@ -22,6 +22,8 @@ package org.apache.tubemq.manager.controller.group;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.CLONE;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.NO_SUCH_CLUSTER;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.NO_SUCH_METHOD;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.QUERY;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_CONSUMER_GROUP;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_CONSUMER;
@@ -78,7 +80,7 @@ public class GroupController {
case REBALANCE_CONSUMER:
return masterService.baseRequestMaster(gson.fromJson(req, RebalanceConsumerReq.class));
default:
- return TubeMQResult.getErrorResult("no such method");
+ return TubeMQResult.errorResult(NO_SUCH_METHOD);
}
}
@@ -107,7 +109,7 @@ public class GroupController {
case QUERY:
return topicService.queryOffset(gson.fromJson(req, QueryOffsetReq.class));
default:
- return TubeMQResult.getErrorResult("no such method");
+ return TubeMQResult.errorResult(NO_SUCH_METHOD);
}
}
@@ -121,7 +123,7 @@ public class GroupController {
case DELETE:
return masterService.baseRequestMaster(gson.fromJson(req, DeleteBlackGroupReq.class));
default:
- return TubeMQResult.getErrorResult("no such method");
+ return TubeMQResult.errorResult(NO_SUCH_METHOD);
}
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
index f26da51..53cfa5e 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
@@ -35,15 +35,12 @@ import org.springframework.web.bind.annotation.*;
import java.util.Map;
-import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADMIN_QUERY_CLUSTER_INFO;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.CLONE;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.NO_SUCH_METHOD;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.OFFLINE;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.ONLINE;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.OP_QUERY;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.RELOAD;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SET_READ_OR_WRITE;
@@ -114,7 +111,7 @@ public class NodeController {
case SET_READ_OR_WRITE:
return masterService.baseRequestMaster(gson.fromJson(req, BrokerSetReadOrWriteReq.class));
default:
- return TubeMQResult.getErrorResult("no such method");
+ return TubeMQResult.errorResult(NO_SUCH_METHOD);
}
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/region/CreateRegionReq.java
similarity index 76%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/region/CreateRegionReq.java
index 5982ac6..7ab018f 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/region/CreateRegionReq.java
@@ -15,16 +15,16 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.service.tube;
-import lombok.Data;
+package org.apache.tubemq.manager.controller.region;
-import java.util.List;
+import java.util.Set;
+import lombok.Data;
+import org.apache.tubemq.manager.entry.RegionEntry;
@Data
-public class BrokerStatusInfo {
- private int code;
- private String errMsg;
- // total broker configuration info list of brokers.
- private List<BrokerConf> data;
+public class CreateRegionReq {
+ private long clusterId;
+ private RegionEntry regionEntry;
+ private Set<Long> brokerIdSet;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/region/DeleteRegionReq.java
similarity index 77%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/region/DeleteRegionReq.java
index 5982ac6..38254df 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/region/DeleteRegionReq.java
@@ -15,16 +15,14 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.service.tube;
-import lombok.Data;
+package org.apache.tubemq.manager.controller.region;
+
-import java.util.List;
+import lombok.Data;
@Data
-public class BrokerStatusInfo {
- private int code;
- private String errMsg;
- // total broker configuration info list of brokers.
- private List<BrokerConf> data;
+public class DeleteRegionReq {
+ private long regionId;
+ private long clusterId;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/region/ModifyRegionReq.java
similarity index 76%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/region/ModifyRegionReq.java
index 5982ac6..e9fb01a 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/region/ModifyRegionReq.java
@@ -15,16 +15,16 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.service.tube;
-import lombok.Data;
+package org.apache.tubemq.manager.controller.region;
-import java.util.List;
+import java.util.Set;
+import lombok.Data;
+import org.apache.tubemq.manager.entry.RegionEntry;
@Data
-public class BrokerStatusInfo {
- private int code;
- private String errMsg;
- // total broker configuration info list of brokers.
- private List<BrokerConf> data;
+public class ModifyRegionReq {
+ private RegionEntry regionEntry;
+ private Set<Long> brokerIdSet;
+ private long clusterId;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/region/QueryRegionReq.java
similarity index 77%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/region/QueryRegionReq.java
index 5982ac6..51ace38 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/region/QueryRegionReq.java
@@ -14,17 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.tubemq.manager.service.tube;
+package org.apache.tubemq.manager.controller.region;
import lombok.Data;
-import java.util.List;
-
@Data
-public class BrokerStatusInfo {
- private int code;
- private String errMsg;
- // total broker configuration info list of brokers.
- private List<BrokerConf> data;
+public class QueryRegionReq {
+ private Long clusterId;
+ private Long regionId;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/region/RegionController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/region/RegionController.java
new file mode 100644
index 0000000..c5e5459
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/region/RegionController.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.controller.region;
+
+import static org.apache.tubemq.manager.service.TubeMQErrorConst.PARAM_ILLEGAL;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.MODIFY;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.NO_SUCH_CLUSTER;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.NO_SUCH_METHOD;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.QUERY;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.entry.ClusterEntry;
+import org.apache.tubemq.manager.entry.RegionEntry;
+import org.apache.tubemq.manager.service.interfaces.ClusterService;
+import org.apache.tubemq.manager.service.interfaces.RegionService;
+import org.apache.tubemq.manager.utils.ValidateUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping(path = "/v1/region")
+@Slf4j
+public class RegionController {
+ private final Gson gson = new GsonBuilder().serializeNulls().create();
+
+ @Autowired
+ RegionService regionService;
+
+ @Autowired
+ ClusterService clusterService;
+
+ /**
+ * region method proxy
+ * dispatches each region operation to the matching handler method
+ */
+ @RequestMapping(value = "")
+ public @ResponseBody
+ TubeMQResult brokerMethodProxy(
+ @RequestParam String method, @RequestBody String req) {
+ switch (method) {
+ case ADD:
+ return createNewRegion(gson.fromJson(req, CreateRegionReq.class));
+ case DELETE:
+ return deleteRegion(gson.fromJson(req, DeleteRegionReq.class));
+ case MODIFY:
+ return modifyRegion(gson.fromJson(req, ModifyRegionReq.class));
+ case QUERY:
+ return queryRegion(gson.fromJson(req, QueryRegionReq.class));
+ default:
+ return TubeMQResult.errorResult(NO_SUCH_METHOD);
+ }
+ }
+
+ private TubeMQResult queryRegion(QueryRegionReq req) {
+ if (ValidateUtils.isNull(req.getClusterId())) {
+ return TubeMQResult.errorResult(PARAM_ILLEGAL);
+ }
+
+ List<RegionEntry> regionEntries = regionService
+ .queryRegion(req.getRegionId(), req.getClusterId());
+
+ return TubeMQResult.successResult(regionEntries);
+ }
+
+ private TubeMQResult deleteRegion(DeleteRegionReq req) {
+ return regionService.deleteRegion(req.getRegionId(), req.getClusterId());
+ }
+
+ private TubeMQResult createNewRegion(CreateRegionReq req) {
+ RegionEntry regionEntry = req.getRegionEntry();
+ if (ValidateUtils.isNull(regionEntry) || !regionEntry.legal() ||
+ ValidateUtils.isNull(req.getBrokerIdSet())) {
+ return TubeMQResult.errorResult(PARAM_ILLEGAL);
+ }
+ ClusterEntry clusterEntry = clusterService.getOneCluster(
+ req.getClusterId());
+ if (clusterEntry == null) {
+ return TubeMQResult.errorResult(NO_SUCH_CLUSTER);
+ }
+ List<Long> brokerList = new ArrayList<>(req.getBrokerIdSet());
+ return regionService.createNewRegion(regionEntry, brokerList);
+ }
+
+ private TubeMQResult modifyRegion(ModifyRegionReq req) {
+ RegionEntry regionEntry = req.getRegionEntry();
+ if (!regionEntry.legal() || ValidateUtils.isNull(regionEntry.getClusterId())) {
+ return TubeMQResult.errorResult(PARAM_ILLEGAL);
+ }
+ List<Long> brokerList = new ArrayList<>(req.getBrokerIdSet());
+ return regionService.updateRegion(req.getRegionEntry(),
+ brokerList, req.getClusterId());
+ }
+
+
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
index 7890846..27e8b5c 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
@@ -23,6 +23,7 @@ import static org.apache.tubemq.manager.service.TubeMQHttpConst.AUTH_CONTROL;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.CLONE;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.MODIFY;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.NO_SUCH_METHOD;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.REMOVE;
@@ -78,7 +79,7 @@ public class TopicWebController {
case REMOVE:
return masterService.baseRequestMaster(gson.fromJson(req, DeleteTopicReq.class));
default:
- return TubeMQResult.getErrorResult("no such method");
+ return TubeMQResult.errorResult(NO_SUCH_METHOD);
}
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/ClusterEntry.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BrokerEntry.java
similarity index 69%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/ClusterEntry.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BrokerEntry.java
index 0464ed7..c7008d4 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/ClusterEntry.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BrokerEntry.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,35 +15,46 @@
* limitations under the License.
*/
+
package org.apache.tubemq.manager.entry;
import java.util.Date;
import javax.persistence.Entity;
+import javax.persistence.EntityListeners;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.UniqueConstraint;
import lombok.Data;
+import org.springframework.data.annotation.CreatedDate;
+import org.springframework.data.annotation.LastModifiedDate;
+import org.springframework.data.jpa.domain.support.AuditingEntityListener;
-/**
- * cluster machine for tube cluster. broker/master/standby
- */
@Entity
-@Table(name = "cluster", uniqueConstraints=
- @UniqueConstraint(columnNames={"clusterName"}))
+@Table(name = "broker", uniqueConstraints=
+@UniqueConstraint(columnNames={"brokerId"}))
@Data
-public class ClusterEntry {
+@EntityListeners(AuditingEntityListener.class)
+public class BrokerEntry {
@Id
@GeneratedValue(strategy= GenerationType.IDENTITY)
- private int clusterId;
+ private Long id;
+
+ private Long brokerId;
- private String clusterName;
+ private long brokerIp;
+ @CreatedDate
private Date createTime;
+ @LastModifiedDate
private Date modifyTime;
private String createUser;
+
+ private Long regionId;
+
+ private Long clusterId;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/ClusterEntry.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/ClusterEntry.java
index 0464ed7..3a25131 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/ClusterEntry.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/ClusterEntry.java
@@ -37,7 +37,7 @@ import lombok.Data;
public class ClusterEntry {
@Id
@GeneratedValue(strategy= GenerationType.IDENTITY)
- private int clusterId;
+ private long clusterId;
private String clusterName;
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
index 9be4493..ca7231c 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/NodeEntry.java
@@ -47,7 +47,7 @@ public class NodeEntry {
private int webPort;
- private int clusterId;
+ private long clusterId;
private String clusterName;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/ClusterEntry.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/RegionEntry.java
similarity index 59%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/ClusterEntry.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/RegionEntry.java
index 0464ed7..2f78f22 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/ClusterEntry.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/RegionEntry.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -20,30 +20,48 @@ package org.apache.tubemq.manager.entry;
import java.util.Date;
import javax.persistence.Entity;
+import javax.persistence.EntityListeners;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.UniqueConstraint;
import lombok.Data;
+import org.apache.tubemq.manager.utils.ValidateUtils;
+import org.springframework.data.annotation.CreatedDate;
+import org.springframework.data.annotation.LastModifiedDate;
+import org.springframework.data.jpa.domain.support.AuditingEntityListener;
-/**
- * cluster machine for tube cluster. broker/master/standby
- */
@Entity
-@Table(name = "cluster", uniqueConstraints=
- @UniqueConstraint(columnNames={"clusterName"}))
+@Table(name = "region", uniqueConstraints=
+ {
+ @UniqueConstraint(columnNames={"clusterId", "regionId"}),
+ @UniqueConstraint(columnNames={"id"})
+ })
@Data
-public class ClusterEntry {
+@EntityListeners(AuditingEntityListener.class)
+public class RegionEntry {
@Id
@GeneratedValue(strategy= GenerationType.IDENTITY)
- private int clusterId;
+ private Long id;
+
+ private Long regionId;
- private String clusterName;
+ private String name;
- private Date createTime;
+ @CreatedDate
+ private Date createDate;
- private Date modifyTime;
+ @LastModifiedDate
+ private Date modifyDate;
private String createUser;
+
+ private String modifyUser;
+
+ private Long clusterId;
+
+ public boolean legal() {
+ return !ValidateUtils.isNull(clusterId);
+ }
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/ClusterRepository.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/BrokerRepository.java
similarity index 69%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/ClusterRepository.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/BrokerRepository.java
index 25b67d7..0c80ef2 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/ClusterRepository.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/BrokerRepository.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -17,24 +17,16 @@
package org.apache.tubemq.manager.repository;
+import com.sun.corba.se.pept.broker.Broker;
import java.util.List;
+import org.apache.tubemq.manager.entry.BrokerEntry;
import org.apache.tubemq.manager.entry.ClusterEntry;
-import org.apache.tubemq.manager.entry.NodeEntry;
import org.springframework.data.jpa.repository.JpaRepository;
-public interface ClusterRepository extends JpaRepository<ClusterEntry, Long> {
+public interface BrokerRepository extends JpaRepository<BrokerEntry, Long> {
- /**
- * find clusterEntry by clusterId
- * @param clusterId
- * @return
- */
- ClusterEntry findClusterEntryByClusterId(Integer clusterId);
+ List<BrokerEntry> findBrokerEntryByBrokerIdInAndClusterIdEquals(List<Long> brokerIds, long clusterId);
+
+ List<BrokerEntry> findBrokerEntriesByRegionIdEqualsAndClusterIdEquals(Long regionId, long clusterId);
- /**
- * delete cluster by cluster id
- * @param clusterId
- * @return
- */
- Integer deleteByClusterId(Integer clusterId);
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/ClusterRepository.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/ClusterRepository.java
index 25b67d7..bf62c05 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/ClusterRepository.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/ClusterRepository.java
@@ -29,7 +29,7 @@ public interface ClusterRepository extends JpaRepository<ClusterEntry, Long> {
* @param clusterId
* @return
*/
- ClusterEntry findClusterEntryByClusterId(Integer clusterId);
+ ClusterEntry findClusterEntryByClusterId(long clusterId);
/**
* delete cluster by cluster id
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
index c8b2eb9..d1d7d14 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
@@ -31,14 +31,14 @@ public interface NodeRepository extends JpaRepository<NodeEntry, Long> {
* @param clusterId
* @return
*/
- NodeEntry findNodeEntryByClusterIdIsAndMasterIsTrue(int clusterId);
+ NodeEntry findNodeEntryByClusterIdIsAndMasterIsTrue(long clusterId);
/**
* find all nodes in cluster
* @param clusterId
* @return
*/
- List<NodeEntry> findNodeEntriesByClusterIdIs(int clusterId);
+ List<NodeEntry> findNodeEntriesByClusterIdIs(long clusterId);
/**
* find all nodes
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/RegionRepository.java
similarity index 62%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/RegionRepository.java
index c8b2eb9..b07fdf9 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/NodeRepository.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/repository/RegionRepository.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -17,32 +17,22 @@
package org.apache.tubemq.manager.repository;
-import org.apache.tubemq.manager.entry.NodeEntry;
+import java.util.List;
+import org.apache.tubemq.manager.entry.RegionEntry;
import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.jpa.repository.Modifying;
+import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
-import java.util.List;
-
+/**
+ * JPA repository for RegionEntry rows.
+ */
@Repository
-public interface NodeRepository extends JpaRepository<NodeEntry, Long> {
+public interface RegionRepository extends JpaRepository<RegionEntry, Long> {
+
+ /**
+ * find all regions that belong to a cluster
+ * @param clusterId id of the cluster
+ * @return the matching regions (callers in RegionServiceImpl also accept an empty list)
+ */
+ List<RegionEntry> findRegionEntriesByClusterIdEquals(long clusterId);
- /**
- * find master By clusterId
- * @param clusterId
- * @return
- */
- NodeEntry findNodeEntryByClusterIdIsAndMasterIsTrue(int clusterId);
+ /**
+ * find the regions matching both the cluster id and the region id
+ * NOTE: clusterId comes first here, while deleteRegion takes regionId first
+ * @param clusterId id of the cluster
+ * @param regionId id of the region inside that cluster
+ * @return the matching regions; RegionServiceImpl treats more than one match as a duplicate
+ */
+ List<RegionEntry> findRegionEntriesByClusterIdEqualsAndRegionIdEquals(long clusterId, long regionId);
- /**
- * find all nodes in cluster
- * @param clusterId
- * @return
- */
- List<NodeEntry> findNodeEntriesByClusterIdIs(int clusterId);
+ /**
+ * delete the rows of table Region whose region_id and cluster_id both match,
+ * using a native query: ?1 binds regionId and ?2 binds clusterId
+ * NOTE(review): as a modifying query this presumably needs an active transaction;
+ * RegionServiceImpl.deleteRegion is annotated with Transactional - confirm for other callers
+ * @param regionId id of the region to delete
+ * @param clusterId id of the cluster the region belongs to
+ */
+ @Modifying
+ @Query(value = "DELETE FROM Region WHERE region_id = ?1 AND cluster_id = ?2",nativeQuery = true)
+ void deleteRegion(long regionId, long clusterId);
- /**
- * find all nodes
- * @return
- */
- List<NodeEntry> findAll();
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/BrokerServiceImpl.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/BrokerServiceImpl.java
new file mode 100644
index 0000000..02ad291
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/BrokerServiceImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tubemq.manager.service;
+
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.DEFAULT_REGION;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.tubemq.manager.entry.BrokerEntry;
+import org.apache.tubemq.manager.repository.BrokerRepository;
+import org.apache.tubemq.manager.service.interfaces.BrokerService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * {@link BrokerService} implementation that reads and writes broker rows through
+ * {@link BrokerRepository}.
+ */
+@Component
+public class BrokerServiceImpl implements BrokerService {
+
+    @Autowired
+    BrokerRepository brokerRepository;
+
+    /**
+     * Put every broker currently assigned to the given region of the cluster back
+     * into {@code DEFAULT_REGION}.
+     */
+    @Override
+    public void resetBrokerRegions(long regionId, long clusterId) {
+        List<BrokerEntry> regionMembers =
+            brokerRepository.findBrokerEntriesByRegionIdEqualsAndClusterIdEquals(regionId, clusterId);
+        assignRegion(regionMembers, DEFAULT_REGION);
+    }
+
+    /**
+     * Assign the brokers with the given ids (looked up inside the cluster) to a region.
+     * Ids the repository does not return are simply not touched.
+     */
+    @Override
+    public void updateBrokersRegion(List<Long> brokerIdList, Long regionId, Long clusterId) {
+        List<BrokerEntry> selected =
+            brokerRepository.findBrokerEntryByBrokerIdInAndClusterIdEquals(brokerIdList, clusterId);
+        assignRegion(selected, regionId);
+    }
+
+    /**
+     * Tell whether each id in {@code brokerIdList} was returned by the repository
+     * lookup for the cluster.
+     */
+    @Override
+    public boolean checkIfBrokersAllExsit(List<Long> brokerIdList, long clusterId) {
+        List<BrokerEntry> found =
+            brokerRepository.findBrokerEntryByBrokerIdInAndClusterIdEquals(brokerIdList, clusterId);
+        return brokerIdsOf(found).containsAll(brokerIdList);
+    }
+
+    /**
+     * List the ids of the brokers assigned to the given region of the cluster.
+     */
+    @Override
+    public List<Long> getBrokerIdListInRegion(long regionId, long clusterId) {
+        return brokerIdsOf(
+            brokerRepository.findBrokerEntriesByRegionIdEqualsAndClusterIdEquals(regionId, clusterId));
+    }
+
+    /** Set the region on each entry and save it, one entry at a time, in list order. */
+    private void assignRegion(List<BrokerEntry> entries, Long targetRegionId) {
+        for (BrokerEntry entry : entries) {
+            entry.setRegionId(targetRegionId);
+            brokerRepository.save(entry);
+        }
+    }
+
+    /** Project the entries onto their broker ids, keeping the list order. */
+    private static List<Long> brokerIdsOf(List<BrokerEntry> entries) {
+        return entries.stream()
+            .map(BrokerEntry::getBrokerId)
+            .collect(Collectors.toList());
+    }
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/ClusterServiceImpl.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/ClusterServiceImpl.java
index 67ef3fb..5e7107f 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/ClusterServiceImpl.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/ClusterServiceImpl.java
@@ -71,7 +71,7 @@ public class ClusterServiceImpl implements ClusterService {
}
@Override
- public ClusterEntry getOneCluster(Integer clusterId) {
+ public ClusterEntry getOneCluster(long clusterId) {
return clusterRepository
.findClusterEntryByClusterId(clusterId);
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterServiceImpl.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterServiceImpl.java
index 71758ed..ed327b1 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterServiceImpl.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/MasterServiceImpl.java
@@ -37,7 +37,7 @@ import org.springframework.stereotype.Component;
import java.io.InputStreamReader;
import java.util.Map;
-import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
+import static org.apache.tubemq.manager.controller.TubeMQResult.errorResult;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.BROKER_RUN_STATUS;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
@@ -70,11 +70,11 @@ public class MasterServiceImpl implements MasterService {
if (tubeResponse.getCode() == SUCCESS_CODE && tubeResponse.getErrCode() == SUCCESS_CODE) {
return defaultResult;
} else {
- defaultResult = getErrorResult(tubeResponse.getErrMsg());
+ defaultResult = errorResult(tubeResponse.getErrMsg());
}
} catch (Exception ex) {
log.error("exception caught while requesting broker status", ex);
- defaultResult = getErrorResult(ex.getMessage());
+ defaultResult = errorResult(ex.getMessage());
}
return defaultResult;
}
@@ -105,12 +105,12 @@ public class MasterServiceImpl implements MasterService {
@Override
public TubeMQResult baseRequestMaster(BaseReq req) {
if (req.getClusterId() == null) {
- return TubeMQResult.getErrorResult("please input clusterId");
+ return TubeMQResult.errorResult("please input clusterId");
}
NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
req.getClusterId());
if (masterEntry == null) {
- return TubeMQResult.getErrorResult("no such cluster");
+ return TubeMQResult.errorResult("no such cluster");
}
String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
+ "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java
index cb9536e..36e27e4 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java
@@ -33,7 +33,6 @@ import com.google.gson.Gson;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.*;
-import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
@@ -144,7 +143,7 @@ public class NodeServiceImpl implements NodeService {
// might have duplicate brokers
if (addBrokerResult.getErrCode() != SUCCESS_CODE) {
- return TubeMQResult.getErrorResult(addBrokerResult.getErrMsg());
+ return TubeMQResult.errorResult(addBrokerResult.getErrMsg());
}
List<Integer> brokerIds = getBrokerIds(addBrokerResult);
List<AddTopicReq> addTopicReqs = req.getAddTopicReqs();
@@ -407,13 +406,13 @@ public class NodeServiceImpl implements NodeService {
NodeEntry master = masterService.getMasterNode(req);
if (master == null) {
- return TubeMQResult.getErrorResult(NO_SUCH_CLUSTER);
+ return TubeMQResult.errorResult(NO_SUCH_CLUSTER);
}
// 1 query topic config
TubeHttpTopicInfoList topicInfoList = topicService.requestTopicConfigInfo(master, req.getSourceTopicName());
if (topicInfoList == null) {
- return TubeMQResult.getErrorResult("no such topic");
+ return TubeMQResult.errorResult("no such topic");
}
// 2 if there's no specific broker ids then clone to all of the brokers
@@ -444,7 +443,7 @@ public class NodeServiceImpl implements NodeService {
public TubeMQResult batchAddTopic(BatchAddTopicReq req) {
NodeEntry masterEntry = masterService.getMasterNode(req);
if (masterEntry == null) {
- return TubeMQResult.getErrorResult(NO_SUCH_CLUSTER);
+ return TubeMQResult.errorResult(NO_SUCH_CLUSTER);
}
return addTopicsToBrokers(masterEntry, req.getBrokerIds(), req.getAddTopicReqs());
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/RegionServiceImpl.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/RegionServiceImpl.java
new file mode 100644
index 0000000..d70cb05
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/RegionServiceImpl.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service;
+
+import static org.apache.tubemq.manager.service.TubeMQErrorConst.MYSQL_ERROR;
+import static org.apache.tubemq.manager.service.TubeMQErrorConst.PARAM_ILLEGAL;
+import static org.apache.tubemq.manager.service.TubeMQErrorConst.RESOURCE_ALREADY_USED;
+import static org.apache.tubemq.manager.service.TubeMQErrorConst.RESOURCE_NOT_EXIST;
+
+import java.util.List;
+import javax.transaction.Transactional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.entry.RegionEntry;
+import org.apache.tubemq.manager.repository.BrokerRepository;
+import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.repository.RegionRepository;
+import org.apache.tubemq.manager.service.interfaces.BrokerService;
+import org.apache.tubemq.manager.service.interfaces.RegionService;
+import org.apache.tubemq.manager.utils.ValidateUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.dao.DataIntegrityViolationException;
+import org.springframework.stereotype.Component;
+
+
+/**
+ * Default {@link RegionService}: keeps region rows and the region assignment of
+ * brokers in step through {@link RegionRepository} and {@link BrokerService}.
+ */
+@Slf4j
+@Component
+public class RegionServiceImpl implements RegionService {
+
+    public static final String DUPLICATE_ENTRY = "duplicate entry";
+    public static final String NO_SUCH_REGION = "no such region";
+
+    @Autowired
+    RegionRepository regionRepository;
+
+    @Autowired
+    BrokerRepository brokerRepository;
+
+    @Autowired
+    NodeRepository nodeRepository;
+
+    @Autowired
+    BrokerService brokerService;
+
+    /**
+     * Create a region and move the given brokers into it.
+     *
+     * <p>The region row is saved first and the brokers are reassigned afterwards; this
+     * method declares no transaction of its own.
+     *
+     * @param regionEntry region to persist; its cluster id selects the cluster to check
+     * @param brokerIdList ids of the brokers that should belong to the new region
+     * @return success, or an error result carrying {@code PARAM_ILLEGAL} (missing entry,
+     *         cluster id or broker list), {@code RESOURCE_NOT_EXIST} (a broker id is
+     *         unknown in the cluster), {@code RESOURCE_ALREADY_USED} (a broker already
+     *         sits in another region, or the broker list is empty),
+     *         {@code DUPLICATE_ENTRY} or {@code MYSQL_ERROR}
+     */
+    @Override
+    public TubeMQResult createNewRegion(RegionEntry regionEntry,
+        List<Long> brokerIdList) {
+        // Missing input is a caller mistake: report it as such instead of letting the
+        // resulting NullPointerException be logged and returned as MYSQL_ERROR.
+        if (ValidateUtils.isNull(regionEntry)
+            || ValidateUtils.isNull(regionEntry.getClusterId())
+            || ValidateUtils.isNull(brokerIdList)) {
+            return TubeMQResult.errorResult(PARAM_ILLEGAL);
+        }
+        try {
+            Long clusterId = regionEntry.getClusterId();
+            if (!brokerService.checkIfBrokersAllExsit(brokerIdList, clusterId)) {
+                return TubeMQResult.errorResult(RESOURCE_NOT_EXIST);
+            }
+            if (existBrokerIdAlreadyInRegion(clusterId, brokerIdList, null)) {
+                return TubeMQResult.errorResult(RESOURCE_ALREADY_USED);
+            }
+            regionRepository.save(regionEntry);
+            brokerService.updateBrokersRegion(brokerIdList, regionEntry.getRegionId(), clusterId);
+        } catch (DataIntegrityViolationException e) {
+            log.error("duplicate entry, newRegionDO:{}.", regionEntry, e);
+            return TubeMQResult.errorResult(DUPLICATE_ENTRY);
+        } catch (Exception e) {
+            log.error("create region failed, newRegionDO:{}.", regionEntry, e);
+            return TubeMQResult.errorResult(MYSQL_ERROR);
+        }
+        return TubeMQResult.successResult();
+    }
+
+    /**
+     * Delete a region and put its brokers back into the default region.
+     *
+     * @param regionId id of the region to delete
+     * @param clusterId id of the cluster the region belongs to
+     * @return a success result when both steps complete
+     * @throws RuntimeException with message {@code MYSQL_ERROR} and the original failure
+     *         as cause when either step fails; thrown (rather than returned) so the
+     *         transaction declared on this method is rolled back
+     */
+    @Override
+    @Transactional(rollbackOn = Exception.class)
+    public TubeMQResult deleteRegion(long regionId, long clusterId) {
+        try {
+            regionRepository.deleteRegion(regionId, clusterId);
+            brokerService.resetBrokerRegions(regionId, clusterId);
+        } catch (Exception e) {
+            log.error("delete region failed, regionId:{}.", regionId, e);
+            // keep the root cause attached; the message seen by callers is unchanged
+            throw new RuntimeException(MYSQL_ERROR, e);
+        }
+        return TubeMQResult.successResult();
+    }
+
+    /**
+     * Load the single region identified by cluster id and region id.
+     *
+     * @return the region, or {@code null} when none matches
+     * @throws RuntimeException with message {@code DUPLICATE_ENTRY} when several rows match
+     */
+    private RegionEntry getRegionEntry(long clusterId, long regionId) {
+        List<RegionEntry> regionEntries = regionRepository
+            .findRegionEntriesByClusterIdEqualsAndRegionIdEquals(clusterId, regionId);
+        if (regionEntries.isEmpty()) {
+            return null;
+        }
+        if (regionEntries.size() > 1) {
+            throw new RuntimeException(DUPLICATE_ENTRY);
+        }
+        return regionEntries.get(0);
+    }
+
+    /**
+     * Replace the stored region with {@code newRegionEntry} and make the region contain
+     * exactly the brokers in {@code brokerIdList}.
+     *
+     * <p>NOTE(review): the lookup of the existing region uses the {@code clusterId}
+     * argument, while the conflict check and the broker reassignment use
+     * {@code newRegionEntry.getClusterId()}; callers presumably pass the same value in
+     * both places - confirm.
+     *
+     * @param newRegionEntry new state of the region; must carry the region id
+     * @param brokerIdList ids of the brokers the region should contain afterwards
+     * @param clusterId id of the cluster the region is looked up in
+     * @return success, or an error result carrying {@code PARAM_ILLEGAL},
+     *         {@code RESOURCE_NOT_EXIST}, {@code RESOURCE_ALREADY_USED} or
+     *         {@code MYSQL_ERROR}
+     */
+    @Override
+    public TubeMQResult updateRegion(RegionEntry newRegionEntry, List<Long> brokerIdList, long clusterId) {
+        if (ValidateUtils.isNull(newRegionEntry) || ValidateUtils.isNull(newRegionEntry.getRegionId())) {
+            return TubeMQResult.errorResult(PARAM_ILLEGAL);
+        }
+        try {
+            RegionEntry oldRegionDO = getRegionEntry(clusterId, newRegionEntry.getRegionId());
+            if (ValidateUtils.isNull(oldRegionDO)) {
+                return TubeMQResult.errorResult(RESOURCE_NOT_EXIST);
+            }
+            // set id for update operation
+            newRegionEntry.setId(oldRegionDO.getId());
+            List<Long> oldBrokerList = brokerService.getBrokerIdListInRegion(oldRegionDO.getRegionId(), clusterId);
+            if (oldBrokerList.equals(brokerIdList)) {
+                // no change in broker list update directly
+                regionRepository.save(newRegionEntry);
+                return TubeMQResult.successResult();
+            }
+            if (existBrokerIdAlreadyInRegion(newRegionEntry.getClusterId(), brokerIdList,
+                newRegionEntry.getRegionId())) {
+                return TubeMQResult.errorResult(RESOURCE_ALREADY_USED);
+            }
+            handleUpdateRepo(newRegionEntry, brokerIdList, clusterId);
+        } catch (Exception e) {
+            log.error("update region failed, newRegionDO:{}", newRegionEntry, e);
+            return TubeMQResult.errorResult(MYSQL_ERROR);
+        }
+        return TubeMQResult.successResult();
+    }
+
+    /**
+     * Save the region, reset its current brokers to the default region and then assign
+     * the requested brokers to it.
+     *
+     * <p>NOTE(review): {@link #updateRegion} calls this method directly on {@code this}.
+     * With proxy-based transaction interception such a self-invocation presumably
+     * bypasses the annotation below, so the three steps may not be atomic - confirm,
+     * and consider moving the transaction boundary to a separately injected bean.
+     */
+    @Transactional(rollbackOn = Exception.class)
+    public void handleUpdateRepo(RegionEntry newRegionEntry, List<Long> brokerIdList, long clusterId) {
+        regionRepository.save(newRegionEntry);
+        // reset brokers to default region
+        brokerService.resetBrokerRegions(newRegionEntry.getRegionId(), clusterId);
+        // update brokers to new region
+        brokerService.updateBrokersRegion(brokerIdList, newRegionEntry.getRegionId(),
+            newRegionEntry.getClusterId());
+    }
+
+    /**
+     * Query the regions of a cluster.
+     *
+     * @param regionId region to look for; when {@code null} all regions of the cluster
+     *        are returned
+     * @param clusterId id of the cluster
+     * @return the matching regions as returned by the repository
+     */
+    @Override
+    public List<RegionEntry> queryRegion(Long regionId, Long clusterId) {
+        if (ValidateUtils.isNull(regionId)) {
+            return regionRepository.findRegionEntriesByClusterIdEquals(clusterId);
+        }
+        return regionRepository
+            .findRegionEntriesByClusterIdEqualsAndRegionIdEquals(clusterId, regionId);
+    }
+
+    /**
+     * Check whether any of the given brokers already belongs to a region of the cluster
+     * other than {@code regionId}.
+     *
+     * @param clusterId id of the cluster; {@code null} is reported as a conflict
+     * @param newBrokerIdList broker ids to check; {@code null} or empty is reported as
+     *        a conflict
+     * @param regionId region to leave out of the check, or {@code null} to check all
+     * @return {@code true} when a conflict exists (or the input is unusable)
+     */
+    private boolean existBrokerIdAlreadyInRegion(Long clusterId, List<Long> newBrokerIdList, Long regionId) {
+        if (ValidateUtils.isNull(clusterId) || ValidateUtils.isEmptyList(newBrokerIdList)) {
+            return true;
+        }
+        List<RegionEntry> regionEntries = regionRepository.findRegionEntriesByClusterIdEquals(clusterId);
+        if (ValidateUtils.isEmptyList(regionEntries)) {
+            return false;
+        }
+        for (RegionEntry regionEntry : regionEntries) {
+            // the region being updated may keep its own brokers
+            if (regionEntry.getRegionId().equals(regionId)) {
+                continue;
+            }
+            List<Long> regionBrokerIdList = brokerService.getBrokerIdListInRegion(regionEntry.getRegionId(), clusterId);
+            if (ValidateUtils.isEmptyList(regionBrokerIdList)) {
+                continue;
+            }
+            if (regionBrokerIdList.stream().anyMatch(newBrokerIdList::contains)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java
index f0dba76..9c60853 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java
@@ -96,7 +96,7 @@ public class TopicServiceImpl implements TopicService {
NodeEntry master = masterService.getMasterNode(req);
if (master == null) {
- return TubeMQResult.getErrorResult("no such cluster");
+ return TubeMQResult.errorResult("no such cluster");
}
// 1. query the corresponding brokers having given topic
TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getTopicName());
@@ -146,7 +146,7 @@ public class TopicServiceImpl implements TopicService {
NodeEntry master = masterService.getMasterNode(req);
if (master == null) {
- return TubeMQResult.getErrorResult("no such cluster");
+ return TubeMQResult.errorResult("no such cluster");
}
// 1. get all consumer ids in group
@@ -179,7 +179,7 @@ public class TopicServiceImpl implements TopicService {
NodeEntry master = masterService.getMasterNode(req);
if (master == null) {
- return TubeMQResult.getErrorResult("no such cluster");
+ return TubeMQResult.errorResult("no such cluster");
}
// 1. query the corresponding brokers having given topic
@@ -187,7 +187,7 @@ public class TopicServiceImpl implements TopicService {
TubeMQResult result = new TubeMQResult();
CleanOffsetResult cleanOffsetResult = new CleanOffsetResult();
if (topicInfoList == null) {
- return TubeMQResult.getErrorResult("no such topic");
+ return TubeMQResult.errorResult("no such topic");
}
List<TopicInfo> topicInfos = topicInfoList.getTopicInfo();
@@ -214,14 +214,14 @@ public class TopicServiceImpl implements TopicService {
NodeEntry master = masterService.getMasterNode(req);
if (master == null) {
- return TubeMQResult.getErrorResult("no such cluster");
+ return TubeMQResult.errorResult("no such cluster");
}
// 1. query the corresponding brokers having given topic
TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getTopicName());
TubeMQResult result = new TubeMQResult();
if (topicInfoList == null) {
- return TubeMQResult.getErrorResult("no such topic");
+ return TubeMQResult.errorResult("no such topic");
}
List<TopicInfo> topicInfos = topicInfoList.getTopicInfo();
@@ -236,7 +236,7 @@ public class TopicServiceImpl implements TopicService {
+ "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
OffsetQueryRes res = gson.fromJson(masterService.queryMaster(url), OffsetQueryRes.class);
if (res.getErrCode() != SUCCESS_CODE) {
- return TubeMQResult.getErrorResult("query broker id" + topicInfo.getBrokerId() + " fail");
+ return TubeMQResult.errorResult("query broker id" + topicInfo.getBrokerId() + " fail");
}
generateOffsetInfo(offsetPerBroker, topicInfo, res);
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQErrorConst.java
similarity index 69%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQErrorConst.java
index 5982ac6..5d191d8 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQErrorConst.java
@@ -15,16 +15,11 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.service.tube;
+package org.apache.tubemq.manager.service;
-import lombok.Data;
-
-import java.util.List;
-
-@Data
-public class BrokerStatusInfo {
- private int code;
- private String errMsg;
- // total broker configuration info list of brokers.
- private List<BrokerConf> data;
+/**
+ * Error messages that the manager services put into error results.
+ */
+public final class TubeMQErrorConst {
+    public static final String PARAM_ILLEGAL = "param illegal";
+    public static final String RESOURCE_ALREADY_USED = "resource already used";
+    // spelling fixed: the message previously read "resource not exsit"
+    public static final String RESOURCE_NOT_EXIST = "resource not exist";
+    public static final String MYSQL_ERROR = "mysql error";
+
+    private TubeMQErrorConst() {
+        // constants holder, never instantiated
+    }
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
index 68d7f22..9976711 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
@@ -41,7 +41,6 @@ public class TubeMQHttpConst {
public static final String DELETE = "delete";
public static final String REMOVE = "remove";
public static final String NO_SUCH_METHOD = "no such method";
- public static final String ADMIN_QUERY_CLUSTER_INFO = "admin_query_cluster_info";
public static final String CLONE = "clone";
public static final String ADD = "add";
public static final String ONLINE = "online";
@@ -54,4 +53,5 @@ public class TubeMQHttpConst {
public static final Integer SUCCESS_CODE = 0;
public static final Integer DELETE_FAIL = 0;
public static final String QUERY = "query";
+ public static final Long DEFAULT_REGION = 0L;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/ClusterService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/BrokerService.java
similarity index 58%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/ClusterService.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/BrokerService.java
index 1aeb2b4..de52607 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/ClusterService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/BrokerService.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -17,38 +17,39 @@
package org.apache.tubemq.manager.service.interfaces;
-
import java.util.List;
-import org.apache.tubemq.manager.controller.cluster.request.AddClusterReq;
-import org.apache.tubemq.manager.entry.ClusterEntry;
-import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
-@Component
-public interface ClusterService {
+/**
+ * operations on the region assignment of brokers inside a cluster
+ */
+public interface BrokerService {
/**
- * add cluster and the master node in the cluster
- * @param req
- * @return
+ * reset the brokers in a region to be default region
+ * @param regionId id of the region whose brokers are reset
+ * @param clusterId id of the cluster the region belongs to
*/
- Boolean addClusterAndMasterNode(AddClusterReq req);
+ void resetBrokerRegions(long regionId, long clusterId);
/**
- * delete cluster by id
+ * update brokers to be in a region
+ * @param brokerIdList ids of the brokers to move
+ * @param regionId id of the region the brokers are moved to
* @param clusterId
*/
- void deleteCluster(Integer clusterId);
+ void updateBrokersRegion(List<Long> brokerIdList, Long regionId, Long clusterId);
/**
- * get one cluster
+ * check if all the brokers exist in this cluster
+ * (true only when every id in brokerIdList is found in the cluster)
+ * @param brokerIdList ids of the brokers to look up
* @param clusterId
* @return
*/
- ClusterEntry getOneCluster(Integer clusterId);
+ boolean checkIfBrokersAllExsit(List<Long> brokerIdList, long clusterId);
/**
- * get all clusters
+ * get all broker id list in a region
+ * @param regionId id of the region
+ * @param cluster id of the cluster the region belongs to
* @return
*/
- List<ClusterEntry> getAllClusters();
+ List<Long> getBrokerIdListInRegion(long regionId, long cluster);
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/ClusterService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/ClusterService.java
index 1aeb2b4..21a4781 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/ClusterService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/ClusterService.java
@@ -44,7 +44,7 @@ public interface ClusterService {
* @param clusterId
* @return
*/
- ClusterEntry getOneCluster(Integer clusterId);
+ ClusterEntry getOneCluster(long clusterId);
/**
* get all clusters
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/ClusterService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/RegionService.java
similarity index 51%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/ClusterService.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/RegionService.java
index 1aeb2b4..6260ca5 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/ClusterService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/interfaces/RegionService.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -17,38 +17,45 @@
package org.apache.tubemq.manager.service.interfaces;
-
import java.util.List;
-import org.apache.tubemq.manager.controller.cluster.request.AddClusterReq;
-import org.apache.tubemq.manager.entry.ClusterEntry;
-import org.springframework.stereotype.Component;
+import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.entry.RegionEntry;
-@Component
-public interface ClusterService {
+/**
+ * create, delete, update and query the regions of a cluster
+ */
+public interface RegionService {
/**
- * add cluster and the master node in the cluster
- * @param req
+ * create new region with brokers
+ * @param regionEntry the region to create, carrying the id of its cluster
+ * @param brokerIdList ids of the brokers that should belong to the new region
* @return
*/
- Boolean addClusterAndMasterNode(AddClusterReq req);
+ TubeMQResult createNewRegion(RegionEntry regionEntry,
+ List<Long> brokerIdList);
/**
- * delete cluster by id
+ * delete region and set the brokers in the region to be default region
+ * @param regionId id of the region to delete
* @param clusterId
+ * @return success result (RegionServiceImpl throws a RuntimeException on failure)
*/
- void deleteCluster(Integer clusterId);
+ TubeMQResult deleteRegion(long regionId, long clusterId);
/**
- * get one cluster
+ * update region to contain new brokers
+ * need to check if other region contains the same brokers
+ * @param newRegionEntry new state of the region, carrying the region id
+ * @param brokerIdList ids of the brokers the region should contain afterwards
* @param clusterId
* @return
*/
- ClusterEntry getOneCluster(Integer clusterId);
+ TubeMQResult updateRegion(RegionEntry newRegionEntry, List<Long> brokerIdList, long clusterId);
/**
- * get all clusters
+ * query region inside a cluster
+ * if no regionId is passed return all regions inside the cluster
+ * @param regionId id of the region to look for, or null for all regions
+ * @param clusterId id of the cluster to search in
* @return
*/
- List<ClusterEntry> getAllClusters();
+ List<RegionEntry> queryRegion(Long regionId, Long clusterId);
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
index 5982ac6..fc54c84 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
@@ -25,6 +25,8 @@ import java.util.List;
public class BrokerStatusInfo {
private int code;
private String errMsg;
- // total broker configuration info list of brokers.
+ /**
+ * total broker configuration info list of brokers.
+ */
private List<BrokerConf> data;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ValidateUtils.java
similarity index 75%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ValidateUtils.java
index 5982ac6..82a3cad 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ValidateUtils.java
@@ -15,16 +15,19 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.service.tube;
-
-import lombok.Data;
+package org.apache.tubemq.manager.utils;
import java.util.List;
-@Data
-public class BrokerStatusInfo {
- private int code;
- private String errMsg;
- // total broker configuration info list of brokers.
- private List<BrokerConf> data;
+/**
+ * Small null and emptiness checks shared by the manager code.
+ */
+public class ValidateUtils {
+
+    /**
+     * @param object any reference
+     * @return {@code true} only when {@code object} is {@code null}
+     */
+    public static boolean isNull(Object object) {
+        return null == object;
+    }
+
+    /**
+     * @param seq list to inspect, may be {@code null}
+     * @return {@code true} when {@code seq} is {@code null} or holds no elements
+     */
+    public static boolean isEmptyList(List<?> seq) {
+        if (seq == null) {
+            return true;
+        }
+        return seq.isEmpty();
+    }
+
}
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/broker/TestBrokerService.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/broker/TestBrokerService.java
new file mode 100644
index 0000000..dd39e9f
--- /dev/null
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/broker/TestBrokerService.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service.broker;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.tubemq.manager.entry.BrokerEntry;
+import org.apache.tubemq.manager.repository.BrokerRepository;
+import org.apache.tubemq.manager.service.BrokerServiceImpl;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+/**
+ * Tests for {@link BrokerServiceImpl} with {@link BrokerRepository} replaced by a mock.
+ */
+@Slf4j
+@SpringBootTest
+@AutoConfigureMockMvc
+@RunWith(SpringJUnit4ClassRunner.class)
+public class TestBrokerService {
+
+    @MockBean
+    private BrokerRepository brokerRepository;
+
+    @Autowired
+    @InjectMocks
+    private BrokerServiceImpl brokerService;
+
+    /** Every requested broker id is returned by the repository, so the check succeeds. */
+    @Test
+    public void testBrokerService() {
+        List<BrokerEntry> brokers = new ArrayList<>();
+        brokers.add(brokerWithId(1L));
+        brokers.add(brokerWithId(2L));
+        List<Long> brokerIdList = new ArrayList<>();
+        brokerIdList.add(1L);
+        brokerIdList.add(2L);
+        doReturn(brokers).when(brokerRepository).
+            findBrokerEntryByBrokerIdInAndClusterIdEquals(brokerIdList, 1L);
+        // assertThat(...) on its own verifies nothing; the assertion has to be chained
+        assertThat(brokerService.checkIfBrokersAllExsit(brokerIdList, 1)).isTrue();
+    }
+
+    /** One requested broker id is missing from the repository answer, so the check fails. */
+    @Test
+    public void testBrokerServiceWithMissingBroker() {
+        List<BrokerEntry> brokers = new ArrayList<>();
+        brokers.add(brokerWithId(1L));
+        List<Long> brokerIdList = new ArrayList<>();
+        brokerIdList.add(1L);
+        brokerIdList.add(3L);
+        doReturn(brokers).when(brokerRepository).
+            findBrokerEntryByBrokerIdInAndClusterIdEquals(brokerIdList, 1L);
+        assertThat(brokerService.checkIfBrokersAllExsit(brokerIdList, 1)).isFalse();
+    }
+
+    /** Build a broker entry that only carries the given broker id. */
+    private static BrokerEntry brokerWithId(long brokerId) {
+        BrokerEntry entry = new BrokerEntry();
+        entry.setBrokerId(brokerId);
+        return entry;
+    }
+}
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/region/TestRegionService.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/region/TestRegionService.java
new file mode 100644
index 0000000..177b552
--- /dev/null
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/service/region/TestRegionService.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.service.region;
+
+
+import static org.apache.tubemq.manager.service.TubeMQErrorConst.RESOURCE_NOT_EXIST;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.entry.BrokerEntry;
+import org.apache.tubemq.manager.entry.RegionEntry;
+import org.apache.tubemq.manager.repository.BrokerRepository;
+import org.apache.tubemq.manager.repository.RegionRepository;
+import org.apache.tubemq.manager.service.RegionServiceImpl;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+/** Tests {@link RegionServiceImpl} against mocked region and broker repositories. */
+@Slf4j
+@SpringBootTest
+@AutoConfigureMockMvc
+@RunWith(SpringJUnit4ClassRunner.class)
+public class TestRegionService {
+
+    @MockBean
+    private RegionRepository regionRepository;
+
+    @MockBean
+    private BrokerRepository brokerRepository;
+
+    @Autowired
+    @InjectMocks
+    private RegionServiceImpl regionService;
+
+    /** No repository call is stubbed here; creation is expected to fail with RESOURCE_NOT_EXIST. */
+    @Test
+    public void testNoResource() {
+        RegionEntry regionEntry = new RegionEntry();
+        regionEntry.setRegionId(1L);
+        regionEntry.setClusterId(1L);
+        List<Long> brokerIdList = new ArrayList<>();
+        brokerIdList.add(1L);
+        brokerIdList.add(2L);
+        TubeMQResult result = regionService.createNewRegion(regionEntry, brokerIdList);
+        // A bare assertThat(...) checks nothing; isEqualTo(...) is what lets this test fail.
+        assertThat(result.getErrMsg()).isEqualTo(RESOURCE_NOT_EXIST);
+    }
+
+    /** With the requested broker stubbed as present, creation is expected to return SUCCESS_CODE. */
+    @Test
+    public void testCreateNewRegion() {
+        RegionEntry regionEntry = new RegionEntry();
+        regionEntry.setRegionId(1L);
+        regionEntry.setClusterId(1L);
+        List<Long> brokerIdList = new ArrayList<>();
+        brokerIdList.add(1L);
+        RegionEntry regionEntry1 = new RegionEntry();
+        regionEntry1.setRegionId(1L);
+        List<RegionEntry> regionEntries = new ArrayList<>();
+        regionEntries.add(regionEntry1);
+        BrokerEntry brokerEntry1 = new BrokerEntry();
+        brokerEntry1.setBrokerId(1L);
+        List<BrokerEntry> brokerEntries = new ArrayList<>();
+        brokerEntries.add(brokerEntry1);
+        doReturn(regionEntries).when(regionRepository).
+            findRegionEntriesByClusterIdEquals(1);
+        doReturn(brokerEntries).when(brokerRepository).
+            findBrokerEntryByBrokerIdInAndClusterIdEquals(any(), any());
+        TubeMQResult result = regionService.createNewRegion(regionEntry, brokerIdList);
+        // A bare assertThat(...) checks nothing; isTrue() is what lets this test fail.
+        assertThat(result.getErrCode() == SUCCESS_CODE).isTrue();
+    }
+}