You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/06/04 14:35:33 UTC

[rocketmq] branch develop updated: [iSSUE-4396] Support get all producer on one broker (#4395)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d5ca4d05c [iSSUE-4396] Support get all producer on one broker (#4395)
d5ca4d05c is described below

commit d5ca4d05c97c203751b53a134030ad9a2691c09c
Author: tiger lee <10...@qq.com>
AuthorDate: Sat Jun 4 22:35:24 2022 +0800

    [iSSUE-4396] Support get all producer on one broker (#4395)
    
    * Update .travis.yml
    
    Deprecated MODERATE noisy mail for dev@rocketmq.apache.org. Still Failing: apache/rocketmq#5670 (5.0.0-alpha-static-topic - 5ebc327) @dongeforever
    
    * Updated Notice file to 2022 (#3735)
    
    * add api: get all producer in one broker
    
    * add license header for new files
    
    * add test case and optimize some variables
    
    * remove unused imports and change some words to English
    
    * fix code style
    
    * update code style
    
    * update code style
    
    Co-authored-by: von gosling <vo...@apache.org>
    Co-authored-by: ZhangJian He <sh...@gmail.com>
    Co-authored-by: tigerweili <ti...@tencent.com>
---
 .../rocketmq/broker/client/ProducerManager.java    | 31 ++++++++
 .../broker/processor/AdminBrokerProcessor.java     | 21 ++++++
 .../broker/processor/AdminBrokerProcessorTest.java |  7 ++
 .../rocketmq/client/impl/MQClientAPIImpl.java      | 23 ++++++
 .../rocketmq/common/protocol/RequestCode.java      |  2 +
 .../common/protocol/body/ProducerInfo.java         | 84 +++++++++++++++++++++
 .../common/protocol/body/ProducerTableInfo.java    | 39 ++++++++++
 .../header/GetAllProducerInfoRequestHeader.java    | 29 ++++++++
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |  6 ++
 .../tools/admin/DefaultMQAdminExtImpl.java         |  6 ++
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |  4 +
 .../rocketmq/tools/command/MQAdminStartup.java     |  3 +
 .../tools/command/producer/ProducerSubCommand.java | 86 ++++++++++++++++++++++
 .../tools/admin/DefaultMQAdminExtTest.java         | 47 +++++++++---
 .../command/producer/ProducerSubCommandTest.java   | 85 +++++++++++++++++++++
 15 files changed, 461 insertions(+), 12 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 4bd00eff1..e8f45e702 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -18,6 +18,8 @@ package org.apache.rocketmq.broker.client;
 
 import io.netty.channel.Channel;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -25,6 +27,8 @@ import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.ProducerInfo;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -46,6 +50,33 @@ public class ProducerManager {
         return groupChannelTable;
     }
 
+    public ProducerTableInfo getProducerTable() {
+        Map<String, List<ProducerInfo>> map = new HashMap<>();
+        for (String group : this.groupChannelTable.keySet()) {
+            for (Entry<Channel, ClientChannelInfo> entry: this.groupChannelTable.get(group).entrySet()) {
+                ClientChannelInfo clientChannelInfo = entry.getValue();
+                if (map.containsKey(group)) {
+                    map.get(group).add(new ProducerInfo(
+                            clientChannelInfo.getClientId(),
+                            clientChannelInfo.getChannel().remoteAddress().toString(),
+                            clientChannelInfo.getLanguage(),
+                            clientChannelInfo.getVersion(),
+                            clientChannelInfo.getLastUpdateTimestamp()
+                    ));
+                } else {
+                    map.put(group, new ArrayList<ProducerInfo>(Collections.singleton(new ProducerInfo(
+                            clientChannelInfo.getClientId(),
+                            clientChannelInfo.getChannel().remoteAddress().toString(),
+                            clientChannelInfo.getLanguage(),
+                            clientChannelInfo.getVersion(),
+                            clientChannelInfo.getLastUpdateTimestamp()
+                    ))));
+                }
+            }
+        }
+        return new ProducerTableInfo(map);
+    }
+
     public void scanNotActiveChannel() {
         for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
                 .entrySet()) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index f4ada3684..0a70b5546 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -26,6 +26,8 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
+import org.apache.rocketmq.common.protocol.header.GetAllProducerInfoRequestHeader;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
 import org.apache.rocketmq.common.AclConfig;
@@ -188,6 +190,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 return this.getConsumerConnectionList(ctx, request);
             case RequestCode.GET_PRODUCER_CONNECTION_LIST:
                 return this.getProducerConnectionList(ctx, request);
+            case RequestCode.GET_ALL_PRODUCER_INFO:
+                return this.getAllProducerInfo(ctx, request);
             case RequestCode.GET_CONSUME_STATS:
                 return this.getConsumeStats(ctx, request);
             case RequestCode.GET_ALL_CONSUMER_OFFSET:
@@ -842,6 +846,23 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         return response;
     }
 
+    private RemotingCommand getAllProducerInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final GetAllProducerInfoRequestHeader requestHeader =
+                (GetAllProducerInfoRequestHeader) request.decodeCommandCustomHeader(GetAllProducerInfoRequestHeader.class);
+
+        ProducerTableInfo producerTable = this.brokerController.getProducerManager().getProducerTable();
+        if (producerTable != null) {
+            byte[] body = producerTable.encode();
+            response.setBody(body);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        }
+
+        response.setCode(ResponseCode.SYSTEM_ERROR);
+        return response;
+    }
     private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index 8b29f4ffb..35df22719 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -463,6 +463,13 @@ public class AdminBrokerProcessorTest {
         assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
     }
 
+    @Test
+    public void testGetAllProducerInfo() throws RemotingCommandException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_PRODUCER_INFO, null);
+        RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
     @Test
     public void testGetConsumeStats() throws RemotingCommandException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, null);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index f2d01897c..22aefda8a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.PullCallback;
