You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/01/09 01:45:15 UTC
[incubator-tubemq] branch TUBEMQ-421 updated: [TUBEMQ-503] offset
query + design by contract
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-421 by this push:
new de85d0e [TUBEMQ-503] offset query + design by contract
de85d0e is described below
commit de85d0e7b504461cd4ccec884db0b48c5216189d
Author: EMsnap <zp...@connect.ust.hk>
AuthorDate: Fri Jan 8 19:46:14 2021 +0800
[TUBEMQ-503] offset query + design by contract
---
.../manager/controller/group/GroupController.java | 10 +-
.../group/request/QueryOffsetReq.java} | 30 +-
.../group/result/AllBrokersOffsetRes.java} | 38 +-
.../controller/group/result/OffsetQueryRes.java | 57 +++
.../manager/controller/node/NodeController.java | 2 -
.../controller/topic/TopicWebController.java | 8 +-
.../apache/tubemq/manager/service/NodeService.java | 438 ++-------------------
.../{NodeService.java => NodeServiceImpl.java} | 17 +-
.../tubemq/manager/service/TopicBackendWorker.java | 2 +-
.../tubemq/manager/service/TopicService.java | 227 +++--------
.../{TopicService.java => TopicServiceImpl.java} | 62 ++-
.../tubemq/manager/service/TubeMQHttpConst.java | 2 +-
.../service/tube/TubeHttpTopicInfoList.java | 6 +-
.../apache/tubemq/manager/utils/ConvertUtils.java | 2 +-
.../src/main/resources/application.properties | 2 +-
.../manager/controller/TestNodeController.java | 5 -
16 files changed, 244 insertions(+), 664 deletions(-)
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
index 12b4fc4..052986b 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/GroupController.java
@@ -19,11 +19,11 @@ package org.apache.tubemq.manager.controller.group;
-import static org.apache.tubemq.manager.service.MasterService.requestMaster;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.CLONE;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
import static org.apache.tubemq.manager.service.MasterService.queryMaster;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.QUERY;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_CONSUMER_GROUP;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_CONSUMER;
@@ -34,12 +34,13 @@ import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.controller.group.request.AddBlackGroupReq;
import org.apache.tubemq.manager.controller.group.request.DeleteBlackGroupReq;
import org.apache.tubemq.manager.controller.group.request.DeleteOffsetReq;
+import org.apache.tubemq.manager.controller.group.request.QueryOffsetReq;
import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
import org.apache.tubemq.manager.controller.topic.request.BatchAddGroupAuthReq;
import org.apache.tubemq.manager.controller.topic.request.DeleteGroupReq;
import org.apache.tubemq.manager.controller.topic.request.RebalanceConsumerReq;
import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
-import org.apache.tubemq.manager.service.TopicService;
+import org.apache.tubemq.manager.service.TopicServiceImpl;
import org.apache.tubemq.manager.service.MasterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
@@ -55,13 +56,14 @@ import org.springframework.web.bind.annotation.RestController;
@Slf4j
public class GroupController {
+
public Gson gson = new Gson();
@Autowired
private MasterService masterService;
@Autowired
- private TopicService topicService;
+ private TopicServiceImpl topicService;
@PostMapping("")
@@ -103,6 +105,8 @@ public class GroupController {
return topicService.cloneOffsetToOtherGroups(gson.fromJson(req, CloneOffsetReq.class));
case DELETE:
return topicService.deleteOffset(gson.fromJson(req, DeleteOffsetReq.class));
+ case QUERY:
+ return topicService.queryOffset(gson.fromJson(req, QueryOffsetReq.class));
default:
return TubeMQResult.getErrorResult("no such method");
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/QueryOffsetReq.java
similarity index 50%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/QueryOffsetReq.java
index 53fe096..c4ace9b 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/request/QueryOffsetReq.java
@@ -15,29 +15,13 @@
* limitations under the License.
*/
+package org.apache.tubemq.manager.controller.group.request;
-package org.apache.tubemq.manager.service.tube;
+import lombok.Data;
+import org.apache.tubemq.manager.controller.node.request.BaseReq;
-public class TubeHttpBrokerCfgInfo {
- private boolean acceptPublish;
- private boolean acceptSubscribe;
- private Integer brokerId;
- private String brokerIp;
- private Integer brokerPort;
- private Integer brokerTLSPort;
- private String createDate;
- private String createUser;
- private String deletePolicy;
- private String deleteWhen;
- private String modifyDate;
- private String modifyUser;
- private Integer memCacheFlushIntvl;
- private Integer memCacheMsgCntInK;
- private Integer memCacheMsgSizeInMB;
- private Integer numPartitions;
- private Integer numTopicStores;
- private Integer unflushDataHold;
- private Integer unflushInterval;
- private Integer unflushThreshold;
- private boolean hasTLSPort;
+@Data
+public class QueryOffsetReq extends BaseReq {
+ private String topicName;
+ private String groupName;
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/result/AllBrokersOffsetRes.java
similarity index 50%
rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java
rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/result/AllBrokersOffsetRes.java
index 53fe096..20b0bf0 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/result/AllBrokersOffsetRes.java
@@ -15,29 +15,21 @@
* limitations under the License.
*/
+package org.apache.tubemq.manager.controller.group.result;
-package org.apache.tubemq.manager.service.tube;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Data;
-public class TubeHttpBrokerCfgInfo {
- private boolean acceptPublish;
- private boolean acceptSubscribe;
- private Integer brokerId;
- private String brokerIp;
- private Integer brokerPort;
- private Integer brokerTLSPort;
- private String createDate;
- private String createUser;
- private String deletePolicy;
- private String deleteWhen;
- private String modifyDate;
- private String modifyUser;
- private Integer memCacheFlushIntvl;
- private Integer memCacheMsgCntInK;
- private Integer memCacheMsgSizeInMB;
- private Integer numPartitions;
- private Integer numTopicStores;
- private Integer unflushDataHold;
- private Integer unflushInterval;
- private Integer unflushThreshold;
- private boolean hasTLSPort;
+
+@Data
+public class AllBrokersOffsetRes {
+
+ private List<OffsetInfo> offsetPerBroker = new ArrayList<>();
+
+ @Data
+ public static class OffsetInfo {
+ private int brokerId;
+ private OffsetQueryRes offsetQueryRes;
+ }
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/result/OffsetQueryRes.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/result/OffsetQueryRes.java
new file mode 100644
index 0000000..434923a
--- /dev/null
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/group/result/OffsetQueryRes.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.controller.group.result;
+
+import java.util.List;
+import lombok.Data;
+
+@Data
+public class OffsetQueryRes {
+ private boolean result;
+ private int errCode;
+ private String errMsg;
+ private List<GroupOffsetRes> dataSet;
+ private int totalCnt;
+
+ @Data
+ public static class GroupOffsetRes {
+ private String groupName;
+ private List<TopicOffsetRes> subInfo;
+ private int topicCount;
+
+ @Data
+ public static class TopicOffsetRes {
+ private String topicName;
+ private List<OffsetPartitionRes> offsets;
+ private int partCount;
+
+ @Data
+ public static class OffsetPartitionRes {
+ private int partitionId;
+ private long curOffset;
+ private long flightOffset; // long, consistent with curOffset; an int cannot hold values past 2^31-1
+ private long curDataOffset; // long, consistent with curOffset
+ private long offsetLag; // long: presumably a difference of two offsets - TODO confirm against broker response
+ private long dataLag; // long: presumably a difference of two data offsets - TODO confirm
+ private long offsetMax; // long, consistent with curOffset
+ private long dataMax; // long, consistent with curOffset
+ }
+ }
+ }
+
+}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
index 4689762..26411fc 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
@@ -19,8 +19,6 @@ package org.apache.tubemq.manager.controller.node;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
import org.apache.tubemq.manager.controller.node.request.BrokerSetReadOrWriteReq;
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
index b4c4fc0..2f19062 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
@@ -25,7 +25,6 @@ import static org.apache.tubemq.manager.service.TubeMQHttpConst.DELETE;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.MODIFY;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.REMOVE;
import static org.apache.tubemq.manager.service.MasterService.queryMaster;
-import static org.apache.tubemq.manager.service.MasterService.requestMaster;
import com.google.gson.Gson;
@@ -36,16 +35,11 @@ import org.apache.tubemq.manager.controller.node.request.BatchAddTopicReq;
import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
import org.apache.tubemq.manager.controller.topic.request.DeleteTopicReq;
import org.apache.tubemq.manager.controller.topic.request.ModifyTopicReq;
-import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
import org.apache.tubemq.manager.controller.topic.request.SetAuthControlReq;
-import org.apache.tubemq.manager.entry.NodeEntry;
-import org.apache.tubemq.manager.repository.NodeRepository;
import org.apache.tubemq.manager.service.NodeService;
import org.apache.tubemq.manager.service.MasterService;
-import org.apache.tubemq.manager.service.TopicService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@@ -74,7 +68,7 @@ public class TopicWebController {
@RequestParam String method, @RequestBody String req) throws Exception {
switch (method) {
case ADD:
- return nodeService.addTopic(gson.fromJson(req, BatchAddTopicReq.class));
+ return nodeService.batchAddTopic(gson.fromJson(req, BatchAddTopicReq.class));
case CLONE:
return nodeService.cloneTopicToBrokers(gson.fromJson(req, CloneTopicReq.class));
case AUTH_CONTROL:
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
index ddcfc75..eebcf96 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -17,447 +17,75 @@
package org.apache.tubemq.manager.service;
-
-import static org.apache.tubemq.manager.controller.node.request.AddBrokersReq.getAddBrokerReq;
-import static org.apache.tubemq.manager.service.MasterService.TUBE_REQUEST_PATH;
-import static org.apache.tubemq.manager.service.MasterService.queryMaster;
-import static org.apache.tubemq.manager.service.MasterService.requestMaster;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD_TUBE_TOPIC;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.BROKER_RUN_STATUS;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.NO_SUCH_CLUSTER;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.RELOAD_BROKER;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
-import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
-
-import com.google.common.collect.Lists;
-import com.google.gson.Gson;
import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.*;
-import java.util.stream.Collectors;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
+import java.util.List;
+import java.util.Map;
import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
import org.apache.tubemq.manager.controller.node.request.BatchAddTopicReq;
import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq;
import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
-import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
import org.apache.tubemq.manager.entry.NodeEntry;
-import org.apache.tubemq.manager.repository.NodeRepository;
-import org.apache.tubemq.manager.service.tube.*;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-/**
- * node service to query broker/master/standby status of tube cluster.
- */
-@Slf4j
-@Component
-public class NodeService {
-
- private final CloseableHttpClient httpclient = HttpClients.createDefault();
- private final Gson gson = new Gson();
-
- @Value("${manager.max.configurable.broker.size:50}")
- private int maxConfigurableBrokerSize;
-
- @Value("${manager.max.retry.adding.topic:10}")
- private int maxRetryAddingTopic;
-
- private final TopicBackendWorker worker;
-
- @Autowired
- private NodeRepository nodeRepository;
-
- @Autowired
- private TopicService topicService;
-
- @Autowired
- private MasterService masterService;
-
- public NodeService(TopicBackendWorker worker) {
- this.worker = worker;
- }
-
- /**
- * request node status via http.
- *
- * @param nodeEntry - node entry
- * @return
- * @throws IOException
- */
- private TubeHttpBrokerInfoList requestClusterNodeStatus(NodeEntry nodeEntry) {
- String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort() + BROKER_RUN_STATUS;
- HttpGet httpget = new HttpGet(url);
- try (CloseableHttpResponse response = httpclient.execute(httpget)) {
- TubeHttpBrokerInfoList brokerInfoList =
- gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
- TubeHttpBrokerInfoList.class);
- // request return normal.
- if (brokerInfoList.getCode() == SUCCESS_CODE) {
- // divide by state.
- brokerInfoList.divideBrokerListByState();
- return brokerInfoList;
- }
- } catch (Exception ex) {
- log.error("exception caught while requesting broker status", ex);
- }
- return null;
- }
-
-
+import org.apache.tubemq.manager.service.tube.AddBrokerResult;
+public interface NodeService {
/**
- * clone source broker to generate brokers with the same config and copy the topics in it.
+ * clone brokers with topic in it
* @param req
* @return
* @throws Exception
*/
- public TubeMQResult cloneBrokersWithTopic(CloneBrokersReq req) throws Exception {
-
- int clusterId = req.getClusterId();
- // 1. query source broker config
- QueryBrokerCfgReq queryReq = QueryBrokerCfgReq.getReq(req.getSourceBrokerId());
- NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
- clusterId);
- BrokerStatusInfo brokerStatusInfo = getBrokerStatusInfo(queryReq, masterEntry);
-
- // 2. use source broker config to clone brokers
- BrokerConf sourceBrokerConf = brokerStatusInfo.getData().get(0);
- AddBrokersReq addBrokersReq = getBatchAddBrokersReq(req, clusterId, sourceBrokerConf);
-
- // 3. request master, return broker ids generated by master
- AddBrokerResult addBrokerResult = addBrokersToClusterWithId(addBrokersReq, masterEntry);
-
- // might have duplicate brokers
- if (addBrokerResult.getErrCode() != SUCCESS_CODE) {
- return TubeMQResult.getErrorResult(addBrokerResult.getErrMsg());
- }
- List<Integer> brokerIds = getBrokerIds(addBrokerResult);
- List<AddTopicReq> addTopicReqs = req.getAddTopicReqs();
-
- // 4. add topic to brokers
- return addTopicsToBrokers(masterEntry, brokerIds, addTopicReqs);
- }
-
- public TubeMQResult addTopicsToBrokers(NodeEntry masterEntry, List<Integer> brokerIds, List<AddTopicReq> addTopicReqs) {
- TubeMQResult tubeResult = new TubeMQResult();
- AddTopicsResult addTopicsResult = new AddTopicsResult();
-
- if (CollectionUtils.isEmpty(addTopicReqs)) {
- return tubeResult;
- }
- addTopicReqs.forEach(addTopicReq -> {
- try {
- String brokerStr = StringUtils.join(brokerIds, ",");
- addTopicReq.setBrokerId(brokerStr);
- TubeMQResult result = addTopicToBrokers(addTopicReq, masterEntry);
- if (result.getErrCode() == SUCCESS_CODE) {
- addTopicsResult.getSuccessTopics().add(addTopicReq.getTopicName());
- } else {
- addTopicsResult.getFailTopics().add(addTopicReq.getTopicName());
- }
- } catch (Exception e) {
- log.error("add topic to brokers fail with exception", e);
- addTopicsResult.getFailTopics().add(addTopicReq.getTopicName());
- }
- });
-
- tubeResult.setData(gson.toJson(addTopicsResult));
- return tubeResult;
- }
-
- private List<Integer> getBrokerIds(AddBrokerResult addBrokerResult) {
- List<IpIdRelation> ipids = addBrokerResult.getData();
- List<Integer> brokerIds = Lists.newArrayList();
- for (IpIdRelation ipid : ipids) {
- brokerIds.add(ipid.getId());
- }
- return brokerIds;
- }
-
- private AddBrokersReq getBatchAddBrokersReq(CloneBrokersReq req, int clusterId, BrokerConf sourceBrokerConf) {
- AddBrokersReq addBrokersReq = getAddBrokerReq(req.getConfModAuthToken(), clusterId);
-
- // generate add brokers req using given target broker ips
- List<BrokerConf> brokerConfs = Lists.newArrayList();
- req.getTargetIps().forEach(ip -> {
- BrokerConf brokerConf = new BrokerConf(sourceBrokerConf);
- brokerConf.setBrokerIp(ip);
- brokerConf.setBrokerId(0);
- brokerConfs.add(brokerConf);
- });
- addBrokersReq.setBrokerJsonSet(brokerConfs);
- return addBrokersReq;
- }
-
- private BrokerStatusInfo getBrokerStatusInfo(QueryBrokerCfgReq queryReq, NodeEntry masterEntry) throws Exception {
- String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
- + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(queryReq);
- BrokerStatusInfo brokerStatusInfo = gson.fromJson(queryMaster(url),
- BrokerStatusInfo.class);
- return brokerStatusInfo;
- }
-
- public TubeMQResult addTopicToBrokers(AddTopicReq req, NodeEntry masterEntry) throws Exception {
- String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
- + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
- return requestMaster(url);
- }
-
-
- private boolean configBrokersForTopics(NodeEntry nodeEntry,
- Set<String> topics, List<Integer> brokerList, int maxBrokers) {
- List<Integer> finalBrokerList = brokerList.subList(0, maxBrokers);
- String brokerStr = StringUtils.join(finalBrokerList, ",");
- String topicStr = StringUtils.join(topics, ",");
- String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
- + ADD_TUBE_TOPIC + "&topicName=" + topicStr + "&brokerId=" + brokerStr;
- HttpGet httpget = new HttpGet(url);
- try (CloseableHttpResponse response = httpclient.execute(httpget)) {
- TubeHttpResponse result =
- gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
- TubeHttpResponse.class);
- return result.getCode() == SUCCESS_CODE && result.getErrCode() == SUCCESS_CODE;
- } catch (Exception ex) {
- log.error("exception caught while requesting broker status", ex);
- }
- return false;
- }
+ TubeMQResult cloneBrokersWithTopic(CloneBrokersReq req) throws Exception;
/**
- * handle result, if success, complete it,
- * if not success, add back to queue without exceeding max retry,
- * otherwise complete it with exception.
- *
- * @param isSuccess
- * @param topics
- * @param pendingTopic
+ * add topics to brokers
+ * @param masterEntry
+ * @param brokerIds
+ * @param addTopicReqs
+ * @return
*/
- private void handleAddingResult(boolean isSuccess, Set<String> topics,
- Map<String, TopicFuture> pendingTopic) {
- for (String topic : topics) {
- TopicFuture future = pendingTopic.get(topic);
- if (future != null) {
- if (isSuccess) {
- future.complete();
- } else {
- future.increaseRetryTime();
- if (future.getRetryTime() > maxRetryAddingTopic) {
- future.completeExceptional();
- } else {
- // add back to queue.
- worker.addTopicFuture(future);
- }
- }
- }
- }
- }
-
+ TubeMQResult addTopicsToBrokers(NodeEntry masterEntry, List<Integer> brokerIds,
+ List<AddTopicReq> addTopicReqs);
/**
- * Adding topic is an async operation, so this method should
- * 1. check whether pendingTopic contains topic that has failed/succeeded to be added.
- * 2. async add topic to tubemq cluster
- *
- * @param brokerInfoList - broker list
- * @param pendingTopic - topicMap
+ * add one topic to brokers
+ * @param req
+ * @param masterEntry
+ * @return
+ * @throws Exception
*/
- private void handleAddingTopic(NodeEntry nodeEntry,
- TubeHttpBrokerInfoList brokerInfoList,
- Map<String, TopicFuture> pendingTopic) {
- // 1. check tubemq cluster by topic name, remove pending topic if has added.
- Set<String> brandNewTopics = new HashSet<>();
- for (String topic : pendingTopic.keySet()) {
- TubeHttpTopicInfoList topicInfoList = topicService.requestTopicConfigInfo(nodeEntry, topic);
- if (topicInfoList != null) {
- // get broker list by topic request
- List<Integer> topicBrokerList = topicInfoList.getTopicBrokerIdList();
- if (topicBrokerList.isEmpty()) {
- brandNewTopics.add(topic);
- } else {
- // remove brokers which have been added.
- List<Integer> configurableBrokerIdList =
- brokerInfoList.getConfigurableBrokerIdList();
- configurableBrokerIdList.removeAll(topicBrokerList);
- // add topic to satisfy max broker number.
- Set<String> singleTopic = new HashSet<>();
- singleTopic.add(topic);
- int maxBrokers = maxConfigurableBrokerSize - topicBrokerList.size();
- boolean isSuccess = configBrokersForTopics(nodeEntry, singleTopic,
- configurableBrokerIdList, maxBrokers);
- handleAddingResult(isSuccess, singleTopic, pendingTopic);
- }
- }
- }
- // 2. add new topics to cluster
- List<Integer> configurableBrokerIdList = brokerInfoList.getConfigurableBrokerIdList();
- int maxBrokers = Math.min(maxConfigurableBrokerSize, configurableBrokerIdList.size());
- boolean isSuccess = configBrokersForTopics(nodeEntry, brandNewTopics,
- configurableBrokerIdList, maxBrokers);
- handleAddingResult(isSuccess, brandNewTopics, pendingTopic);
- }
+ TubeMQResult addTopicToBrokers(AddTopicReq req, NodeEntry masterEntry) throws Exception;
/**
- * reload broker list, cannot exceed maxConfigurableBrokerSize each time.
- *
- * @param nodeEntry
- * @param needReloadList
+ * update broker status
+ * @param clusterId
+ * @param pendingTopic
*/
- private void handleReloadBroker(NodeEntry nodeEntry, List<Integer> needReloadList) {
- // reload without exceed max broker.
- int begin = 0;
- int end = 0;
- do {
- end = Math.min(maxConfigurableBrokerSize + begin, needReloadList.size());
- List<Integer> brokerIdList = needReloadList.subList(begin, end);
- String brokerStr = StringUtils.join(brokerIdList, ",");
- String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
- + RELOAD_BROKER + "&brokerId=" + brokerStr;
- HttpGet httpget = new HttpGet(url);
- try (CloseableHttpResponse response = httpclient.execute(httpget)) {
- TubeHttpResponse result =
- gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
- TubeHttpResponse.class);
- if (result.getErrCode() == SUCCESS_CODE && result.getCode() == SUCCESS_CODE) {
- log.info("reload tube broker cgi: " +
- url + " ; return value : " + result.getCode());
- }
- } catch (Exception ex) {
- log.error("exception caught while requesting broker status", ex);
- }
- begin = end;
- } while (end >= needReloadList.size());
- }
-
-
+ void updateBrokerStatus(int clusterId, Map<String, TopicFuture> pendingTopic);
/**
- * update broker status
+ * query cluster info
+ * @param clusterId
+ * @return
*/
- public void updateBrokerStatus(int clusterId, Map<String, TopicFuture> pendingTopic) {
- NodeEntry nodeEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
- if (nodeEntry != null) {
- try {
- TubeHttpBrokerInfoList brokerInfoList = requestClusterNodeStatus(nodeEntry);
- if (brokerInfoList != null) {
- handleAddingTopic(nodeEntry, brokerInfoList, pendingTopic);
- }
-
- // refresh broker list
- brokerInfoList = requestClusterNodeStatus(nodeEntry);
- if (brokerInfoList != null) {
- handleReloadBroker(nodeEntry, brokerInfoList.getNeedReloadList());
- }
-
- } catch (Exception ex) {
- log.error("exception caught while requesting broker status", ex);
- }
- } else {
- log.error("cannot get master ip by clusterId {}, please check it", clusterId);
- }
- }
+ String queryClusterInfo(Integer clusterId);
- public String queryClusterInfo(Integer clusterId) {
- TubeHttpClusterInfoList clusterInfoList;
- try {
- // find all nodes by given clusterIds, show all nodes if clusterIds not provided
- List<NodeEntry> nodeEntries = clusterId == null ?
- nodeRepository.findAll() : nodeRepository.findNodeEntriesByClusterIdIs(clusterId);
- // divide all entries by clusterId
- Map<Integer, List<NodeEntry>> nodeEntriesPerCluster =
- nodeEntries.parallelStream().collect(Collectors.groupingBy(NodeEntry::getClusterId));
-
- clusterInfoList = TubeHttpClusterInfoList.getClusterInfoList(nodeEntriesPerCluster);
- } catch (Exception e) {
- log.error("query cluster info error", e);
- return gson.toJson(TubeMQResult.getErrorResult(""));
- }
-
- return gson.toJson(clusterInfoList);
- }
-
-
-
- public void close() throws IOException {
- httpclient.close();
- }
-
- public AddBrokerResult addBrokersToClusterWithId(AddBrokersReq req, NodeEntry masterEntry) throws Exception {
-
- String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
- + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
- HttpGet httpget = new HttpGet(url);
- try (CloseableHttpResponse response = httpclient.execute(httpget)) {
- return gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
- AddBrokerResult.class);
- } catch (Exception ex) {
- log.error("exception caught while requesting broker status", ex);
- }
- return null;
- }
+ void close() throws IOException;
/**
- * given one topic, copy its config and clone to brokers
- * if no broker is is provided, topics will be cloned to all brokers in cluster
+ * clone topic to brokers
* @param req
* @return
* @throws Exception
*/
- public TubeMQResult cloneTopicToBrokers(CloneTopicReq req) throws Exception {
-
- NodeEntry master = masterService.getMasterNode(req);
- if (master == null) {
-
- return TubeMQResult.getErrorResult(NO_SUCH_CLUSTER);
- }
- // 1 query topic config
- TubeHttpTopicInfoList topicInfoList = topicService.requestTopicConfigInfo(master, req.getSourceTopicName());
-
- if (topicInfoList == null) {
- return TubeMQResult.getErrorResult("no such topic");
- }
-
- // 2 if there's no specific broker ids then clone to all of the brokers
- List<Integer> brokerId = req.getBrokerId();
-
- if (CollectionUtils.isEmpty(brokerId)) {
- TubeHttpBrokerInfoList brokerInfoList = requestClusterNodeStatus(master);
- if (brokerInfoList != null) {
- brokerId = brokerInfoList.getConfigurableBrokerIdList();
- }
- }
-
- // 3 generate add topic req
- AddTopicReq addTopicReq = topicInfoList.getAddTopicReq(brokerId,
- req.getTargetTopicName(), req.getConfModAuthToken());
-
- // 4 send to master
- return addTopicToBrokers(addTopicReq, master);
-
- }
+ TubeMQResult cloneTopicToBrokers(CloneTopicReq req) throws Exception;
/**
- * add topic to brokers
+ * batch add topic to master
* @param req
* @return
*/
- public TubeMQResult addTopic(BatchAddTopicReq req) {
- NodeEntry masterEntry = masterService.getMasterNode(req);
- if (masterEntry == null) {
- return TubeMQResult.getErrorResult(NO_SUCH_CLUSTER);
- }
- return addTopicsToBrokers(masterEntry, req.getBrokerIds(), req.getAddTopicReqs());
- }
+ TubeMQResult batchAddTopic(BatchAddTopicReq req);
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java
similarity index 98%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java
index ddcfc75..30b5adf 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeServiceImpl.java
@@ -63,7 +63,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
-public class NodeService {
+public class NodeServiceImpl implements NodeService {
private final CloseableHttpClient httpclient = HttpClients.createDefault();
private final Gson gson = new Gson();
@@ -85,7 +85,7 @@ public class NodeService {
@Autowired
private MasterService masterService;
- public NodeService(TopicBackendWorker worker) {
+ public NodeServiceImpl(TopicBackendWorker worker) {
this.worker = worker;
}
@@ -124,6 +124,7 @@ public class NodeService {
* @return
* @throws Exception
*/
+ @Override
public TubeMQResult cloneBrokersWithTopic(CloneBrokersReq req) throws Exception {
int clusterId = req.getClusterId();
@@ -151,7 +152,9 @@ public class NodeService {
return addTopicsToBrokers(masterEntry, brokerIds, addTopicReqs);
}
- public TubeMQResult addTopicsToBrokers(NodeEntry masterEntry, List<Integer> brokerIds, List<AddTopicReq> addTopicReqs) {
+ @Override
+ public TubeMQResult addTopicsToBrokers(NodeEntry masterEntry, List<Integer> brokerIds,
+ List<AddTopicReq> addTopicReqs) {
TubeMQResult tubeResult = new TubeMQResult();
AddTopicsResult addTopicsResult = new AddTopicsResult();
@@ -210,6 +213,7 @@ public class NodeService {
return brokerStatusInfo;
}
+ @Override
public TubeMQResult addTopicToBrokers(AddTopicReq req, NodeEntry masterEntry) throws Exception {
String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
+ "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
@@ -346,6 +350,7 @@ public class NodeService {
/**
* update broker status
*/
+ @Override
public void updateBrokerStatus(int clusterId, Map<String, TopicFuture> pendingTopic) {
NodeEntry nodeEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
if (nodeEntry != null) {
@@ -369,6 +374,7 @@ public class NodeService {
}
}
+ @Override
public String queryClusterInfo(Integer clusterId) {
TubeHttpClusterInfoList clusterInfoList;
try {
@@ -390,6 +396,7 @@ public class NodeService {
+ @Override
public void close() throws IOException {
httpclient.close();
}
@@ -415,6 +422,7 @@ public class NodeService {
* @return
* @throws Exception
*/
+ @Override
public TubeMQResult cloneTopicToBrokers(CloneTopicReq req) throws Exception {
NodeEntry master = masterService.getMasterNode(req);
@@ -453,7 +461,8 @@ public class NodeService {
* @param req
* @return
*/
- public TubeMQResult addTopic(BatchAddTopicReq req) {
+ @Override
+ public TubeMQResult batchAddTopic(BatchAddTopicReq req) {
NodeEntry masterEntry = masterService.getMasterNode(req);
if (masterEntry == null) {
return TubeMQResult.getErrorResult(NO_SUCH_CLUSTER);
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java
index 86b72d5..7637c0f 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicBackendWorker.java
@@ -67,7 +67,7 @@ public class TopicBackendWorker implements DisposableBean, Runnable {
// daemon thread
thread.setDaemon(true);
thread.start();
- nodeService = new NodeService(this);
+ nodeService = new NodeServiceImpl(this);
}
/**
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
index 1abff54..7fd6f48 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -17,190 +17,59 @@
package org.apache.tubemq.manager.service;
-
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.QUERY_GROUP_DETAIL_INFO;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.TOPIC_CONFIG_INFO;
-import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
-import static org.apache.tubemq.manager.utils.ConvertUtils.convertToRebalanceConsumerReq;
-import static org.apache.tubemq.manager.service.MasterService.TUBE_REQUEST_PATH;
-import static org.apache.tubemq.manager.service.MasterService.requestMaster;
-
-import com.google.gson.Gson;
-import java.io.InputStreamReader;
-import java.util.List;
-import java.util.Objects;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.controller.group.request.DeleteOffsetReq;
+import org.apache.tubemq.manager.controller.group.request.QueryOffsetReq;
import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
-import org.apache.tubemq.manager.controller.topic.request.BatchAddGroupAuthReq;
-import org.apache.tubemq.manager.controller.topic.request.DeleteGroupReq;
-import org.apache.tubemq.manager.controller.topic.request.RebalanceConsumerReq;
import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
import org.apache.tubemq.manager.entry.NodeEntry;
-import org.apache.tubemq.manager.repository.NodeRepository;
-import org.apache.tubemq.manager.service.tube.CleanOffsetResult;
-import org.apache.tubemq.manager.service.tube.RebalanceGroupResult;
import org.apache.tubemq.manager.service.tube.TubeHttpGroupDetailInfo;
import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList;
-import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList.TopicInfoList.TopicInfo;
-import org.apache.tubemq.manager.utils.ConvertUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-/**
- * node service to query broker/master/standby status of tube cluster.
- */
-@Slf4j
-@Component
-public class TopicService {
-
- private final CloseableHttpClient httpclient = HttpClients.createDefault();
- private final Gson gson = new Gson();
-
- @Value("${manager.broker.webPort:8081}")
- private int brokerWebPort;
-
- @Autowired
- private MasterService masterService;
-
- private TubeHttpGroupDetailInfo requestGroupRunInfo(NodeEntry nodeEntry, String group) {
- String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
- + QUERY_GROUP_DETAIL_INFO + "&consumeGroup=" + group;
- HttpGet httpget = new HttpGet(url);
- try (CloseableHttpResponse response = httpclient.execute(httpget)) {
- TubeHttpGroupDetailInfo groupDetailInfo =
- gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
- TubeHttpGroupDetailInfo.class);
- if (groupDetailInfo.getErrCode() == 0) {
- return groupDetailInfo;
- }
- } catch (Exception ex) {
- log.error("exception caught while requesting group status", ex);
- }
- return null;
- }
-
-
- public TubeMQResult cloneOffsetToOtherGroups(CloneOffsetReq req) {
-
- NodeEntry master = masterService.getMasterNode(req);
- if (master == null) {
- return TubeMQResult.getErrorResult("no such cluster");
- }
- // 1. query the corresponding brokers having given topic
- TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getTopicName());
- TubeMQResult result = new TubeMQResult();
-
- if (topicInfoList == null) {
- return result;
- }
-
- List<TopicInfo> topicInfos = topicInfoList.getTopicInfo();
- // 2. for each broker, request to clone offset
- for (TopicInfo topicInfo : topicInfos) {
- String brokerIp = topicInfo.getBrokerIp();
- String url = SCHEMA + brokerIp + ":" + brokerWebPort
- + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
- result = requestMaster(url);
- if (result.getErrCode() != SUCCESS_CODE) {
- return result;
- }
- }
-
- return result;
- }
-
-
- public TubeHttpTopicInfoList requestTopicConfigInfo(NodeEntry nodeEntry, String topic) {
- String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
- + TOPIC_CONFIG_INFO + "&topicName=" + topic;
- HttpGet httpget = new HttpGet(url);
- try (CloseableHttpResponse response = httpclient.execute(httpget)) {
- TubeHttpTopicInfoList topicInfoList =
- gson.fromJson(new InputStreamReader(response.getEntity().getContent()),
- TubeHttpTopicInfoList.class);
- if (topicInfoList.getErrCode() == SUCCESS_CODE) {
- return topicInfoList;
- }
- } catch (Exception ex) {
- log.error("exception caught while requesting broker status", ex);
- }
- return null;
- }
-
-
- public TubeMQResult rebalanceGroup(RebalanceGroupReq req) {
-
- NodeEntry master = masterService.getMasterNode(req);
- if (master == null) {
- return TubeMQResult.getErrorResult("no such cluster");
- }
-
- // 1. get all consumer ids in group
- List<String> consumerIds = Objects
- .requireNonNull(requestGroupRunInfo(master, req.getGroupName())).getConsumerIds();
- RebalanceGroupResult rebalanceGroupResult = new RebalanceGroupResult();
-
- // 2. rebalance consumers in group
- consumerIds.forEach(consumerId -> {
- RebalanceConsumerReq rebalanceConsumerReq = convertToRebalanceConsumerReq(req,
- consumerId);
- String url = SCHEMA + master.getIp() + ":" + master.getWebPort()
- + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(rebalanceConsumerReq);
- TubeMQResult result = requestMaster(url);
- if (result.getErrCode() != 0) {
- rebalanceGroupResult.getFailConsumers().add(consumerId);
- }
- rebalanceGroupResult.getSuccessConsumers().add(consumerId);
- });
-
- TubeMQResult tubeResult = new TubeMQResult();
- tubeResult.setData(gson.toJson(rebalanceGroupResult));
-
- return tubeResult;
- }
-
-
- public TubeMQResult deleteOffset(DeleteOffsetReq req) {
-
- NodeEntry master = masterService.getMasterNode(req);
- if (master == null) {
- return TubeMQResult.getErrorResult("no such cluster");
- }
-
- // 1. query the corresponding brokers having given topic
- TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getTopicName());
- TubeMQResult result = new TubeMQResult();
- CleanOffsetResult cleanOffsetResult = new CleanOffsetResult();
- if (topicInfoList == null) {
- return TubeMQResult.getErrorResult("no such topic");
- }
-
- List<TopicInfo> topicInfos = topicInfoList.getTopicInfo();
- // 2. for each broker, request to delete offset
- for (TopicInfo topicInfo : topicInfos) {
- String brokerIp = topicInfo.getBrokerIp();
- String url = SCHEMA + brokerIp + ":" + brokerWebPort
- + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
- result = requestMaster(url);
- if (result.getErrCode() != SUCCESS_CODE) {
- cleanOffsetResult.getFailBrokers().add(brokerIp);
- } else {
- cleanOffsetResult.getSuccessBrokers().add(brokerIp);
- }
- }
-
- result.setData(gson.toJson(cleanOffsetResult));
-
- return result;
- }
+/** Topic and consumer group operations of the manager; implemented by TopicServiceImpl. */
+public interface TopicService {
+
+ /**
+ * get consumer group run info
+ * @param nodeEntry node the query is sent to (TopicServiceImpl passes the cluster master)
+ * @param group name of the consume group to look up
+ * @return the group detail info; TopicServiceImpl returns null when the query fails
+ */
+ TubeHttpGroupDetailInfo requestGroupRunInfo(NodeEntry nodeEntry, String group);
+
+ /**
+ * clone offset to other groups
+ * @param req clone request; it selects the cluster master and names the topic
+ * @return an error result for an unknown cluster, otherwise a broker request result
+ */
+ TubeMQResult cloneOffsetToOtherGroups(CloneOffsetReq req);
+
+ /**
+ * get topic config info
+ * @param nodeEntry node the query is sent to (TopicServiceImpl passes the cluster master)
+ * @param topic name of the topic to look up
+ * @return the topic config list; TopicServiceImpl returns null when the query fails
+ */
+ TubeHttpTopicInfoList requestTopicConfigInfo(NodeEntry nodeEntry, String topic);
+
+ /**
+ * rebalance group
+ * @param req rebalance request; it selects the cluster master and names the group
+ * @return an error result for an unknown cluster, otherwise the rebalance outcome as JSON data
+ */
+ TubeMQResult rebalanceGroup(RebalanceGroupReq req);
+
+ /**
+ * delete offset given topic and broker
+ * @param req delete request; it selects the cluster master and names the topic
+ * @return an error result for an unknown cluster or topic, otherwise the outcome as JSON data
+ */
+ TubeMQResult deleteOffset(DeleteOffsetReq req);
+
+ /**
+ * query offset given topic and group name
+ * @param req query request; it selects the cluster master and names the topic
+ * @return an error result on any failure, otherwise the offsets of every broker as JSON data
+ */
+ TubeMQResult queryOffset(QueryOffsetReq req);
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java
similarity index 77%
copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java
index 1abff54..e8af636 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicService.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TopicServiceImpl.java
@@ -18,6 +18,7 @@
package org.apache.tubemq.manager.service;
+import static org.apache.tubemq.manager.service.MasterService.queryMaster;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.QUERY_GROUP_DETAIL_INFO;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SUCCESS_CODE;
@@ -38,19 +39,19 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.controller.group.request.DeleteOffsetReq;
+import org.apache.tubemq.manager.controller.group.request.QueryOffsetReq;
+import org.apache.tubemq.manager.controller.group.result.AllBrokersOffsetRes;
+import org.apache.tubemq.manager.controller.group.result.AllBrokersOffsetRes.OffsetInfo;
+import org.apache.tubemq.manager.controller.group.result.OffsetQueryRes;
import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq;
-import org.apache.tubemq.manager.controller.topic.request.BatchAddGroupAuthReq;
-import org.apache.tubemq.manager.controller.topic.request.DeleteGroupReq;
import org.apache.tubemq.manager.controller.topic.request.RebalanceConsumerReq;
import org.apache.tubemq.manager.controller.topic.request.RebalanceGroupReq;
import org.apache.tubemq.manager.entry.NodeEntry;
-import org.apache.tubemq.manager.repository.NodeRepository;
import org.apache.tubemq.manager.service.tube.CleanOffsetResult;
import org.apache.tubemq.manager.service.tube.RebalanceGroupResult;
import org.apache.tubemq.manager.service.tube.TubeHttpGroupDetailInfo;
import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList;
import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList.TopicInfoList.TopicInfo;
-import org.apache.tubemq.manager.utils.ConvertUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@@ -60,7 +61,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
-public class TopicService {
+public class TopicServiceImpl implements TopicService {
private final CloseableHttpClient httpclient = HttpClients.createDefault();
private final Gson gson = new Gson();
@@ -71,7 +72,8 @@ public class TopicService {
@Autowired
private MasterService masterService;
- private TubeHttpGroupDetailInfo requestGroupRunInfo(NodeEntry nodeEntry, String group) {
+ @Override
+ public TubeHttpGroupDetailInfo requestGroupRunInfo(NodeEntry nodeEntry, String group) {
String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+ QUERY_GROUP_DETAIL_INFO + "&consumeGroup=" + group;
HttpGet httpget = new HttpGet(url);
@@ -89,6 +91,7 @@ public class TopicService {
}
+ @Override
public TubeMQResult cloneOffsetToOtherGroups(CloneOffsetReq req) {
NodeEntry master = masterService.getMasterNode(req);
@@ -119,6 +122,7 @@ public class TopicService {
}
+ @Override
public TubeHttpTopicInfoList requestTopicConfigInfo(NodeEntry nodeEntry, String topic) {
String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+ TOPIC_CONFIG_INFO + "&topicName=" + topic;
@@ -137,6 +141,7 @@ public class TopicService {
}
+ @Override
public TubeMQResult rebalanceGroup(RebalanceGroupReq req) {
NodeEntry master = masterService.getMasterNode(req);
@@ -169,6 +174,7 @@ public class TopicService {
}
+ @Override
public TubeMQResult deleteOffset(DeleteOffsetReq req) {
NodeEntry master = masterService.getMasterNode(req);
@@ -203,4 +209,48 @@ public class TopicService {
return result;
}
+ @Override
+ public TubeMQResult queryOffset(QueryOffsetReq req) {
+ // the master only supplies the brokers holding the topic; offsets are fetched per broker
+ NodeEntry master = masterService.getMasterNode(req);
+ if (master == null) {
+ return TubeMQResult.getErrorResult("no such cluster");
+ }
+
+ // 1. query the corresponding brokers having given topic
+ TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getTopicName());
+ if (topicInfoList == null) {
+ return TubeMQResult.getErrorResult("no such topic");
+ }
+ List<TopicInfo> topicInfos = topicInfoList.getTopicInfo();
+
+ AllBrokersOffsetRes allBrokersOffsetRes = new AllBrokersOffsetRes();
+ List<OffsetInfo> offsetPerBroker = allBrokersOffsetRes.getOffsetPerBroker();
+
+ // 2. for each broker, request to query offset; the first failure aborts the whole query
+ for (TopicInfo topicInfo : topicInfos) {
+ String brokerIp = topicInfo.getBrokerIp();
+ String url = SCHEMA + brokerIp + ":" + brokerWebPort
+ + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+ OffsetQueryRes res = gson.fromJson(queryMaster(url), OffsetQueryRes.class);
+ // gson returns null for a null or empty document: report it as a failed broker, not an NPE
+ if (res == null || res.getErrCode() != SUCCESS_CODE) {
+ return TubeMQResult.getErrorResult("query broker id" + topicInfo.getBrokerId() + " fail");
+ }
+ generateOffsetInfo(offsetPerBroker, topicInfo, res);
+ }
+ // 3. build the success result only after every broker has answered
+ TubeMQResult result = new TubeMQResult();
+ result.setData(gson.toJson(allBrokersOffsetRes));
+ return result;
+ }
+
+
+ // tags one broker's offset response with that broker's id and appends it to the shared list
+ private void generateOffsetInfo(List<OffsetInfo> offsetPerBroker, TopicInfo topicInfo, OffsetQueryRes res) {
+ OffsetInfo brokerOffset = new OffsetInfo();
+ brokerOffset.setOffsetQueryRes(res);
+ brokerOffset.setBrokerId(topicInfo.getBrokerId());
+ offsetPerBroker.add(brokerOffset);
+ }
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
index 04cf17b..9939dac 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
@@ -52,5 +52,5 @@ public class TubeMQHttpConst {
public static final String REBALANCE_CONSUMER = "rebalanceConsumer";
public static final String NO_SUCH_CLUSTER = "no such cluster";
public static final Integer SUCCESS_CODE = 0;
-
+ public static final String QUERY = "query";
}
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
index 0ba3426..5350ca5 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
@@ -105,10 +105,10 @@ public class TubeHttpTopicInfoList {
public List<TopicInfo> getTopicInfo() {
- if (data != null) {
- return data.get(0).getTopicInfo();
+ if (CollectionUtils.isEmpty(data)) {
+ return Lists.newArrayList();
}
- return Lists.newArrayList();
+ return data.get(0).getTopicInfo();
}
public AddTopicReq getAddTopicReq(List<Integer> brokerIds, List<String> targetTopicNames, String token) {
diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
index 58fdd15..130faa0 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
@@ -36,7 +36,7 @@ import static org.apache.tubemq.manager.service.TubeMQHttpConst.REBALANCE_GROUP;
@Slf4j
public class ConvertUtils {
- public static Gson gson = new Gson();
+ public static final Gson gson = new Gson();
public static String convertReqToQueryStr(Object req) {
List<String> queryList = new ArrayList<>();
diff --git a/tubemq-manager/src/main/resources/application.properties b/tubemq-manager/src/main/resources/application.properties
index dee51b7..9afbdab 100644
--- a/tubemq-manager/src/main/resources/application.properties
+++ b/tubemq-manager/src/main/resources/application.properties
@@ -14,4 +14,4 @@
# limitations under the License.
spring.jpa.hibernate.ddl-auto=update
-# configuration for manager
+# configuration for manager
\ No newline at end of file
diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
index 9b8ce64..8bad33b 100644
--- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
+++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
@@ -16,11 +16,9 @@
*/
package org.apache.tubemq.manager.controller;
-import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
-import org.apache.tubemq.manager.service.NodeService;
import org.assertj.core.util.Lists;
import org.junit.Assert;
import org.junit.Test;
@@ -29,15 +27,12 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.RequestBuilder;
import java.util.List;
-import java.util.Objects;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;