You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/06/04 14:35:33 UTC
[rocketmq] branch develop updated: [ISSUE-4396] Support get all producer on one broker (#4395)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d5ca4d05c [ISSUE-4396] Support get all producer on one broker (#4395)
d5ca4d05c is described below
commit d5ca4d05c97c203751b53a134030ad9a2691c09c
Author: tiger lee <10...@qq.com>
AuthorDate: Sat Jun 4 22:35:24 2022 +0800
[ISSUE-4396] Support get all producer on one broker (#4395)
* Update .travis.yml
Deprecated MODERATE noisy mail for dev@rocketmq.apache.org. Still Failing: apache/rocketmq#5670 (5.0.0-alpha-static-topic - 5ebc327) @dongeforever
* Updated Notice file to 2022 (#3735)
* add api: get all producer in one broker
* add license header for new files
* add test case and optimize some variables
* remove unused imports and change some words to English
* fix code style
* update code style
* update code style
Co-authored-by: von gosling <vo...@apache.org>
Co-authored-by: ZhangJian He <sh...@gmail.com>
Co-authored-by: tigerweili <ti...@tencent.com>
---
.../rocketmq/broker/client/ProducerManager.java | 31 ++++++++
.../broker/processor/AdminBrokerProcessor.java | 21 ++++++
.../broker/processor/AdminBrokerProcessorTest.java | 7 ++
.../rocketmq/client/impl/MQClientAPIImpl.java | 23 ++++++
.../rocketmq/common/protocol/RequestCode.java | 2 +
.../common/protocol/body/ProducerInfo.java | 84 +++++++++++++++++++++
.../common/protocol/body/ProducerTableInfo.java | 39 ++++++++++
.../header/GetAllProducerInfoRequestHeader.java | 29 ++++++++
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 6 ++
.../tools/admin/DefaultMQAdminExtImpl.java | 6 ++
.../apache/rocketmq/tools/admin/MQAdminExt.java | 4 +
.../rocketmq/tools/command/MQAdminStartup.java | 3 +
.../tools/command/producer/ProducerSubCommand.java | 86 ++++++++++++++++++++++
.../tools/admin/DefaultMQAdminExtTest.java | 47 +++++++++---
.../command/producer/ProducerSubCommandTest.java | 85 +++++++++++++++++++++
15 files changed, 461 insertions(+), 12 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 4bd00eff1..e8f45e702 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -18,6 +18,8 @@ package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -25,6 +27,8 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.ProducerInfo;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -46,6 +50,33 @@ public class ProducerManager {
return groupChannelTable;
}
+ public ProducerTableInfo getProducerTable() {
+ Map<String, List<ProducerInfo>> map = new HashMap<>();
+ for (String group : this.groupChannelTable.keySet()) {
+ for (Entry<Channel, ClientChannelInfo> entry: this.groupChannelTable.get(group).entrySet()) {
+ ClientChannelInfo clientChannelInfo = entry.getValue();
+ if (map.containsKey(group)) {
+ map.get(group).add(new ProducerInfo(
+ clientChannelInfo.getClientId(),
+ clientChannelInfo.getChannel().remoteAddress().toString(),
+ clientChannelInfo.getLanguage(),
+ clientChannelInfo.getVersion(),
+ clientChannelInfo.getLastUpdateTimestamp()
+ ));
+ } else {
+ map.put(group, new ArrayList<ProducerInfo>(Collections.singleton(new ProducerInfo(
+ clientChannelInfo.getClientId(),
+ clientChannelInfo.getChannel().remoteAddress().toString(),
+ clientChannelInfo.getLanguage(),
+ clientChannelInfo.getVersion(),
+ clientChannelInfo.getLastUpdateTimestamp()
+ ))));
+ }
+ }
+ }
+ return new ProducerTableInfo(map);
+ }
+
public void scanNotActiveChannel() {
for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
.entrySet()) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index f4ada3684..0a70b5546 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -26,6 +26,8 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
+import org.apache.rocketmq.common.protocol.header.GetAllProducerInfoRequestHeader;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.AclConfig;
@@ -188,6 +190,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return this.getConsumerConnectionList(ctx, request);
case RequestCode.GET_PRODUCER_CONNECTION_LIST:
return this.getProducerConnectionList(ctx, request);
+ case RequestCode.GET_ALL_PRODUCER_INFO:
+ return this.getAllProducerInfo(ctx, request);
case RequestCode.GET_CONSUME_STATS:
return this.getConsumeStats(ctx, request);
case RequestCode.GET_ALL_CONSUMER_OFFSET:
@@ -842,6 +846,23 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response;
}
+ private RemotingCommand getAllProducerInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ final GetAllProducerInfoRequestHeader requestHeader =
+ (GetAllProducerInfoRequestHeader) request.decodeCommandCustomHeader(GetAllProducerInfoRequestHeader.class);
+
+ ProducerTableInfo producerTable = this.brokerController.getProducerManager().getProducerTable();
+ if (producerTable != null) {
+ byte[] body = producerTable.encode();
+ response.setBody(body);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
+
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ return response;
+ }
private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index 8b29f4ffb..35df22719 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -463,6 +463,13 @@ public class AdminBrokerProcessorTest {
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
}
+ @Test
+ public void testGetAllProducerInfo() throws RemotingCommandException {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_PRODUCER_INFO, null);
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
@Test
public void testGetConsumeStats() throws RemotingCommandException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, null);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index f2d01897c..22aefda8a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.PullCallback;
@@ -77,6 +78,7 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
@@ -95,6 +97,7 @@ import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeade
import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetAllProducerInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader;
@@ -1245,6 +1248,26 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
+ public ProducerTableInfo getAllProducerInfo(final String addr, final long timeoutMillis)
+ throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+ MQBrokerException {
+ GetAllProducerInfoRequestHeader requestHeader = new GetAllProducerInfoRequestHeader();
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_PRODUCER_INFO, requestHeader);
+
+ RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+ request, timeoutMillis);
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ return ProducerTableInfo.decode(response.getBody(), ProducerTableInfo.class);
+ }
+ default:
+ break;
+ }
+
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
+ }
+
public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup,
final long timeoutMillis)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 5624a7ec0..24c072bc8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -190,4 +190,6 @@ public class RequestCode {
public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
public static final int ADD_WRITE_PERM_OF_BROKER = 327;
+
+ public static final int GET_ALL_PRODUCER_INFO = 328;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerInfo.java
new file mode 100644
index 000000000..73bf1c304
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerInfo.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+
+public class ProducerInfo extends RemotingSerializable {
+ private String clientId;
+ private String remoteIP;
+ private LanguageCode language;
+ private int version;
+ private long lastUpdateTimestamp;
+
+ public ProducerInfo(String clientId, String remoteIP, LanguageCode language, int version, long lastUpdateTimestamp) {
+ this.clientId = clientId;
+ this.remoteIP = remoteIP;
+ this.language = language;
+ this.version = version;
+ this.lastUpdateTimestamp = lastUpdateTimestamp;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public String getRemoteIP() {
+ return remoteIP;
+ }
+
+ public void setRemoteIP(String remoteIP) {
+ this.remoteIP = remoteIP;
+ }
+
+ public LanguageCode getLanguage() {
+ return language;
+ }
+
+ public void setLanguage(LanguageCode language) {
+ this.language = language;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+ public long getLastUpdateTimestamp() {
+ return lastUpdateTimestamp;
+ }
+
+ public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+ this.lastUpdateTimestamp = lastUpdateTimestamp;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("clientId=%s,remoteIP=%s, language=%s, version=%d, lastUpdateTimestamp=%d",
+ clientId, remoteIP, language.name(), version, lastUpdateTimestamp);
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerTableInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerTableInfo.java
new file mode 100644
index 000000000..f8d8538b8
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerTableInfo.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.List;
+import java.util.Map;
+
+public class ProducerTableInfo extends RemotingSerializable {
+ public ProducerTableInfo(Map<String, List<ProducerInfo>> data) {
+ this.data = data;
+ }
+
+ private Map<String, List<ProducerInfo>> data;
+
+ public Map<String, List<ProducerInfo>> getData() {
+ return data;
+ }
+
+ public void setData(Map<String, List<ProducerInfo>> data) {
+ this.data = data;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetAllProducerInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetAllProducerInfoRequestHeader.java
new file mode 100644
index 000000000..201294565
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetAllProducerInfoRequestHeader.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class GetAllProducerInfoRequestHeader implements CommandCustomHeader {
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ // To change body of implemented methods use File | Settings | File
+ // Templates.
+ }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index b5f4d6230..22b6dcdc7 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -43,6 +43,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
@@ -285,6 +286,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
return defaultMQAdminExtImpl.examineProducerConnectionInfo(producerGroup, topic);
}
+ @Override
+ public ProducerTableInfo getAllProducerInfo(final String brokerAddr) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ return defaultMQAdminExtImpl.getAllProducerInfo(brokerAddr);
+ }
+
@Override
public List<String> getNameServerAddressList() {
return this.defaultMQAdminExtImpl.getNameServerAddressList();
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 1fff6be0f..292d80cdb 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -50,6 +50,7 @@ import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageConst;
@@ -407,6 +408,11 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return result;
}
+ @Override
+ public ProducerTableInfo getAllProducerInfo(final String brokerAddr) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ return this.mqClientInstance.getMQClientAPIImpl().getAllProducerInfo(brokerAddr, timeoutMillis);
+ }
+
@Override
public List<String> getNameServerAddressList() {
return this.mqClientInstance.getMQClientAPIImpl().getNameServerAddressList();
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index d222a1500..7bbe8e331 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
@@ -133,6 +134,9 @@ public interface MQAdminExt extends MQAdmin {
final String topic) throws RemotingException,
MQClientException, InterruptedException, MQBrokerException;
+ ProducerTableInfo getAllProducerInfo(final String brokerAddr) throws RemotingException,
+ MQClientException, InterruptedException, MQBrokerException;
+
List<String> getNameServerAddressList();
int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException,
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 046ed5d55..fca04e175 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -72,6 +72,7 @@ import org.apache.rocketmq.tools.command.namesrv.WipeWritePermSubCommand;
import org.apache.rocketmq.tools.command.offset.CloneGroupOffsetCommand;
import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand;
import org.apache.rocketmq.tools.command.offset.SkipAccumulationSubCommand;
+import org.apache.rocketmq.tools.command.producer.ProducerSubCommand;
import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand;
@@ -184,6 +185,8 @@ public class MQAdminStartup {
initCommand(new ConsumerProgressSubCommand());
initCommand(new ConsumerStatusSubCommand());
initCommand(new CloneGroupOffsetCommand());
+ //for producer
+ initCommand(new ProducerSubCommand());
initCommand(new ClusterListSubCommand());
initCommand(new TopicListSubCommand());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/producer/ProducerSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/producer/ProducerSubCommand.java
new file mode 100644
index 000000000..f6fa63f13
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/producer/ProducerSubCommand.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.producer;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.protocol.body.ProducerInfo;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.MQAdminStartup;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.util.List;
+
+public class ProducerSubCommand implements SubCommand {
+
+ public static void main(String[] args) {
+ System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
+ MQAdminStartup.main(new String[]{new ProducerSubCommand().commandName(), "-b", "127.0.0.1:10911"});
+ }
+
+ @Override
+ public String commandName() {
+ return "producer";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Query producer's instances, connection, status, etc.";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("b", "broker", true, "broker address");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+ defaultMQAdminExt.start();
+ String brokerAddr = commandLine.getOptionValue('b').trim();
+ ProducerTableInfo cc = defaultMQAdminExt.getAllProducerInfo(brokerAddr);
+ if (cc != null && cc.getData() != null && !cc.getData().isEmpty()) {
+ for (String group : cc.getData().keySet()) {
+ List<ProducerInfo> list = cc.getData().get(group);
+ if (list == null || list.isEmpty()) {
+ System.out.printf("producer group (%s) instances are empty\n", group);
+ continue;
+ }
+ for (ProducerInfo producer : list) {
+ System.out.printf("producer group (%s) instance : %s\n", group, producer.toString());
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index 74723d43f..febc80e44 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -16,18 +16,6 @@
*/
package org.apache.rocketmq.tools.admin;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -53,6 +41,8 @@ import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.ProducerInfo;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
@@ -69,6 +59,7 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -76,6 +67,20 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -210,6 +215,17 @@ public class DefaultMQAdminExtTest {
producerConnection.setConnectionSet(connectionSet);
when(mQClientAPIImpl.getProducerConnectionList(anyString(), anyString(), anyLong())).thenReturn(producerConnection);
+ ProducerTableInfo producerTableInfo = new ProducerTableInfo(new HashMap<>());
+ producerTableInfo.getData().put("test-producer-group", Arrays.asList(new ProducerInfo(
+ "xxxx-client-id",
+ "127.0.0.1:18978",
+ LanguageCode.JAVA,
+ 400,
+ System.currentTimeMillis()
+
+ )));
+ when(mQClientAPIImpl.getAllProducerInfo(anyString(), anyLong())).thenReturn(producerTableInfo);
+
when(mQClientAPIImpl.wipeWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(6);
when(mQClientAPIImpl.addWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(7);
@@ -308,6 +324,13 @@ public class DefaultMQAdminExtTest {
assertThat(producerConnection.getConnectionSet().size()).isEqualTo(1);
}
+ @Test
+ public void testGetAllProducerInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ ProducerTableInfo producerTableInfo = defaultMQAdminExt.getAllProducerInfo("127.0.0.1:10911");
+ assertThat(producerTableInfo.getData().size()).isEqualTo(1);
+ }
+
+
@Test
public void testWipeWritePermOfBroker() throws InterruptedException, RemotingCommandException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, RemotingConnectException {
int result = defaultMQAdminExt.wipeWritePermOfBroker("127.0.0.1:9876", "default-broker");
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/producer/ProducerSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/producer/ProducerSubCommandTest.java
new file mode 100644
index 000000000..6d6cc93ab
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/producer/ProducerSubCommandTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.producer;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.apache.rocketmq.tools.command.server.NameServerMocker;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class ProducerSubCommandTest {
+ private static final int NAME_SERVER_PORT = 45677;
+
+ private static final int BROKER_PORT = 45676;
+
+ private ServerResponseMocker brokerMocker;
+
+ private ServerResponseMocker nameServerMocker;
+
+ @Before
+ public void before() {
+ brokerMocker = startOneBroker();
+ nameServerMocker = NameServerMocker.startByDefaultConf(NAME_SERVER_PORT, BROKER_PORT);
+ }
+
+ @After
+ public void after() {
+ brokerMocker.shutdown();
+ nameServerMocker.shutdown();
+ }
+
+ @Test
+ public void testExecute() throws SubCommandException {
+ ProducerSubCommand cmd = new ProducerSubCommand();
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ String[] subargs = new String[]{"-b 127.0.0.1:" + BROKER_PORT};
+ final CommandLine commandLine =
+ ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+ cmd.execute(commandLine, options, null);
+ }
+
+ private ServerResponseMocker startOneBroker() {
+ ConsumeStats consumeStats = new ConsumeStats();
+ HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<>();
+ MessageQueue messageQueue = new MessageQueue();
+ messageQueue.setBrokerName("mockBrokerName");
+ messageQueue.setQueueId(1);
+ messageQueue.setBrokerName("mockTopicName");
+
+ OffsetWrapper offsetWrapper = new OffsetWrapper();
+ offsetWrapper.setBrokerOffset(1);
+ offsetWrapper.setConsumerOffset(1);
+ offsetWrapper.setLastTimestamp(System.currentTimeMillis());
+
+ offsetTable.put(messageQueue, offsetWrapper);
+ consumeStats.setOffsetTable(offsetTable);
+ // start broker
+ return ServerResponseMocker.startServer(BROKER_PORT, consumeStats.encode());
+ }
+}