@@ -77,6 +78,7 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
 import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
@@ -95,6 +97,7 @@ import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeade
 import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
 import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
 import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetAllProducerInfoRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
 import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader;
@@ -1245,6 +1248,26 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
     }
 
+    public ProducerTableInfo getAllProducerInfo(final String addr, final long timeoutMillis)
+            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+            MQBrokerException {
+        GetAllProducerInfoRequestHeader requestHeader = new GetAllProducerInfoRequestHeader();
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_PRODUCER_INFO, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return ProducerTableInfo.decode(response.getBody(), ProducerTableInfo.class);
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
+    }
+
     public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup,
         final long timeoutMillis)
         throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 5624a7ec0..24c072bc8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -190,4 +190,6 @@ public class RequestCode {
     public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
 
     public static final int ADD_WRITE_PERM_OF_BROKER = 327;
+
+    public static final int GET_ALL_PRODUCER_INFO = 328;
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerInfo.java
new file mode 100644
index 000000000..73bf1c304
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerInfo.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+
+public class ProducerInfo extends RemotingSerializable {
+    private String clientId;
+    private String remoteIP;
+    private LanguageCode language;
+    private int version;
+    private long lastUpdateTimestamp;
+
+    public ProducerInfo(String clientId, String remoteIP, LanguageCode language, int version, long lastUpdateTimestamp) {
+        this.clientId = clientId;
+        this.remoteIP = remoteIP;
+        this.language = language;
+        this.version = version;
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public String getRemoteIP() {
+        return remoteIP;
+    }
+
+    public void setRemoteIP(String remoteIP) {
+        this.remoteIP = remoteIP;
+    }
+
+    public LanguageCode getLanguage() {
+        return language;
+    }
+
+    public void setLanguage(LanguageCode language) {
+        this.language = language;
+    }
+
+    public int getVersion() {
+        return version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    public long getLastUpdateTimestamp() {
+        return lastUpdateTimestamp;
+    }
+
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("clientId=%s,remoteIP=%s, language=%s, version=%d, lastUpdateTimestamp=%d",
+                clientId, remoteIP, language.name(), version, lastUpdateTimestamp);
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerTableInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerTableInfo.java
new file mode 100644
index 000000000..f8d8538b8
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerTableInfo.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.List;
+import java.util.Map;
+
+public class ProducerTableInfo extends RemotingSerializable {
+    public ProducerTableInfo(Map<String, List<ProducerInfo>> data) {
+        this.data = data;
+    }
+
+    private Map<String, List<ProducerInfo>> data;
+
+    public Map<String, List<ProducerInfo>> getData() {
+        return data;
+    }
+
+    public void setData(Map<String, List<ProducerInfo>> data) {
+        this.data = data;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetAllProducerInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetAllProducerInfoRequestHeader.java
new file mode 100644
index 000000000..201294565
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetAllProducerInfoRequestHeader.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class GetAllProducerInfoRequestHeader implements CommandCustomHeader {
+    @Override
+    public void checkFields() throws RemotingCommandException {
+        // To change body of implemented methods use File | Settings | File
+        // Templates.
+    }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index b5f4d6230..22b6dcdc7 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -43,6 +43,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.body.GroupList;
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
 import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
@@ -285,6 +286,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
         return defaultMQAdminExtImpl.examineProducerConnectionInfo(producerGroup, topic);
     }
 
+    @Override
+    public ProducerTableInfo getAllProducerInfo(final String brokerAddr) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return defaultMQAdminExtImpl.getAllProducerInfo(brokerAddr);
+    }
+
     @Override
     public List<String> getNameServerAddressList() {
         return this.defaultMQAdminExtImpl.getNameServerAddressList();
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 1fff6be0f..292d80cdb 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -50,6 +50,7 @@ import org.apache.rocketmq.common.admin.TopicOffset;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageClientExt;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -407,6 +408,11 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return result;
     }
 
+    @Override
+    public ProducerTableInfo getAllProducerInfo(final String brokerAddr) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return this.mqClientInstance.getMQClientAPIImpl().getAllProducerInfo(brokerAddr, timeoutMillis);
+    }
+
     @Override
     public List<String> getNameServerAddressList() {
         return this.mqClientInstance.getMQClientAPIImpl().getNameServerAddressList();
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index d222a1500..7bbe8e331 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.body.GroupList;
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
 import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
@@ -133,6 +134,9 @@ public interface MQAdminExt extends MQAdmin {
         final String topic) throws RemotingException,
         MQClientException, InterruptedException, MQBrokerException;
 
+    ProducerTableInfo getAllProducerInfo(final String brokerAddr) throws RemotingException,
+            MQClientException, InterruptedException, MQBrokerException;
+
     List<String> getNameServerAddressList();
 
     int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException,
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 046ed5d55..fca04e175 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -72,6 +72,7 @@ import org.apache.rocketmq.tools.command.namesrv.WipeWritePermSubCommand;
 import org.apache.rocketmq.tools.command.offset.CloneGroupOffsetCommand;
 import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand;
 import org.apache.rocketmq.tools.command.offset.SkipAccumulationSubCommand;
+import org.apache.rocketmq.tools.command.producer.ProducerSubCommand;
 import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand;
 import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
 import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand;
@@ -184,6 +185,8 @@ public class MQAdminStartup {
         initCommand(new ConsumerProgressSubCommand());
         initCommand(new ConsumerStatusSubCommand());
         initCommand(new CloneGroupOffsetCommand());
+        //for producer
+        initCommand(new ProducerSubCommand());
 
         initCommand(new ClusterListSubCommand());
         initCommand(new TopicListSubCommand());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/producer/ProducerSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/producer/ProducerSubCommand.java
new file mode 100644
index 000000000..f6fa63f13
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/producer/ProducerSubCommand.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.producer;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.protocol.body.ProducerInfo;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.MQAdminStartup;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.util.List;
+
+public class ProducerSubCommand implements SubCommand {
+
+    public static void main(String[] args) {
+        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
+        MQAdminStartup.main(new String[]{new ProducerSubCommand().commandName(), "-b", "127.0.0.1:10911"});
+    }
+
+    @Override
+    public String commandName() {
+        return "producer";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Query producer's instances, connection, status, etc.";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "broker", true, "broker address");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+            String brokerAddr = commandLine.getOptionValue('b').trim();
+            ProducerTableInfo cc = defaultMQAdminExt.getAllProducerInfo(brokerAddr);
+            if (cc != null && cc.getData() != null && !cc.getData().isEmpty()) {
+                for (String group : cc.getData().keySet()) {
+                    List<ProducerInfo> list = cc.getData().get(group);
+                    if (list == null || list.isEmpty()) {
+                        System.out.printf("producer group (%s) instances are empty\n", group);
+                        continue;
+                    }
+                    for (ProducerInfo producer : list) {
+                        System.out.printf("producer group (%s) instance : %s\n", group, producer.toString());
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index 74723d43f..febc80e44 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -16,18 +16,6 @@
  */
 package org.apache.rocketmq.tools.admin;
 
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -53,6 +41,8 @@ import org.apache.rocketmq.common.protocol.body.GroupList;
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.ProducerInfo;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
 import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
@@ -69,6 +59,7 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.tools.admin.api.MessageTrack;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -76,6 +67,20 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -210,6 +215,17 @@ public class DefaultMQAdminExtTest {
         producerConnection.setConnectionSet(connectionSet);
         when(mQClientAPIImpl.getProducerConnectionList(anyString(), anyString(), anyLong())).thenReturn(producerConnection);
 
+        ProducerTableInfo producerTableInfo = new ProducerTableInfo(new HashMap<>());
+        producerTableInfo.getData().put("test-producer-group", Arrays.asList(new ProducerInfo(
+                "xxxx-client-id",
+                "127.0.0.1:18978",
+                LanguageCode.JAVA,
+                400,
+                System.currentTimeMillis()
+
+        )));
+        when(mQClientAPIImpl.getAllProducerInfo(anyString(), anyLong())).thenReturn(producerTableInfo);
+
         when(mQClientAPIImpl.wipeWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(6);
         when(mQClientAPIImpl.addWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(7);
 
@@ -308,6 +324,13 @@ public class DefaultMQAdminExtTest {
         assertThat(producerConnection.getConnectionSet().size()).isEqualTo(1);
     }
 
+    /** Checks that the table returned by getAllProducerInfo for the given broker address holds one group. */
+    @Test
+    public void testGetAllProducerInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        final int groupCount = defaultMQAdminExt.getAllProducerInfo("127.0.0.1:10911").getData().size();
+        assertThat(groupCount).isEqualTo(1);
+    }
+
     @Test
     public void testWipeWritePermOfBroker() throws InterruptedException, RemotingCommandException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, RemotingConnectException {
         int result = defaultMQAdminExt.wipeWritePermOfBroker("127.0.0.1:9876", "default-broker");
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/producer/ProducerSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/producer/ProducerSubCommandTest.java
new file mode 100644
index 000000000..6d6cc93ab
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/producer/ProducerSubCommandTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.producer;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.apache.rocketmq.tools.command.server.NameServerMocker;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+/**
+ * Smoke test for {@link ProducerSubCommand}: runs the command against a mocked name server and a
+ * mocked broker listening on fixed local ports.
+ */
+public class ProducerSubCommandTest {
+    private static final int NAME_SERVER_PORT = 45677;
+
+    private static final int BROKER_PORT = 45676;
+
+    private ServerResponseMocker brokerMocker;
+
+    private ServerResponseMocker nameServerMocker;
+
+    /** Starts the mocked broker, then the mocked name server, which is given the broker port. */
+    @Before
+    public void before() {
+        brokerMocker = startOneBroker();
+        nameServerMocker = NameServerMocker.startByDefaultConf(NAME_SERVER_PORT, BROKER_PORT);
+    }
+
+    /**
+     * Stops both mock servers. The name server is shut down even when the broker mock was never
+     * started or its shutdown throws, so a failure cannot leave a fixed port occupied.
+     */
+    @After
+    public void after() {
+        try {
+            if (brokerMocker != null) {
+                brokerMocker.shutdown();
+            }
+        } finally {
+            if (nameServerMocker != null) {
+                nameServerMocker.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Runs the command with {@code -b} pointing at the mocked broker; the test passes when
+     * {@code execute} returns without throwing.
+     *
+     * @throws SubCommandException propagated from {@code execute}
+     */
+    @Test
+    public void testExecute() throws SubCommandException {
+        ProducerSubCommand cmd = new ProducerSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        // NOTE(review): flag and value share one token, so the parsed value presumably starts with a
+        // space - confirm that ProducerSubCommand trims it.
+        String[] subargs = new String[]{"-b 127.0.0.1:" + BROKER_PORT};
+        final CommandLine commandLine =
+                ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        cmd.execute(commandLine, options, null);
+    }
+
+    /**
+     * Starts a mocked broker on {@link #BROKER_PORT}, handing it an encoded {@link ConsumeStats} with
+     * a single queue/offset entry (presumably served as the canned response body - confirm in
+     * {@link ServerResponseMocker}).
+     *
+     * @return the started mock; the caller is responsible for shutting it down
+     */
+    private ServerResponseMocker startOneBroker() {
+        ConsumeStats consumeStats = new ConsumeStats();
+        HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<>();
+        MessageQueue messageQueue = new MessageQueue();
+        messageQueue.setBrokerName("mockBrokerName");
+        messageQueue.setQueueId(1);
+        // The topic goes into the topic field; a second setBrokerName call would overwrite the
+        // broker name set above and leave the topic unset.
+        messageQueue.setTopic("mockTopicName");
+
+        OffsetWrapper offsetWrapper = new OffsetWrapper();
+        offsetWrapper.setBrokerOffset(1);
+        offsetWrapper.setConsumerOffset(1);
+        offsetWrapper.setLastTimestamp(System.currentTimeMillis());
+
+        offsetTable.put(messageQueue, offsetWrapper);
+        consumeStats.setOffsetTable(offsetTable);
+        // start broker
+        return ServerResponseMocker.startServer(BROKER_PORT, consumeStats.encode());
+    }
+}