You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:22 UTC
[05/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
new file mode 100644
index 0000000..1de96db
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -0,0 +1,933 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.admin;
+
+import com.alibaba.rocketmq.client.QueryResult;
+import com.alibaba.rocketmq.client.admin.MQAdminExtInner;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.impl.MQClientManager;
+import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
+import com.alibaba.rocketmq.client.log.ClientLogger;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.ServiceState;
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.admin.*;
+import com.alibaba.rocketmq.common.help.FAQUrl;
+import com.alibaba.rocketmq.common.message.*;
+import com.alibaba.rocketmq.common.namesrv.NamesrvUtil;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.body.*;
+import com.alibaba.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import com.alibaba.rocketmq.common.protocol.route.BrokerData;
+import com.alibaba.rocketmq.common.protocol.route.QueueData;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import com.alibaba.rocketmq.remoting.exception.*;
+import com.alibaba.rocketmq.tools.admin.api.MessageTrack;
+import com.alibaba.rocketmq.tools.admin.api.TrackType;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.io.UnsupportedEncodingException;
+import java.util.*;
+import java.util.Map.Entry;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
+ private final Logger log = ClientLogger.getLog();
+ private final DefaultMQAdminExt defaultMQAdminExt;
+ private ServiceState serviceState = ServiceState.CREATE_JUST;
+ private MQClientInstance mqClientInstance;
+ private RPCHook rpcHook;
+ private long timeoutMillis = 20000;
+ private Random random = new Random();
+
+
+ public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, long timeoutMillis) {
+ this(defaultMQAdminExt, null, timeoutMillis);
+ }
+
+
+ public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, RPCHook rpcHook, long timeoutMillis) {
+ this.defaultMQAdminExt = defaultMQAdminExt;
+ this.rpcHook = rpcHook;
+ this.timeoutMillis = timeoutMillis;
+ }
+
+
+ @Override
+ public void start() throws MQClientException {
+ switch (this.serviceState) {
+ case CREATE_JUST:
+ this.serviceState = ServiceState.START_FAILED;
+
+ this.defaultMQAdminExt.changeInstanceNameToPID();
+
+ this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
+
+ boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this);
+ if (!registerOK) {
+ this.serviceState = ServiceState.CREATE_JUST;
+ throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup()
+ + "] has created already, specifed another name please."//
+ + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
+ }
+
+ mqClientInstance.start();
+
+ log.info("the adminExt [{}] start OK", this.defaultMQAdminExt.getAdminExtGroup());
+
+ this.serviceState = ServiceState.RUNNING;
+ break;
+ case RUNNING:
+ case START_FAILED:
+ case SHUTDOWN_ALREADY:
+ throw new MQClientException("The AdminExt service state not OK, maybe started once, "//
+ + this.serviceState//
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
+ default:
+ break;
+ }
+ }
+
+
+ @Override
+ public void shutdown() {
+ switch (this.serviceState) {
+ case CREATE_JUST:
+ break;
+ case RUNNING:
+ this.mqClientInstance.unregisterAdminExt(this.defaultMQAdminExt.getAdminExtGroup());
+ this.mqClientInstance.shutdown();
+
+ log.info("the adminExt [{}] shutdown OK", this.defaultMQAdminExt.getAdminExtGroup());
+ this.serviceState = ServiceState.SHUTDOWN_ALREADY;
+ break;
+ case SHUTDOWN_ALREADY:
+ break;
+ default:
+ break;
+ }
+ }
+
+ @Override
+ public void updateBrokerConfig(String brokerAddr, Properties properties) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
+ this.mqClientInstance.getMQClientAPIImpl().updateBrokerConfig(brokerAddr, properties, timeoutMillis);
+ }
+
+ @Override
+ public Properties getBrokerConfig(final String brokerAddr) throws RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
+ return this.mqClientInstance.getMQClientAPIImpl().getBrokerConfig(brokerAddr, timeoutMillis);
+ }
+
+ @Override
+ public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException {
+ this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
+ }
+
+ @Override
+ public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config) throws RemotingException,
+ MQBrokerException, InterruptedException, MQClientException {
+ this.mqClientInstance.getMQClientAPIImpl().createSubscriptionGroup(addr, config, timeoutMillis);
+ }
+
+ @Override
+ public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public TopicConfig examineTopicConfig(String addr, String topic) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public TopicStatsTable examineTopicStats(String topic) throws RemotingException, MQClientException, InterruptedException,
+ MQBrokerException {
+ TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
+ TopicStatsTable topicStatsTable = new TopicStatsTable();
+
+ for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+ String addr = bd.selectBrokerAddr();
+ if (addr != null) {
+ TopicStatsTable tst = this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis);
+ topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
+ }
+ }
+
+ if (topicStatsTable.getOffsetTable().isEmpty()) {
+ throw new MQClientException("Not found the topic stats info", null);
+ }
+
+ return topicStatsTable;
+ }
+
+ @Override
+ public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
+ return this.mqClientInstance.getMQClientAPIImpl().getTopicListFromNameServer(timeoutMillis);
+ }
+
+ @Override
+ public TopicList fetchTopicsByCLuster(String clusterName) throws RemotingException, MQClientException, InterruptedException {
+ return this.mqClientInstance.getMQClientAPIImpl().getTopicsByCluster(clusterName, timeoutMillis);
+ }
+
+ @Override
+ public KVTable fetchBrokerRuntimeStats(final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, InterruptedException, MQBrokerException {
+ return this.mqClientInstance.getMQClientAPIImpl().getBrokerRuntimeInfo(brokerAddr, timeoutMillis);
+ }
+
+ @Override
+ public ConsumeStats examineConsumeStats(String consumerGroup) throws RemotingException, MQClientException, InterruptedException,
+ MQBrokerException {
+ return examineConsumeStats(consumerGroup, null);
+ }
+
+ @Override
+ public ConsumeStats examineConsumeStats(String consumerGroup, String topic) throws RemotingException, MQClientException,
+ InterruptedException, MQBrokerException {
+ String retryTopic = MixAll.getRetryTopic(consumerGroup);
+ TopicRouteData topicRouteData = this.examineTopicRouteInfo(retryTopic);
+ ConsumeStats result = new ConsumeStats();
+
+ for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+ String addr = bd.selectBrokerAddr();
+ if (addr != null) {
+ ConsumeStats consumeStats =
+ this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3);
+ result.getOffsetTable().putAll(consumeStats.getOffsetTable());
+ double value = result.getConsumeTps() + consumeStats.getConsumeTps();
+ result.setConsumeTps(value);
+ }
+ }
+
+ if (result.getOffsetTable().isEmpty()) {
+ throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE,
+ "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message");
+ }
+
+ return result;
+ }
+
+ @Override
+ public ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException {
+ return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterInfo(timeoutMillis);
+ }
+
+ @Override
+ public TopicRouteData examineTopicRouteInfo(String topic) throws RemotingException, MQClientException, InterruptedException {
+ return this.mqClientInstance.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
+ }
+
+
+ @Override
+ public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ try {
+ MessageDecoder.decodeMessageId(msgId);
+ return this.viewMessage(msgId);
+ } catch (Exception e) {
+ log.warn("the msgId maybe created by new client. msgId={}", msgId, e);
+ }
+ return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, msgId);
+ }
+
+ @Override
+ public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup) throws InterruptedException, MQBrokerException,
+ RemotingException, MQClientException {
+ ConsumerConnection result = new ConsumerConnection();
+ String topic = MixAll.getRetryTopic(consumerGroup);
+ List<BrokerData> brokers = this.examineTopicRouteInfo(topic).getBrokerDatas();
+ BrokerData brokerData = brokers.get(random.nextInt(brokers.size()));
+ String addr = null;
+ if (brokerData != null) {
+ addr = brokerData.selectBrokerAddr();
+ if (StringUtils.isNotBlank(addr)) {
+ result = this.mqClientInstance.getMQClientAPIImpl().getConsumerConnectionList(addr, consumerGroup, timeoutMillis);
+ }
+ }
+
+ if (result.getConnectionSet().isEmpty()) {
+ log.warn("the consumer group not online. brokerAddr={}, group={}", addr, consumerGroup);
+ throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "Not found the consumer group connection");
+ }
+
+ return result;
+ }
+
+ @Override
+ public ProducerConnection examineProducerConnectionInfo(String producerGroup, final String topic) throws RemotingException,
+ MQClientException, InterruptedException, MQBrokerException {
+ ProducerConnection result = new ProducerConnection();
+ List<BrokerData> brokers = this.examineTopicRouteInfo(topic).getBrokerDatas();
+ BrokerData brokerData = brokers.get(random.nextInt(brokers.size()));
+ String addr = null;
+ if (brokerData != null) {
+ addr = brokerData.selectBrokerAddr();
+ if (StringUtils.isNotBlank(addr)) {
+ result = this.mqClientInstance.getMQClientAPIImpl().getProducerConnectionList(addr, producerGroup, timeoutMillis);
+ }
+ }
+
+ if (result.getConnectionSet().isEmpty()) {
+ log.warn("the producer group not online. brokerAddr={}, group={}", addr, producerGroup);
+ throw new MQClientException("Not found the producer group connection", null);
+ }
+
+ return result;
+ }
+
+ @Override
+ public List<String> getNameServerAddressList() {
+ return this.mqClientInstance.getMQClientAPIImpl().getNameServerAddressList();
+ }
+
+ @Override
+ public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException,
+ RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
+ return this.mqClientInstance.getMQClientAPIImpl().wipeWritePermOfBroker(namesrvAddr, brokerName, timeoutMillis);
+ }
+
+ @Override
+ public void putKVConfig(String namespace, String key, String value) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public String getKVConfig(String namespace, String key) throws RemotingException, MQClientException, InterruptedException {
+ return this.mqClientInstance.getMQClientAPIImpl().getKVConfigValue(namespace, key, timeoutMillis);
+ }
+
+ @Override
+ public KVTable getKVListByNamespace(String namespace) throws RemotingException, MQClientException, InterruptedException {
+ return this.mqClientInstance.getMQClientAPIImpl().getKVListByNamespace(namespace, timeoutMillis);
+ }
+
+ @Override
+ public void deleteTopicInBroker(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException,
+ MQClientException {
+ for (String addr : addrs) {
+ this.mqClientInstance.getMQClientAPIImpl().deleteTopicInBroker(addr, topic, timeoutMillis);
+ }
+ }
+
+ @Override
+ public void deleteTopicInNameServer(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException,
+ MQClientException {
+ if (addrs == null) {
+ String ns = this.mqClientInstance.getMQClientAPIImpl().fetchNameServerAddr();
+ addrs = new HashSet(Arrays.asList(ns.split(";")));
+ }
+ for (String addr : addrs) {
+ this.mqClientInstance.getMQClientAPIImpl().deleteTopicInNameServer(addr, topic, timeoutMillis);
+ }
+ }
+
+ @Override
+ public void deleteSubscriptionGroup(String addr, String groupName) throws RemotingException, MQBrokerException, InterruptedException,
+ MQClientException {
+ this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, timeoutMillis);
+ }
+
+ @Override
+ public void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException {
+ this.mqClientInstance.getMQClientAPIImpl().putKVConfigValue(namespace, key, value, timeoutMillis);
+ }
+
+ @Override
+ public void deleteKvConfig(String namespace, String key) throws RemotingException, MQBrokerException, InterruptedException,
+ MQClientException {
+ this.mqClientInstance.getMQClientAPIImpl().deleteKVConfigValue(namespace, key, timeoutMillis);
+ }
+
+ @Override
+ public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, boolean force)
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
+ List<RollbackStats> rollbackStatsList = new ArrayList<RollbackStats>();
+ Map<String, Integer> topicRouteMap = new HashMap<String, Integer>();
+ for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+ for (QueueData queueData : topicRouteData.getQueueDatas()) {
+ topicRouteMap.put(bd.selectBrokerAddr(), queueData.getReadQueueNums());
+ }
+ }
+ for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+ String addr = bd.selectBrokerAddr();
+ if (addr != null) {
+ ConsumeStats consumeStats = this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, timeoutMillis);
+
+ boolean hasConsumed = false;
+ for (Map.Entry<MessageQueue, OffsetWrapper> entry : consumeStats.getOffsetTable().entrySet()) {
+ MessageQueue queue = entry.getKey();
+ OffsetWrapper offsetWrapper = entry.getValue();
+ if (topic.equals(queue.getTopic())) {
+ hasConsumed = true;
+ RollbackStats rollbackStats = resetOffsetConsumeOffset(addr, consumerGroup, queue, offsetWrapper, timestamp, force);
+ rollbackStatsList.add(rollbackStats);
+ }
+ }
+
+ if (!hasConsumed) {
+ HashMap<MessageQueue, TopicOffset> topicStatus =
+ this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis).getOffsetTable();
+ for (int i = 0; i < topicRouteMap.get(addr); i++) {
+ MessageQueue queue = new MessageQueue(topic, bd.getBrokerName(), i);
+ OffsetWrapper offsetWrapper = new OffsetWrapper();
+ offsetWrapper.setBrokerOffset(topicStatus.get(queue).getMaxOffset());
+ offsetWrapper.setConsumerOffset(topicStatus.get(queue).getMinOffset());
+
+ RollbackStats rollbackStats = resetOffsetConsumeOffset(addr, consumerGroup, queue, offsetWrapper, timestamp, force);
+ rollbackStatsList.add(rollbackStats);
+ }
+ }
+ }
+ }
+ return rollbackStatsList;
+ }
+
+ @Override
+ public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce)
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ return resetOffsetByTimestamp(topic, group, timestamp, isForce, false);
+ }
+
+ @Override
+ public void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException {
+ try {
+ this.resetOffsetByTimestamp(topic, consumerGroup, timestamp, true);
+ } catch (MQClientException e) {
+ if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
+ this.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, true);
+ return;
+ }
+ throw e;
+ }
+ }
+
+ public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce, boolean isC)
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
+ List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
+ Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();
+ if (brokerDatas != null) {
+ for (BrokerData brokerData : brokerDatas) {
+ String addr = brokerData.selectBrokerAddr();
+ if (addr != null) {
+ Map<MessageQueue, Long> offsetTable =
+ this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce,
+ timeoutMillis, isC);
+ if (offsetTable != null) {
+ allOffsetTable.putAll(offsetTable);
+ }
+ }
+ }
+ }
+ return allOffsetTable;
+ }
+
+ private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue queue, OffsetWrapper offsetWrapper,
+ long timestamp, boolean force) throws RemotingException, InterruptedException, MQBrokerException {
+ long resetOffset;
+ if (timestamp == -1) {
+
+ resetOffset = this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timeoutMillis);
+ } else {
+ resetOffset =
+ this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timestamp,
+ timeoutMillis);
+ }
+
+
+ RollbackStats rollbackStats = new RollbackStats();
+ rollbackStats.setBrokerName(queue.getBrokerName());
+ rollbackStats.setQueueId(queue.getQueueId());
+ rollbackStats.setBrokerOffset(offsetWrapper.getBrokerOffset());
+ rollbackStats.setConsumerOffset(offsetWrapper.getConsumerOffset());
+ rollbackStats.setTimestampOffset(resetOffset);
+ rollbackStats.setRollbackOffset(offsetWrapper.getConsumerOffset());
+
+ if (force || resetOffset <= offsetWrapper.getConsumerOffset()) {
+ rollbackStats.setRollbackOffset(resetOffset);
+ UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
+ requestHeader.setConsumerGroup(consumeGroup);
+ requestHeader.setTopic(queue.getTopic());
+ requestHeader.setQueueId(queue.getQueueId());
+ requestHeader.setCommitOffset(resetOffset);
+ this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis);
+ }
+ return rollbackStats;
+ }
+
+ @Override
+ public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, String clientAddr) throws RemotingException,
+ MQBrokerException, InterruptedException, MQClientException {
+ TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
+ List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
+ if (brokerDatas != null && brokerDatas.size() > 0) {
+ String addr = brokerDatas.get(0).selectBrokerAddr();
+ if (addr != null) {
+ return this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToGetConsumerStatus(addr, topic, group, clientAddr,
+ timeoutMillis);
+ }
+ }
+ return Collections.EMPTY_MAP;
+ }
+
+ public void createOrUpdateOrderConf(String key, String value, boolean isCluster) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException {
+
+ if (isCluster) {
+ this.mqClientInstance.getMQClientAPIImpl()
+ .putKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key, value, timeoutMillis);
+ } else {
+ String oldOrderConfs = null;
+ try {
+ oldOrderConfs =
+ this.mqClientInstance.getMQClientAPIImpl().getKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key,
+ timeoutMillis);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ Map<String, String> orderConfMap = new HashMap<String, String>();
+ if (!UtilAll.isBlank(oldOrderConfs)) {
+ String[] oldOrderConfArr = oldOrderConfs.split(";");
+ for (String oldOrderConf : oldOrderConfArr) {
+ String[] items = oldOrderConf.split(":");
+ orderConfMap.put(items[0], oldOrderConf);
+ }
+ }
+ String[] items = value.split(":");
+ orderConfMap.put(items[0], value);
+
+ StringBuilder newOrderConf = new StringBuilder();
+ String splitor = "";
+ for (Map.Entry<String, String> entry : orderConfMap.entrySet()) {
+ newOrderConf.append(splitor).append(entry.getValue());
+ splitor = ";";
+ }
+ this.mqClientInstance.getMQClientAPIImpl().putKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, key,
+ newOrderConf.toString(), timeoutMillis);
+ }
+ }
+
+ @Override
+ public GroupList queryTopicConsumeByWho(String topic) throws InterruptedException, MQBrokerException, RemotingException,
+ MQClientException {
+ TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
+
+ for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+ String addr = bd.selectBrokerAddr();
+ if (addr != null) {
+ return this.mqClientInstance.getMQClientAPIImpl().queryTopicConsumeByWho(addr, topic, timeoutMillis);
+ }
+
+ break;
+ }
+
+ return null;
+ }
+
+ @Override
+ public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, final String group) throws InterruptedException, MQBrokerException,
+ RemotingException, MQClientException {
+ List<QueueTimeSpan> spanSet = new ArrayList<QueueTimeSpan>();
+ TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
+ for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+ String addr = bd.selectBrokerAddr();
+ if (addr != null) {
+ spanSet.addAll(this.mqClientInstance.getMQClientAPIImpl().queryConsumeTimeSpan(addr, topic, group, timeoutMillis));
+ }
+ }
+ return spanSet;
+ }
+
+ @Override
+ public boolean cleanExpiredConsumerQueue(String cluster) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException {
+ boolean result = false;
+ try {
+ ClusterInfo clusterInfo = examineBrokerClusterInfo();
+ if (null == cluster || "".equals(cluster)) {
+ for (String targetCluster : clusterInfo.retrieveAllClusterNames()) {
+ result = cleanExpiredConsumerQueueByCluster(clusterInfo, targetCluster);
+ }
+ } else {
+ result = cleanExpiredConsumerQueueByCluster(clusterInfo, cluster);
+ }
+ } catch (MQBrokerException e) {
+ log.error("cleanExpiredConsumerQueue error.", e);
+ }
+
+ return result;
+ }
+
+ public boolean cleanExpiredConsumerQueueByCluster(ClusterInfo clusterInfo, String cluster) throws RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+ boolean result = false;
+ String[] addrs = clusterInfo.retrieveAllAddrByCluster(cluster);
+ for (String addr : addrs) {
+ result = cleanExpiredConsumerQueueByAddr(addr);
+ }
+ return result;
+ }
+
+ @Override
+ public boolean cleanExpiredConsumerQueueByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException {
+ boolean result = mqClientInstance.getMQClientAPIImpl().cleanExpiredConsumeQueue(addr, timeoutMillis);
+ log.warn("clean expired ConsumeQueue on target " + addr + " broker " + result);
+ return result;
+ }
+
+ @Override
+ public boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException {
+ boolean result = false;
+ try {
+ ClusterInfo clusterInfo = examineBrokerClusterInfo();
+ if (null == cluster || "".equals(cluster)) {
+ for (String targetCluster : clusterInfo.retrieveAllClusterNames()) {
+ result = cleanUnusedTopicByCluster(clusterInfo, targetCluster);
+ }
+ } else {
+ result = cleanUnusedTopicByCluster(clusterInfo, cluster);
+ }
+ } catch (MQBrokerException e) {
+ log.error("cleanExpiredConsumerQueue error.", e);
+ }
+
+ return result;
+ }
+
+ public boolean cleanUnusedTopicByCluster(ClusterInfo clusterInfo, String cluster) throws RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+ boolean result = false;
+ String[] addrs = clusterInfo.retrieveAllAddrByCluster(cluster);
+ for (String addr : addrs) {
+ result = cleanUnusedTopicByAddr(addr);
+ }
+ return result;
+ }
+
+ @Override
+ public boolean cleanUnusedTopicByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException {
+ boolean result = mqClientInstance.getMQClientAPIImpl().cleanUnusedTopicByAddr(addr, timeoutMillis);
+ log.warn("clean expired ConsumeQueue on target " + addr + " broker " + result);
+ return result;
+ }
+
+ @Override
+ public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) throws RemotingException,
+ MQClientException, InterruptedException {
+ String topic = MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
+ TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
+ List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
+ if (brokerDatas != null) {
+ for (BrokerData brokerData : brokerDatas) {
+ String addr = brokerData.selectBrokerAddr();
+ if (addr != null) {
+ return this.mqClientInstance.getMQClientAPIImpl().getConsumerRunningInfo(addr, consumerGroup, clientId, jstack,
+ timeoutMillis * 3);
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String msgId)
+ throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ MessageExt msg = this.viewMessage(msgId);
+
+ return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()),
+ consumerGroup, clientId, msgId, timeoutMillis * 3);
+ }
+
+ @Override
+ public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumerGroup, final String clientId, final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ MessageExt msg = this.viewMessage(topic, msgId);
+ if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
+ return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()),
+ consumerGroup, clientId, msgId, timeoutMillis * 3);
+ } else {
+ MessageClientExt msgClient = (MessageClientExt) msg;
+ return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(RemotingUtil.socketAddress2String(msg.getStoreHost()),
+ consumerGroup, clientId, msgClient.getOffsetMsgId(), timeoutMillis * 3);
+ }
+ }
+
+ @Override
+ public List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException, InterruptedException,
+ MQBrokerException {
+ List<MessageTrack> result = new ArrayList<MessageTrack>();
+
+ GroupList groupList = this.queryTopicConsumeByWho(msg.getTopic());
+
+ for (String group : groupList.getGroupList()) {
+
+ MessageTrack mt = new MessageTrack();
+ mt.setConsumerGroup(group);
+ mt.setTrackType(TrackType.UNKNOWN);
+ ConsumerConnection cc = null;
+ try {
+ cc = this.examineConsumerConnectionInfo(group);
+ } catch (MQBrokerException e) {
+ if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
+ mt.setTrackType(TrackType.NOT_ONLINE);
+ }
+ mt.setExceptionDesc("CODE:" + e.getResponseCode() + " DESC:" + e.getErrorMessage());
+ result.add(mt);
+ continue;
+ } catch (Exception e) {
+ mt.setExceptionDesc(RemotingHelper.exceptionSimpleDesc(e));
+ result.add(mt);
+ continue;
+ }
+
+ switch (cc.getConsumeType()) {
+ case CONSUME_ACTIVELY:
+ mt.setTrackType(TrackType.PULL);
+ break;
+ case CONSUME_PASSIVELY:
+ boolean ifConsumed = false;
+ try {
+ ifConsumed = this.consumed(msg, group);
+ } catch (MQClientException e) {
+ if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
+ mt.setTrackType(TrackType.NOT_ONLINE);
+ }
+ mt.setExceptionDesc("CODE:" + e.getResponseCode() + " DESC:" + e.getErrorMessage());
+ result.add(mt);
+ continue;
+ } catch (MQBrokerException e) {
+ if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
+ mt.setTrackType(TrackType.NOT_ONLINE);
+ }
+ mt.setExceptionDesc("CODE:" + e.getResponseCode() + " DESC:" + e.getErrorMessage());
+ result.add(mt);
+ continue;
+ } catch (Exception e) {
+ mt.setExceptionDesc(RemotingHelper.exceptionSimpleDesc(e));
+ result.add(mt);
+ continue;
+ }
+
+ if (ifConsumed) {
+ mt.setTrackType(TrackType.CONSUMED);
+ Iterator<Entry<String, SubscriptionData>> it = cc.getSubscriptionTable().entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, SubscriptionData> next = it.next();
+ if (next.getKey().equals(msg.getTopic())) {
+ if (next.getValue().getTagsSet().contains(msg.getTags())
+ || next.getValue().getTagsSet().contains("*")
+ || next.getValue().getTagsSet().isEmpty()) {
+ } else {
+ mt.setTrackType(TrackType.CONSUMED_BUT_FILTERED);
+ }
+ }
+ }
+ } else {
+ mt.setTrackType(TrackType.NOT_CONSUME_YET);
+ }
+ break;
+ default:
+ break;
+ }
+ result.add(mt);
+ }
+
+ return result;
+ }
+
+ public boolean consumed(final MessageExt msg, final String group) throws RemotingException, MQClientException, InterruptedException,
+ MQBrokerException {
+
+ ConsumeStats cstats = this.examineConsumeStats(group);
+
+ ClusterInfo ci = this.examineBrokerClusterInfo();
+
+ Iterator<Entry<MessageQueue, OffsetWrapper>> it = cstats.getOffsetTable().entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<MessageQueue, OffsetWrapper> next = it.next();
+ MessageQueue mq = next.getKey();
+ if (mq.getTopic().equals(msg.getTopic()) && mq.getQueueId() == msg.getQueueId()) {
+ BrokerData brokerData = ci.getBrokerAddrTable().get(mq.getBrokerName());
+ if (brokerData != null) {
+ String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+ if (addr.equals(RemotingUtil.socketAddress2String(msg.getStoreHost()))) {
+ if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) {
+ return true;
+ }
+ }
+ }
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) throws RemotingException,
+ MQClientException, InterruptedException, MQBrokerException {
+ String retryTopic = MixAll.getRetryTopic(srcGroup);
+ TopicRouteData topicRouteData = this.examineTopicRouteInfo(retryTopic);
+
+ for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+ String addr = bd.selectBrokerAddr();
+ if (addr != null) {
+ this.mqClientInstance.getMQClientAPIImpl().cloneGroupOffset(addr, srcGroup, destGroup, topic, isOffline, timeoutMillis * 3);
+ }
+ }
+ }
+
+ @Override
+ public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey) throws RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+ return this.mqClientInstance.getMQClientAPIImpl().viewBrokerStatsData(brokerAddr, statsName, statsKey, timeoutMillis);
+ }
+
+ @Override
+ public Set<String> getClusterList(String topic) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException {
+ return this.mqClientInstance.getMQClientAPIImpl().getClusterList(topic, timeoutMillis);
+ }
+
+ @Override
+ public ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boolean isOrder, long timeoutMillis)
+ throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
+ InterruptedException {
+ return this.mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis);
+ }
+
+ @Override
+ public Set<String> getTopicClusterList(final String topic) throws InterruptedException, MQBrokerException, MQClientException,
+ RemotingException {
+ Set<String> clusterSet = new HashSet<String>();
+ ClusterInfo clusterInfo = examineBrokerClusterInfo();
+ TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
+ BrokerData brokerData = topicRouteData.getBrokerDatas().get(0);
+ String brokerName = brokerData.getBrokerName();
+ Iterator<Map.Entry<String, Set<String>>> it = clusterInfo.getClusterAddrTable().entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, Set<String>> next = it.next();
+ if (next.getValue().contains(brokerName)) {
+ clusterSet.add(next.getKey());
+ }
+ }
+ return clusterSet;
+ }
+
+ @Override
+ public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException,
+ RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ return this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(brokerAddr, timeoutMillis);
+ }
+
+ @Override
+ public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException,
+ RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ return this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(brokerAddr, timeoutMillis);
+ }
+
+ @Override
+ public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+ createTopic(key, newTopic, queueNum, 0);
+ }
+
+ @Override
+ public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+ this.mqClientInstance.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
+ }
+
+ @Override
+ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+ return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp);
+ }
+
+ @Override
+ public long maxOffset(MessageQueue mq) throws MQClientException {
+ return this.mqClientInstance.getMQAdminImpl().maxOffset(mq);
+ }
+
+ @Override
+ public long minOffset(MessageQueue mq) throws MQClientException {
+ return this.mqClientInstance.getMQAdminImpl().minOffset(mq);
+ }
+
+ @Override
+ public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+ return this.mqClientInstance.getMQAdminImpl().earliestMsgStoreTime(mq);
+ }
+
+ @Override
+ public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ return this.mqClientInstance.getMQAdminImpl().viewMessage(msgId);
+ }
+
+ @Override
+ public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException,
+ InterruptedException {
+ return this.mqClientInstance.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
+ }
+
+ @Override
+ public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, long offset) throws RemotingException, InterruptedException, MQBrokerException {
+ UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
+ requestHeader.setConsumerGroup(consumeGroup);
+ requestHeader.setTopic(mq.getTopic());
+ requestHeader.setQueueId(mq.getQueueId());
+ requestHeader.setCommitOffset(offset);
+ this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis);
+ }
+
+ @Override
+ public void updateNameServerConfig(final Properties properties, final List<String> nameServers)
+ throws InterruptedException, RemotingConnectException,
+ UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException,
+ MQClientException, MQBrokerException {
+ this.mqClientInstance.getMQClientAPIImpl().updateNameServerConfig(properties, nameServers, timeoutMillis);
+ }
+
+ @Override
+ public Map<String, Properties> getNameServerConfig(final List<String> nameServers)
+ throws InterruptedException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException, MQClientException,
+ UnsupportedEncodingException {
+ return this.mqClientInstance.getMQClientAPIImpl().getNameServerConfig(nameServers, timeoutMillis);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java
new file mode 100644
index 0000000..0075983
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.admin;
+
+import com.alibaba.rocketmq.client.MQAdmin;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.common.admin.ConsumeStats;
+import com.alibaba.rocketmq.common.admin.RollbackStats;
+import com.alibaba.rocketmq.common.admin.TopicStatsTable;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.body.*;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
+import com.alibaba.rocketmq.remoting.exception.*;
+import com.alibaba.rocketmq.tools.admin.api.MessageTrack;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+public interface MQAdminExt extends MQAdmin {
+ void start() throws MQClientException;
+
+ void shutdown();
+
+ void updateBrokerConfig(final String brokerAddr, final Properties properties) throws RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException;
+
+ Properties getBrokerConfig(final String brokerAddr) throws RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException;
+
+ void createAndUpdateTopicConfig(final String addr, final TopicConfig config) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException;
+
+ void createAndUpdateSubscriptionGroupConfig(final String addr, final SubscriptionGroupConfig config) throws RemotingException,
+ MQBrokerException, InterruptedException, MQClientException;
+
+ SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group);
+
+ TopicConfig examineTopicConfig(final String addr, final String topic);
+
+ TopicStatsTable examineTopicStats(final String topic) throws RemotingException, MQClientException, InterruptedException,
+ MQBrokerException;
+
+ TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException;
+
+ TopicList fetchTopicsByCLuster(String clusterName) throws RemotingException, MQClientException, InterruptedException;
+
+ KVTable fetchBrokerRuntimeStats(final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, InterruptedException, MQBrokerException;
+
+ ConsumeStats examineConsumeStats(final String consumerGroup) throws RemotingException, MQClientException, InterruptedException,
+ MQBrokerException;
+
+ ConsumeStats examineConsumeStats(final String consumerGroup, final String topic) throws RemotingException, MQClientException,
+ InterruptedException, MQBrokerException;
+
+ ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException;
+
+ TopicRouteData examineTopicRouteInfo(final String topic) throws RemotingException, MQClientException, InterruptedException;
+
+ ConsumerConnection examineConsumerConnectionInfo(final String consumerGroup) throws RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingException,
+ MQClientException;
+
+ ProducerConnection examineProducerConnectionInfo(final String producerGroup, final String topic) throws RemotingException,
+ MQClientException, InterruptedException, MQBrokerException;
+
+ List<String> getNameServerAddressList();
+
+ int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException,
+ RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException;
+
+ void putKVConfig(final String namespace, final String key, final String value);
+
+ String getKVConfig(final String namespace, final String key) throws RemotingException, MQClientException, InterruptedException;
+
+ KVTable getKVListByNamespace(final String namespace) throws RemotingException, MQClientException, InterruptedException;
+
+ void deleteTopicInBroker(final Set<String> addrs, final String topic) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException;
+
+ void deleteTopicInNameServer(final Set<String> addrs, final String topic) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException;
+
+ void deleteSubscriptionGroup(final String addr, String groupName) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException;
+
+ void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException;
+
+ void deleteKvConfig(String namespace, String key) throws RemotingException, MQBrokerException, InterruptedException,
+ MQClientException;
+
+ List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, boolean force)
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+
+ Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce)
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+
+ void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException;
+
+ Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, String clientAddr) throws RemotingException,
+ MQBrokerException, InterruptedException, MQClientException;
+
+ void createOrUpdateOrderConf(String key, String value, boolean isCluster) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException;
+
+ GroupList queryTopicConsumeByWho(final String topic) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingException, MQClientException;
+
+ List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, final String group) throws InterruptedException, MQBrokerException,
+ RemotingException, MQClientException;
+
+ boolean cleanExpiredConsumerQueue(String cluster) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException;
+
+ boolean cleanExpiredConsumerQueueByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException;
+
+ boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException;
+
+
+ boolean cleanUnusedTopicByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException;
+
+ ConsumerRunningInfo getConsumerRunningInfo(final String consumerGroup, final String clientId, final boolean jstack)
+ throws RemotingException, MQClientException, InterruptedException;
+
+ ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup,
+ String clientId,
+ String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException;
+
+ ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup,
+ String clientId,
+ String topic,
+ String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException;
+
+ List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException, InterruptedException,
+ MQBrokerException;
+
+ void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) throws RemotingException,
+ MQClientException, InterruptedException, MQBrokerException;
+
+ BrokerStatsData viewBrokerStatsData(final String brokerAddr, final String statsName, final String statsKey)
+ throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
+ InterruptedException;
+
+ Set<String> getClusterList(final String topic) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException;
+
+ ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boolean isOrder, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException;
+
+ Set<String> getTopicClusterList(final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException;
+
+ SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException;
+
+ TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException;
+
+ void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, long offset) throws RemotingException, InterruptedException, MQBrokerException;
+
+ /**
+ * Update name server config.
+ * <br>
+ * Command Code : RequestCode.UPDATE_NAMESRV_CONFIG
+ *
+ * <br> If param(nameServers) is null or empty, will use name servers from ns!
+ *
+ * @param properties
+ * @param nameServers
+ *
+ * @throws InterruptedException
+ * @throws RemotingConnectException
+ * @throws UnsupportedEncodingException
+ * @throws RemotingSendRequestException
+ * @throws RemotingTimeoutException
+ * @throws MQClientException
+ * @throws MQBrokerException
+ */
+ void updateNameServerConfig(final Properties properties, final List<String> nameServers) throws InterruptedException, RemotingConnectException,
+ UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException,
+ MQClientException, MQBrokerException;
+
+ /**
+ * Get name server config.
+ * <br>
+ * Command Code : RequestCode.GET_NAMESRV_CONFIG
+ * <br> If param(nameServers) is null or empty, will use name servers from ns!
+ *
+ * @param nameServers
+ *
+ * @return The fetched name server config
+ *
+ * @throws InterruptedException
+ * @throws RemotingTimeoutException
+ * @throws RemotingSendRequestException
+ * @throws RemotingConnectException
+ * @throws MQClientException
+ * @throws UnsupportedEncodingException
+ */
+ Map<String, Properties> getNameServerConfig(final List<String> nameServers) throws InterruptedException,
+ RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
+ MQClientException, UnsupportedEncodingException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/MessageTrack.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/MessageTrack.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/MessageTrack.java
new file mode 100644
index 0000000..bdc7288
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/MessageTrack.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.admin.api;
+
+public class MessageTrack {
+ private String consumerGroup;
+ private TrackType trackType;
+ private String exceptionDesc;
+
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+
+ public TrackType getTrackType() {
+ return trackType;
+ }
+
+
+ public void setTrackType(TrackType trackType) {
+ this.trackType = trackType;
+ }
+
+
+ public String getExceptionDesc() {
+ return exceptionDesc;
+ }
+
+
+ public void setExceptionDesc(String exceptionDesc) {
+ this.exceptionDesc = exceptionDesc;
+ }
+
+
+ @Override
+ public String toString() {
+ return "MessageTrack [consumerGroup=" + consumerGroup + ", trackType=" + trackType
+ + ", exceptionDesc=" + exceptionDesc + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/TrackType.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/TrackType.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/TrackType.java
new file mode 100644
index 0000000..ca475ac
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/TrackType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.tools.admin.api;
+
+public enum TrackType {
+ CONSUMED,
+ CONSUMED_BUT_FILTERED,
+ PULL,
+ NOT_CONSUME_YET,
+ NOT_ONLINE,
+ UNKNOWN
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/CommandUtil.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/CommandUtil.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/CommandUtil.java
new file mode 100644
index 0000000..1b5d264
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/CommandUtil.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command;
+
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.protocol.body.ClusterInfo;
+import com.alibaba.rocketmq.common.protocol.route.BrokerData;
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.tools.admin.MQAdminExt;
+
+import java.util.*;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class CommandUtil {
+
+ public static Map<String/*master addr*/, List<String>/*slave addr*/> fetchMasterAndSlaveDistinguish(
+ final MQAdminExt adminExt, final String clusterName)
+ throws InterruptedException, RemotingConnectException,
+ RemotingTimeoutException, RemotingSendRequestException,
+ MQBrokerException {
+ Map<String, List<String>> masterAndSlaveMap = new HashMap<String, List<String>>(4);
+
+ ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo();
+ Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName);
+
+ if (brokerNameSet == null) {
+ System.out
+ .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct.");
+ return masterAndSlaveMap;
+ }
+
+ for (String brokerName : brokerNameSet) {
+ BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
+
+ if (brokerData == null || brokerData.getBrokerAddrs() == null) {
+ continue;
+ }
+
+ String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+ masterAndSlaveMap.put(masterAddr, new ArrayList<String>());
+
+ for (Long id : brokerData.getBrokerAddrs().keySet()) {
+ if (brokerData.getBrokerAddrs().get(id) == null
+ || id.longValue() == MixAll.MASTER_ID) {
+ continue;
+ }
+
+ masterAndSlaveMap.get(masterAddr).add(brokerData.getBrokerAddrs().get(id));
+ }
+ }
+
+ return masterAndSlaveMap;
+ }
+
+ public static Set<String> fetchMasterAddrByClusterName(final MQAdminExt adminExt, final String clusterName)
+ throws InterruptedException, RemotingConnectException, RemotingTimeoutException,
+ RemotingSendRequestException, MQBrokerException {
+ Set<String> masterSet = new HashSet<String>();
+
+ ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo();
+
+ Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName);
+
+ if (brokerNameSet != null) {
+ for (String brokerName : brokerNameSet) {
+ BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
+ if (brokerData != null) {
+
+ String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+ if (addr != null) {
+ masterSet.add(addr);
+ }
+ }
+ }
+ } else {
+ System.out
+ .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct.");
+ }
+
+ return masterSet;
+ }
+
+ public static Set<String> fetchMasterAndSlaveAddrByClusterName(final MQAdminExt adminExt, final String clusterName)
+ throws InterruptedException, RemotingConnectException, RemotingTimeoutException,
+ RemotingSendRequestException, MQBrokerException {
+ Set<String> masterSet = new HashSet<String>();
+
+ ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo();
+
+ Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName);
+
+ if (brokerNameSet != null) {
+ for (String brokerName : brokerNameSet) {
+ BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
+ if (brokerData != null) {
+ final Collection<String> addrs = brokerData.getBrokerAddrs().values();
+ masterSet.addAll(addrs);
+ }
+ }
+ } else {
+ System.out
+ .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct.");
+ }
+
+ return masterSet;
+ }
+
+
+ public static Set<String> fetchBrokerNameByClusterName(final MQAdminExt adminExt, final String clusterName)
+ throws Exception {
+ ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo();
+ Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName);
+ if (brokerNameSet.isEmpty()) {
+ throw new Exception(
+ "Make sure the specified clusterName exists or the nameserver which connected is correct.");
+ }
+ return brokerNameSet;
+ }
+
+
+ public static String fetchBrokerNameByAddr(final MQAdminExt adminExt, final String addr) throws Exception {
+ ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo();
+ HashMap<String/* brokerName */, BrokerData> brokerAddrTable =
+ clusterInfoSerializeWrapper.getBrokerAddrTable();
+ Iterator<Map.Entry<String, BrokerData>> it = brokerAddrTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, BrokerData> entry = it.next();
+ HashMap<Long, String> brokerAddrs = entry.getValue().getBrokerAddrs();
+ if (brokerAddrs.containsValue(addr))
+ return entry.getKey();
+ }
+ throw new Exception(
+ "Make sure the specified broker addr exists or the nameserver which connected is correct.");
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/MQAdminStartup.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/MQAdminStartup.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/MQAdminStartup.java
new file mode 100644
index 0000000..56ecc59
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/MQAdminStartup.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import ch.qos.logback.core.joran.spi.JoranException;
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import com.alibaba.rocketmq.tools.command.broker.*;
+import com.alibaba.rocketmq.tools.command.cluster.CLusterSendMsgRTCommand;
+import com.alibaba.rocketmq.tools.command.cluster.ClusterListSubCommand;
+import com.alibaba.rocketmq.tools.command.connection.ConsumerConnectionSubCommand;
+import com.alibaba.rocketmq.tools.command.connection.ProducerConnectionSubCommand;
+import com.alibaba.rocketmq.tools.command.consumer.*;
+import com.alibaba.rocketmq.tools.command.message.*;
+import com.alibaba.rocketmq.tools.command.namesrv.*;
+import com.alibaba.rocketmq.tools.command.offset.CloneGroupOffsetCommand;
+import com.alibaba.rocketmq.tools.command.offset.ResetOffsetByTimeCommand;
+import com.alibaba.rocketmq.tools.command.stats.StatsAllSubCommand;
+import com.alibaba.rocketmq.tools.command.topic.*;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MQAdminStartup {
+ protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>();
+
+ public static void main(String[] args) {
+ main0(args, null);
+ }
+
+ public static void main0(String[] args, RPCHook rpcHook) {
+ System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+
+
+ //PackageConflictDetect.detectFastjson();
+
+ initCommand();
+
+ try {
+ initLogback();
+ switch (args.length) {
+ case 0:
+ printHelp();
+ break;
+ case 2:
+ if (args[0].equals("help")) {
+ SubCommand cmd = findSubCommand(args[1]);
+ if (cmd != null) {
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ options = cmd.buildCommandlineOptions(options);
+ if (options != null) {
+ ServerUtil.printCommandLineHelp("mqadmin " + cmd.commandName(), options);
+ }
+ } else {
+ System.out.printf("The sub command \'" + args[1] + "\' not exist.%n");
+ }
+ break;
+ }
+ case 1:
+ default:
+ SubCommand cmd = findSubCommand(args[0]);
+ if (cmd != null) {
+ String[] subargs = parseSubArgs(args);
+
+
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ final CommandLine commandLine =
+ ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
+ new PosixParser());
+ if (null == commandLine) {
+ System.exit(-1);
+ return;
+ }
+
+ if (commandLine.hasOption('n')) {
+ String namesrvAddr = commandLine.getOptionValue('n');
+ System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
+ }
+
+ cmd.execute(commandLine, options, rpcHook);
+ } else {
+ System.out.printf("The sub command \'" + args[0] + "\' not exist.%n");
+ }
+ break;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static void initCommand() {
+ initCommand(new UpdateTopicSubCommand());
+ initCommand(new DeleteTopicSubCommand());
+ initCommand(new UpdateSubGroupSubCommand());
+ initCommand(new DeleteSubscriptionGroupCommand());
+ initCommand(new UpdateBrokerConfigSubCommand());
+ initCommand(new UpdateTopicPermSubCommand());
+
+ initCommand(new TopicRouteSubCommand());
+ initCommand(new TopicStatusSubCommand());
+ initCommand(new TopicClusterSubCommand());
+
+
+ initCommand(new BrokerStatusSubCommand());
+ initCommand(new QueryMsgByIdSubCommand());
+ initCommand(new QueryMsgByKeySubCommand());
+ initCommand(new QueryMsgByUniqueKeySubCommand());
+ initCommand(new QueryMsgByOffsetSubCommand());
+ initCommand(new QueryMsgByUniqueKeySubCommand());
+ initCommand(new PrintMessageSubCommand());
+ initCommand(new PrintMessageByQueueCommand());
+ initCommand(new SendMsgStatusCommand());
+ initCommand(new BrokerConsumeStatsSubCommad());
+
+
+ initCommand(new ProducerConnectionSubCommand());
+ initCommand(new ConsumerConnectionSubCommand());
+ initCommand(new ConsumerProgressSubCommand());
+ initCommand(new ConsumerStatusSubCommand());
+ initCommand(new CloneGroupOffsetCommand());
+
+ initCommand(new ClusterListSubCommand());
+ initCommand(new TopicListSubCommand());
+
+ initCommand(new UpdateKvConfigCommand());
+ initCommand(new DeleteKvConfigCommand());
+
+ initCommand(new WipeWritePermSubCommand());
+ initCommand(new ResetOffsetByTimeCommand());
+
+ initCommand(new UpdateOrderConfCommand());
+ initCommand(new CleanExpiredCQSubCommand());
+ initCommand(new CleanUnusedTopicCommand());
+
+ initCommand(new StartMonitoringSubCommand());
+ initCommand(new StatsAllSubCommand());
+
+ initCommand(new AllocateMQSubCommand());
+
+ initCommand(new CheckMsgSendRTCommand());
+ initCommand(new CLusterSendMsgRTCommand());
+
+ initCommand(new GetNamesrvConfigCommand());
+ initCommand(new UpdateNamesrvConfigCommand());
+ initCommand(new GetBrokerConfigCommand());
+ }
+
+ private static void initLogback() throws JoranException {
+ String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+
+ LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ JoranConfigurator configurator = new JoranConfigurator();
+ configurator.setContext(lc);
+ lc.reset();
+ configurator.doConfigure(rocketmqHome + "/conf/logback_tools.xml");
+ }
+
+ private static void printHelp() {
+ System.out.printf("The most commonly used mqadmin commands are:%n");
+
+ for (SubCommand cmd : subCommandList) {
+ System.out.printf(" %-20s %s%n", cmd.commandName(), cmd.commandDesc());
+ }
+
+ System.out.printf("%nSee 'mqadmin help <command>' for more information on a specific command.");
+ }
+
+ private static SubCommand findSubCommand(final String name) {
+ for (SubCommand cmd : subCommandList) {
+ if (cmd.commandName().toUpperCase().equals(name.toUpperCase())) {
+ return cmd;
+ }
+ }
+
+ return null;
+ }
+
+ private static String[] parseSubArgs(String[] args) {
+ if (args.length > 1) {
+ String[] result = new String[args.length - 1];
+ for (int i = 0; i < args.length - 1; i++) {
+ result[i] = args[i + 1];
+ }
+ return result;
+ }
+ return null;
+ }
+
+ public static void initCommand(SubCommand command) {
+ subCommandList.add(command);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/SubCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/SubCommand.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/SubCommand.java
new file mode 100644
index 0000000..e28b7bc
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/SubCommand.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command;
+
+import com.alibaba.rocketmq.remoting.RPCHook;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+
+
+/**
+ * @author shijia.wxr
+ */
+public interface SubCommand {
+ public String commandName();
+
+
+ public String commandDesc();
+
+
+ public Options buildCommandlineOptions(final Options options);
+
+
+ public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java
new file mode 100644
index 0000000..0f24df7
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.command.broker;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.admin.ConsumeStats;
+import com.alibaba.rocketmq.common.admin.OffsetWrapper;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.body.ConsumeStatsList;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import com.alibaba.rocketmq.tools.command.SubCommand;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.util.*;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class BrokerConsumeStatsSubCommad implements SubCommand {
+
+ @Override
+ public String commandName() {
+ return "brokerConsumeStats";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Fetch broker consume stats data";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("b", "brokerAddr", true, "Broker address");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("t", "timeoutMillis", true, "request timeout Millis");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("l", "level", true, "threshold of print diff");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("o", "order", true, "order topic");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+ try {
+ defaultMQAdminExt.start();
+ String brokerAddr = commandLine.getOptionValue('b').trim();
+ boolean isOrder = false;
+ long timeoutMillis = 50000;
+ long diffLevel = 0;
+ if (commandLine.hasOption('o')) {
+ isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim());
+ }
+ if (commandLine.hasOption('t')) {
+ timeoutMillis = Long.parseLong(commandLine.getOptionValue('t').trim());
+ }
+ if (commandLine.hasOption('l')) {
+ diffLevel = Long.parseLong(commandLine.getOptionValue('l').trim());
+ }
+
+ ConsumeStatsList consumeStatsList = defaultMQAdminExt.fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis);
+ System.out.printf("%-32s %-32s %-32s %-4s %-20s %-20s %-20s %s%n",
+ "#Topic",
+ "#Group",
+ "#Broker Name",
+ "#QID",
+ "#Broker Offset",
+ "#Consumer Offset",
+ "#Diff",
+ "#LastTime");
+ for (Map<String, List<ConsumeStats>> map : consumeStatsList.getConsumeStatsList()) {
+ for (Map.Entry<String, List<ConsumeStats>> entry : map.entrySet()) {
+ String group = entry.getKey();
+ List<ConsumeStats> consumeStatsArray = entry.getValue();
+ for (ConsumeStats consumeStats : consumeStatsArray) {
+ List<MessageQueue> mqList = new LinkedList<MessageQueue>();
+ mqList.addAll(consumeStats.getOffsetTable().keySet());
+ Collections.sort(mqList);
+ for (MessageQueue mq : mqList) {
+ OffsetWrapper offsetWrapper = consumeStats.getOffsetTable().get(mq);
+ long diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();
+
+ if (diff < diffLevel) {
+ continue;
+ }
+ String lastTime = "-";
+ try {
+ lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS);
+ } catch (Exception e) {
+
+ }
+ if (offsetWrapper.getLastTimestamp() > 0)
+ System.out.printf("%-32s %-32s %-32s %-4d %-20d %-20d %-20d %s%n",
+ UtilAll.frontStringAtLeast(mq.getTopic(), 32),
+ group,
+ UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
+ mq.getQueueId(),
+ offsetWrapper.getBrokerOffset(),
+ offsetWrapper.getConsumerOffset(),
+ diff,
+ lastTime
+ );
+ }
+ }
+ }
+ }
+ System.out.printf("%nDiff Total: %d%n", consumeStatsList.getTotalDiff());
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+}