You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/06/19 01:04:36 UTC

[rocketmq] branch develop updated: [ISSUE #598] Enhance transaction by putting messages that exceed max check times to system topic (#633)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d66243c  [ISSUE #598] Enhance transaction by putting messages that exceed max check times to system topic (#633)
d66243c is described below

commit d66243c03cb903588c0089b7152fe8c8f33bcc84
Author: 程向往 <ch...@cmss.chinamobile.com>
AuthorDate: Wed Jun 19 09:04:29 2019 +0800

    [ISSUE #598] Enhance transaction by putting messages that exceed max check times to system topic (#633)
    
    * add logic of putting message that exceeds max-check-times to system topic TRANS_CHECK_MAXTIME_TOPIC
    
    * add test case:testResolveDiscardMsg
    
    * add @After logic to test case
    
    * comment brokerController.shutdown and use mock
    
    * add logic of resuming half message check
    
    * add test case:resumeCheckHalfMessage
    
    * delete commented codes
---
 .../broker/processor/AdminBrokerProcessor.java     |  70 +++++++++++-
 .../rocketmq/broker/topic/TopicConfigManager.java  |  48 +++++++-
 .../AbstractTransactionalMessageCheckListener.java |   5 +
 .../DefaultTransactionalMessageCheckListener.java  |  45 +++++++-
 .../broker/processor/AdminBrokerProcessorTest.java | 125 +++++++++++++++++++++
 ...faultTransactionalMessageCheckListenerTest.java |  48 ++++++--
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  21 ++++
 .../rocketmq/client/impl/MQClientAPIImplTest.java  |  46 +++++++-
 .../java/org/apache/rocketmq/common/MixAll.java    |   1 +
 .../rocketmq/common/protocol/RequestCode.java      |   5 +
 .../ResumeCheckHalfMessageRequestHeader.java       |  33 ++++--
 pom.xml                                            |   2 +-
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |  13 +++
 .../tools/admin/DefaultMQAdminExtImpl.java         |  19 ++++
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |   5 +
 .../message/QueryMsgByUniqueKeySubCommandTest.java |  15 +++
 16 files changed, 468 insertions(+), 33 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index f23cca6..76a051b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
+import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.PlainAccessConfig;
@@ -53,7 +54,10 @@ import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHead
 import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageId;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.RequestCode;
@@ -100,6 +104,7 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHea
 import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader;
 import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
@@ -120,8 +125,11 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.store.ConsumeQueue;
 import org.apache.rocketmq.store.ConsumeQueueExt;
 import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 
 public class AdminBrokerProcessor implements NettyRequestProcessor {
@@ -216,6 +224,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                 return getBrokerAclConfigVersion(ctx, request);
             case RequestCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG:
                 return updateGlobalWhiteAddrsConfig(ctx, request);
+            case RequestCode.RESUME_CHECK_HALF_MESSAGE:
+                return resumeCheckHalfMessage(ctx, request);
             default:
                 break;
         }
@@ -262,7 +272,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
 
         this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
 
-        this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
+        this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
 
         return null;
     }
@@ -1518,4 +1528,62 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
 
         return response;
     }
+
+    /**
+     * Re-submits a stored message for transactional checking: the message located by the
+     * commit log offset encoded in the request's msgId is decoded, its
+     * {@code PROPERTY_TRANSACTION_CHECK_TIMES} user property is reset to 0, and a copy is put
+     * to the half-message topic through the message store.
+     *
+     * @param ctx     channel context of the request (not used by this handler)
+     * @param request command whose custom header is a {@link ResumeCheckHalfMessageRequestHeader}
+     * @return response with {@code SUCCESS} when the store reports {@code PUT_OK}, otherwise
+     *         {@code SYSTEM_ERROR} with a remark describing the failure
+     * @throws RemotingCommandException if the custom header cannot be decoded
+     */
+    private RemotingCommand resumeCheckHalfMessage(ChannelHandlerContext ctx,
+        RemotingCommand request)
+        throws RemotingCommandException {
+        final ResumeCheckHalfMessageRequestHeader requestHeader = (ResumeCheckHalfMessageRequestHeader) request
+            .decodeCommandCustomHeader(ResumeCheckHalfMessageRequestHeader.class);
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        SelectMappedBufferResult selectMappedBufferResult = null;
+        try {
+            MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId());
+            selectMappedBufferResult = this.brokerController.getMessageStore()
+                .selectOneMessageByOffset(messageId.getOffset());
+            if (selectMappedBufferResult == null) {
+                // Report the missing message explicitly instead of running into a
+                // NullPointerException that the generic handler below would mask.
+                log.error("No message found at commit log offset {} for msgId {}.",
+                    messageId.getOffset(), requestHeader.getMsgId());
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("No message found for msgId " + requestHeader.getMsgId());
+                return response;
+            }
+            MessageExt msg = MessageDecoder.decode(selectMappedBufferResult.getByteBuffer());
+            // Start counting checks from scratch for the re-submitted copy.
+            msg.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(0));
+            PutMessageResult putMessageResult = this.brokerController.getMessageStore()
+                .putMessage(toMessageExtBrokerInner(msg));
+            if (putMessageResult != null
+                && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+                log.info(
+                    "Put message back to RMQ_SYS_TRANS_HALF_TOPIC. real topic={}",
+                    msg.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
+                response.setCode(ResponseCode.SUCCESS);
+                response.setRemark(null);
+            } else {
+                log.error("Put message back to RMQ_SYS_TRANS_HALF_TOPIC failed.");
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("Put message back to RMQ_SYS_TRANS_HALF_TOPIC failed.");
+            }
+        } catch (Exception e) {
+            // Hand the exception to the logger so the cause and stack trace are not lost.
+            log.error("Exception was thrown when putting message back to RMQ_SYS_TRANS_HALF_TOPIC.", e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("Exception was thrown when putting message back to RMQ_SYS_TRANS_HALF_TOPIC.");
+        } finally {
+            // The selected buffer holds a reference that must be released on every path.
+            if (selectMappedBufferResult != null) {
+                selectMappedBufferResult.release();
+            }
+        }
+        return response;
+    }
+
+    /**
+     * Builds the broker-internal copy of the given message, addressed to queue 0 of the topic
+     * returned by {@code TransactionalMessageUtil.buildHalfTopic()}. Body, flags, properties,
+     * hosts, born timestamp, reconsume times and message id are taken over from the source.
+     *
+     * @param msgExt the decoded message to copy
+     * @return the message to hand to the message store
+     */
+    private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {
+        final MessageExtBrokerInner halfMsg = new MessageExtBrokerInner();
+
+        // Destination.
+        halfMsg.setTopic(TransactionalMessageUtil.buildHalfTopic());
+        halfMsg.setQueueId(0);
+
+        // Payload and flags.
+        halfMsg.setBody(msgExt.getBody());
+        halfMsg.setFlag(msgExt.getFlag());
+        halfMsg.setSysFlag(msgExt.getSysFlag());
+
+        // Properties, in both map and serialized form, plus the tag hash derived from them.
+        MessageAccessor.setProperties(halfMsg, msgExt.getProperties());
+        halfMsg.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+        halfMsg.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgExt.getTags()));
+
+        // Origin metadata.
+        halfMsg.setBornHost(msgExt.getBornHost());
+        halfMsg.setBornTimestamp(msgExt.getBornTimestamp());
+        halfMsg.setStoreHost(msgExt.getStoreHost());
+        halfMsg.setReconsumeTimes(msgExt.getReconsumeTimes());
+        halfMsg.setMsgId(msgExt.getMsgId());
+
+        halfMsg.setWaitStoreMsgOK(false);
+        return halfMsg;
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index ed353da..8f215cd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -220,7 +220,7 @@ public class TopicConfigManager extends ConfigManager {
         }
 
         if (createNew) {
-            this.brokerController.registerBrokerAll(false, true,true);
+            this.brokerController.registerBrokerAll(false, true, true);
         }
 
         return topicConfig;
@@ -264,7 +264,47 @@ public class TopicConfigManager extends ConfigManager {
         }
 
         if (createNew) {
-            this.brokerController.registerBrokerAll(false, true,true);
+            this.brokerController.registerBrokerAll(false, true, true);
+        }
+
+        return topicConfig;
+    }
+
+    /**
+     * Returns the topic config of the system topic {@code MixAll.TRANS_CHECK_MAX_TIME_TOPIC},
+     * creating and persisting it (and then calling {@code registerBrokerAll}) when it is not
+     * in the topic table yet.
+     *
+     * @param clientDefaultTopicQueueNums read and write queue count used for a newly created topic
+     * @param perm permission value set on a newly created topic
+     * @return the existing or newly created config; {@code null} when the topic is absent and the
+     *         table lock is not acquired within {@code LOCK_TIMEOUT_MILLIS} or the wait is interrupted
+     */
+    public TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQueueNums, final int perm) {
+        // Lock-free lookup first: nothing to do when the topic is already known.
+        TopicConfig topicConfig = this.topicConfigTable.get(MixAll.TRANS_CHECK_MAX_TIME_TOPIC);
+        if (topicConfig != null)
+            return topicConfig;
+
+        boolean createNew = false;
+
+        try {
+            if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    // Look again while holding the lock; the entry may have appeared in the meantime.
+                    topicConfig = this.topicConfigTable.get(MixAll.TRANS_CHECK_MAX_TIME_TOPIC);
+                    if (topicConfig != null)
+                        return topicConfig;
+
+                    topicConfig = new TopicConfig(MixAll.TRANS_CHECK_MAX_TIME_TOPIC);
+                    topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
+                    topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
+                    topicConfig.setPerm(perm);
+                    topicConfig.setTopicSysFlag(0);
+
+                    log.info("create new topic {}", topicConfig);
+                    this.topicConfigTable.put(MixAll.TRANS_CHECK_MAX_TIME_TOPIC, topicConfig);
+                    createNew = true;
+                    this.dataVersion.nextVersion();
+                    this.persist();
+                } finally {
+                    this.lockTopicConfigTable.unlock();
+                }
+            }
+        } catch (InterruptedException e) {
+            // NOTE(review): the interrupt is only logged; the thread's interrupt flag is not restored.
+            log.error("create TRANS_CHECK_MAX_TIME_TOPIC exception", e);
+        }
+
+        // Registration happens after the table lock has been released.
+        if (createNew) {
+            this.brokerController.registerBrokerAll(false, true, true);
         }
 
         return topicConfig;
     }
             this.dataVersion.nextVersion();
 
             this.persist();
-            this.brokerController.registerBrokerAll(false, true,true);
+            this.brokerController.registerBrokerAll(false, true, true);
         }
     }
 
@@ -309,7 +349,7 @@ public class TopicConfigManager extends ConfigManager {
             this.dataVersion.nextVersion();
 
             this.persist();
-            this.brokerController.registerBrokerAll(false, true,true);
+            this.brokerController.registerBrokerAll(false, true, true);
         }
     }
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
index 659c6af..62507cd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.broker.transaction;
 
 import io.netty.channel.Channel;
+import java.util.Random;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -36,6 +37,10 @@ public abstract class AbstractTransactionalMessageCheckListener {
 
     private BrokerController brokerController;
 
+    // Number of queues of the system topic TRANS_CHECK_MAX_TIME_TOPIC.
+    protected final static int TCMT_QUEUE_NUMS = 1;
+    // Random source seeded with the construction time; exposed to subclasses.
+    protected final Random random = new Random(System.currentTimeMillis());
+
     private static ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
         @Override
         public Thread newThread(Runnable r) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
index 529bfe4..ee87bd3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
@@ -17,10 +17,18 @@
 package org.apache.rocketmq.broker.transaction.queue;
 
 import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
 
 public class DefaultTransactionalMessageCheckListener extends AbstractTransactionalMessageCheckListener {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
@@ -31,6 +39,41 @@ public class DefaultTransactionalMessageCheckListener extends AbstractTransactio
 
     @Override
     public void resolveDiscardMsg(MessageExt msgExt) {
-        log.error("MsgExt:{} has been checked too many times, so discard it", msgExt);
+        log.error("MsgExt:{} has been checked too many times, so discard it by moving it to system topic TRANS_CHECK_MAXTIME_TOPIC", msgExt);
+
+        try {
+            MessageExtBrokerInner brokerInner = toMessageExtBrokerInner(msgExt);
+            PutMessageResult putMessageResult = this.getBrokerController().getMessageStore().putMessage(brokerInner);
+            if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+                log.info("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, " +
+                    "commitLogOffset={}, real topic={}", msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
+            } else {
+                log.error("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC failed, real topic={}, msgId={}", msgExt.getTopic(), msgExt.getMsgId());
+            }
+        } catch (Exception e) {
+            log.warn("Put checked-too-many-time message to TRANS_CHECK_MAXTIME_TOPIC error. {}", e);
+        }
+
+    }
+
+    /**
+     * Builds the broker-internal copy of a discarded half message, addressed to the system
+     * topic obtained from {@code createTopicOfTranCheckMaxTime}. Body, flags, properties, hosts,
+     * born timestamp, reconsume times and message id are taken over from the given message.
+     *
+     * @param msgExt the half message being discarded
+     * @return the message to put into the store
+     * @throws IllegalStateException if the system topic config could not be obtained
+     */
+    private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {
+        TopicConfig topicConfig = this.getBrokerController().getTopicConfigManager().createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS, PermName.PERM_READ | PermName.PERM_WRITE);
+        if (topicConfig == null) {
+            // Fail with a descriptive error instead of a bare NullPointerException below.
+            throw new IllegalStateException("Topic config of TRANS_CHECK_MAX_TIME_TOPIC is unavailable");
+        }
+        // Uniformly picks a queue id in [0, TCMT_QUEUE_NUMS).
+        int queueId = random.nextInt(TCMT_QUEUE_NUMS);
+        MessageExtBrokerInner inner = new MessageExtBrokerInner();
+        inner.setTopic(topicConfig.getTopicName());
+        inner.setBody(msgExt.getBody());
+        inner.setFlag(msgExt.getFlag());
+        MessageAccessor.setProperties(inner, msgExt.getProperties());
+        inner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+        inner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgExt.getTags()));
+        inner.setQueueId(queueId);
+        inner.setSysFlag(msgExt.getSysFlag());
+        inner.setBornHost(msgExt.getBornHost());
+        inner.setBornTimestamp(msgExt.getBornTimestamp());
+        inner.setStoreHost(msgExt.getStoreHost());
+        inner.setReconsumeTimes(msgExt.getReconsumeTimes());
+        inner.setMsgId(msgExt.getMsgId());
+        inner.setWaitStoreMsgOK(false);
+        return inner;
     }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
new file mode 100644
index 0000000..ec0a879
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.AppendMessageStatus;
+import org.apache.rocketmq.store.MappedFile;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the RESUME_CHECK_HALF_MESSAGE request handling of {@link AdminBrokerProcessor} using a
+ * spied {@link BrokerController} whose message store is a Mockito mock.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class AdminBrokerProcessorTest {
+
+    private AdminBrokerProcessor adminBrokerProcessor;
+
+    @Mock
+    private ChannelHandlerContext handlerContext;
+
+    @Spy
+    private BrokerController brokerController = new BrokerController(
+        new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+
+    @Mock
+    private MessageStore messageStore;
+
+    @Before
+    public void init() {
+        brokerController.setMessageStore(messageStore);
+        adminBrokerProcessor = new AdminBrokerProcessor(brokerController);
+    }
+
+    /** A PUT_OK result from the store is answered with SUCCESS. */
+    @Test
+    public void testProcessRequest_success() throws RemotingCommandException, UnknownHostException {
+        RemotingCommand resumeRequest = createResumeCheckHalfMessageCommand();
+        PutMessageResult putOk =
+            new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK));
+        when(messageStore.selectOneMessageByOffset(any(Long.class))).thenReturn(createSelectMappedBufferResult());
+        when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(putOk);
+
+        RemotingCommand reply = adminBrokerProcessor.processRequest(handlerContext, resumeRequest);
+
+        assertThat(reply.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    /** Any non-OK result from the store is answered with SYSTEM_ERROR. */
+    @Test
+    public void testProcessRequest_fail() throws RemotingCommandException, UnknownHostException {
+        RemotingCommand resumeRequest = createResumeCheckHalfMessageCommand();
+        PutMessageResult putFailed =
+            new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
+        when(messageStore.selectOneMessageByOffset(any(Long.class))).thenReturn(createSelectMappedBufferResult());
+        when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(putFailed);
+
+        RemotingCommand reply = adminBrokerProcessor.processRequest(handlerContext, resumeRequest);
+
+        assertThat(reply.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+    }
+
+    /** Sample half message carrying real-topic, real-queue and check-times properties. */
+    private MessageExt createDefaultMessageExt() {
+        MessageExt sample = new MessageExt();
+        sample.setMsgId("12345678");
+        sample.setQueueId(0);
+        sample.setCommitLogOffset(123456789L);
+        sample.setQueueOffset(1234);
+        MessageAccessor.putProperty(sample, MessageConst.PROPERTY_REAL_QUEUE_ID, "0");
+        MessageAccessor.putProperty(sample, MessageConst.PROPERTY_REAL_TOPIC, "testTopic");
+        MessageAccessor.putProperty(sample, MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, "15");
+        return sample;
+    }
+
+    /** Select result wrapping a freshly allocated 1024-byte buffer. */
+    private SelectMappedBufferResult createSelectMappedBufferResult() {
+        return new SelectMappedBufferResult(0, ByteBuffer.allocate(1024), 0, new MappedFile());
+    }
+
+    /** Request header carrying a fixed message id. */
+    private ResumeCheckHalfMessageRequestHeader createResumeCheckHalfMessageRequestHeader() {
+        ResumeCheckHalfMessageRequestHeader resumeHeader = new ResumeCheckHalfMessageRequestHeader();
+        resumeHeader.setMsgId("C0A803CA00002A9F0000000000031367");
+        return resumeHeader;
+    }
+
+    /** RESUME_CHECK_HALF_MESSAGE command with its custom header already encoded for the wire. */
+    private RemotingCommand createResumeCheckHalfMessageCommand() {
+        RemotingCommand command = RemotingCommand.createRequestCommand(
+            RequestCode.RESUME_CHECK_HALF_MESSAGE, createResumeCheckHalfMessageRequestHeader());
+        command.makeCustomHeaderToNet();
+        return command;
+    }
+}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java
index 17bf00b..653a969 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java
@@ -16,18 +16,23 @@
  */
 package org.apache.rocketmq.broker.transaction.queue;
 
+import java.net.InetSocketAddress;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
@@ -35,16 +40,25 @@ import org.mockito.junit.MockitoJUnitRunner;
 public class DefaultTransactionalMessageCheckListenerTest {
 
     private DefaultTransactionalMessageCheckListener listener;
+    @Mock
+    private MessageStore messageStore;
 
     @Spy
-    private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(),
+    private BrokerController brokerController = new BrokerController(new BrokerConfig(),
+        new NettyServerConfig(),
         new NettyClientConfig(), new MessageStoreConfig());
 
-
     @Before
-    public void init() {
+    public void init() throws Exception {
         listener = new DefaultTransactionalMessageCheckListener();
         listener.setBrokerController(brokerController);
+        brokerController.setMessageStore(messageStore);
+
+    }
+
+    @After
+    public void destroy() {
+//        brokerController.shutdown();
     }
 
     @Test
@@ -53,21 +67,21 @@ public class DefaultTransactionalMessageCheckListenerTest {
     }
 
     @Test
-    public void testSendCheckMessage() throws Exception{
+    public void testSendCheckMessage() throws Exception {
         MessageExt messageExt = createMessageExt();
         listener.sendCheckMessage(messageExt);
     }
 
     @Test
-    public void sendCheckMessage(){
+    public void sendCheckMessage() {
         listener.resolveDiscardMsg(createMessageExt());
     }
 
     private MessageExtBrokerInner createMessageExt() {
         MessageExtBrokerInner inner = new MessageExtBrokerInner();
-        MessageAccessor.putProperty(inner,MessageConst.PROPERTY_REAL_QUEUE_ID,"1");
-        MessageAccessor.putProperty(inner,MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,"1234255");
-        MessageAccessor.putProperty(inner,MessageConst.PROPERTY_REAL_TOPIC,"realTopic");
+        MessageAccessor.putProperty(inner, MessageConst.PROPERTY_REAL_QUEUE_ID, "1");
+        MessageAccessor.putProperty(inner, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "1234255");
+        MessageAccessor.putProperty(inner, MessageConst.PROPERTY_REAL_TOPIC, "realTopic");
         inner.setTransactionId(inner.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
         inner.setBody("check".getBytes());
         inner.setMsgId("12344567890");
@@ -75,4 +89,22 @@ public class DefaultTransactionalMessageCheckListenerTest {
         return inner;
     }
 
+    /**
+     * Feeds a half message with typical transactional properties to
+     * {@code resolveDiscardMsg}; the test passes when the call returns without throwing.
+     */
+    @Test
+    public void testResolveDiscardMsg() {
+        final MessageExt halfMsg = new MessageExt();
+
+        // Where the half message lives and where it came from.
+        halfMsg.setTopic(MixAll.RMQ_SYS_TRANS_HALF_TOPIC);
+        halfMsg.setQueueId(0);
+        halfMsg.setBornHost(new InetSocketAddress("127.0.0.1", 54270));
+        halfMsg.setStoreHost(new InetSocketAddress("127.0.0.1", 10911));
+        halfMsg.setBody("test resolve discard msg".getBytes());
+
+        // Transaction-related properties, including a check count of 15.
+        MessageAccessor.putProperty(halfMsg, MessageConst.PROPERTY_REAL_TOPIC, "test_topic");
+        MessageAccessor.putProperty(halfMsg, MessageConst.PROPERTY_PRODUCER_GROUP, "PID_TEST_DISCARD_MSG");
+        MessageAccessor.putProperty(halfMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
+        MessageAccessor.putProperty(halfMsg, MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, "15");
+        MessageAccessor.putProperty(halfMsg, MessageConst.PROPERTY_REAL_QUEUE_ID, "2");
+        MessageAccessor.putProperty(halfMsg, MessageConst.PROPERTY_TAGS, "test_discard_msg");
+        MessageAccessor.putProperty(halfMsg, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "AC14157E4F1C18B4AAC27EB1A0F30000");
+
+        listener.resolveDiscardMsg(halfMsg);
+    }
+
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index c3382ca..b743af9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -122,6 +122,7 @@ import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader;
 import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
@@ -2207,4 +2208,24 @@ public class MQClientAPIImpl {
             throw new MQClientException(response.getCode(), response.getRemark());
         }
     }
+
+    /**
+     * Sends a RESUME_CHECK_HALF_MESSAGE request for the given message id to a broker and
+     * reports whether the broker accepted it.
+     *
+     * @param addr          broker address; mapped through {@code MixAll.brokerVIPChannel}
+     * @param msgId         id of the message whose half-message check should be resumed
+     * @param timeoutMillis timeout of the synchronous invocation in milliseconds
+     * @return {@code true} when the response code is {@code SUCCESS}; {@code false} for any
+     *         other code, in which case the response remark is logged
+     */
+    public boolean resumeCheckHalfMessage(final String addr, String msgId,
+        final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+        final ResumeCheckHalfMessageRequestHeader header = new ResumeCheckHalfMessageRequestHeader();
+        header.setMsgId(msgId);
+        final RemotingCommand request =
+            RemotingCommand.createRequestCommand(RequestCode.RESUME_CHECK_HALF_MESSAGE, header);
+
+        final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+        final RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis);
+        assert response != null;
+
+        if (response.getCode() == ResponseCode.SUCCESS) {
+            return true;
+        }
+        log.error("Failed to resume half message check logic. Remark={}", response.getRemark());
+        return false;
+    }
 }
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index 2855595..84af632 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -26,17 +26,12 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
-import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
-import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
-import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RemotingClient;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -294,6 +289,45 @@ public class MQClientAPIImplTest {
             assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been deleted failed");
         }
     }
+    /**
+     * Stubs {@code remotingClient.invokeSync} to answer with {@code ResponseCode.SYSTEM_ERROR} and
+     * verifies that {@code resumeCheckHalfMessage} returns {@code false} for that response.
+     */
+    @Test
+    public void testResumeCheckHalfMessage_WithException() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock mock) throws Throwable {
+                // Argument index 1 of invokeSync is the outgoing RemotingCommand.
+                RemotingCommand request = mock.getArgument(1);
+                RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setOpaque(request.getOpaque());
+                response.setRemark("Put message back to RMQ_SYS_TRANS_HALF_TOPIC failed.");
+                return response;
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        boolean result = mqClientAPI.resumeCheckHalfMessage(brokerAddr, "test", 3000);
+        assertThat(result).isEqualTo(false);
+    }
+
+    /**
+     * Stubs {@code remotingClient.invokeSync} to answer with a {@code ResponseCode.SUCCESS} response and
+     * verifies that {@code resumeCheckHalfMessage} returns {@code true}.
+     */
+    @Test
+    public void testResumeCheckHalfMessage_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock mock) throws Throwable {
+                // Argument index 1 of invokeSync is the outgoing RemotingCommand.
+                RemotingCommand request = mock.getArgument(1);
+                return createResumeSuccessResponse(request);
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        boolean result = mqClientAPI.resumeCheckHalfMessage(brokerAddr, "test", 3000);
+
+        assertThat(result).isEqualTo(true);
+    }
+
+    /**
+     * Builds a header-less response with code {@code ResponseCode.SUCCESS} whose opaque value is copied
+     * from the given request.
+     */
+    private RemotingCommand createResumeSuccessResponse(RemotingCommand request) {
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setOpaque(request.getOpaque());
+        return response;
+    }
 
     private RemotingCommand createSuccessResponse(RemotingCommand request) {
         RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
@@ -329,7 +363,7 @@ public class MQClientAPIImplTest {
         response.setOpaque(request.getOpaque());
         response.markResponseType();
         response.setRemark(null);
-        
+
         return response;
     }
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 550b0b6..0af65df 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -92,6 +92,7 @@ public class MixAll {
     public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
     public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
     public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
+    public static final String TRANS_CHECK_MAX_TIME_TOPIC = "TRANS_CHECK_MAX_TIME_TOPIC";
     public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
     public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index b771b77..58c4b9f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -175,4 +175,9 @@ public class RequestCode {
     public static final int QUERY_CONSUME_QUEUE = 321;
 
     public static final int QUERY_DATA_VERSION = 322;
+
+    /**
+     * Resumes checking of half messages that were previously put into TRANS_CHECK_MAX_TIME_TOPIC.
+     */
+    public static final int RESUME_CHECK_HALF_MESSAGE = 323;
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ResumeCheckHalfMessageRequestHeader.java
similarity index 54%
copy from broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/ResumeCheckHalfMessageRequestHeader.java
index 529bfe4..14dacd5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ResumeCheckHalfMessageRequestHeader.java
@@ -14,23 +14,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.transaction.queue;
 
-import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+package org.apache.rocketmq.common.protocol.header;
 
-public class DefaultTransactionalMessageCheckListener extends AbstractTransactionalMessageCheckListener {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-    public DefaultTransactionalMessageCheckListener() {
-        super();
+public class ResumeCheckHalfMessageRequestHeader implements CommandCustomHeader {
+    @CFNullable
+    private String msgId;
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+
+    public String getMsgId() {
+        return msgId;
+    }
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
     }
 
     @Override
-    public void resolveDiscardMsg(MessageExt msgExt) {
-        log.error("MsgExt:{} has been checked too many times, so discard it", msgExt);
+    public String toString() {
+        return "ResumeCheckHalfMessageRequestHeader [msgId=" + msgId + "]";
     }
 }
diff --git a/pom.xml b/pom.xml
index 7ab8991..7d183e7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -455,7 +455,7 @@
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
-            <version>2.6.3</version>
+            <version>2.23.0</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index f00dcef..92371f1 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -537,4 +537,17 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
             brokerAddr, topic, queueId, index, count, consumerGroup
         );
     }
+
+    /**
+     * Delegates to {@code defaultMQAdminExtImpl.resumeCheckHalfMessage(msgId)} and returns its result.
+     */
+    @Override
+    public boolean resumeCheckHalfMessage(String msgId)
+            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return this.defaultMQAdminExtImpl.resumeCheckHalfMessage(msgId);
+    }
+
+    /**
+     * Delegates to {@code defaultMQAdminExtImpl.resumeCheckHalfMessage(topic, msgId)} and returns its
+     * result.
+     */
+    @Override
+    public boolean resumeCheckHalfMessage(String topic,
+            String msgId)
+            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return this.defaultMQAdminExtImpl.resumeCheckHalfMessage(topic, msgId);
+    }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 502e9da..210d5a9 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -1025,4 +1025,23 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
             brokerAddr, topic, queueId, index, count, consumerGroup, timeoutMillis
         );
     }
+
+    /**
+     * Looks the message up with {@code viewMessage(msgId)} and sends a resume-check request for
+     * {@code msgId} to the address of the message's store host.
+     *
+     * @param msgId id used both for the lookup and in the request
+     * @return the result of {@code MQClientAPIImpl.resumeCheckHalfMessage}
+     */
+    @Override
+    public boolean resumeCheckHalfMessage(String msgId)
+            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        MessageExt msg = this.viewMessage(msgId);
+
+        return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(RemotingUtil.socketAddress2String(msg.getStoreHost()), msgId, timeoutMillis);
+    }
+
+    /**
+     * Looks the message up with {@code viewMessage(topic, msgId)} and sends a resume-check request to the
+     * address of the message's store host.
+     *
+     * @param topic topic passed to the lookup
+     * @param msgId id passed to the lookup
+     * @return the result of {@code MQClientAPIImpl.resumeCheckHalfMessage}
+     */
+    @Override
+    public boolean resumeCheckHalfMessage(final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        MessageExt msg = this.viewMessage(topic, msgId);
+        if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
+            // No unique-client-id property on the message: forward the caller's msgId unchanged.
+            return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(RemotingUtil.socketAddress2String(msg.getStoreHost()), msgId, timeoutMillis);
+        } else {
+            // Property present: send the message's offset message id instead of the caller's msgId.
+            MessageClientExt msgClient = (MessageClientExt) msg;
+            return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(RemotingUtil.socketAddress2String(msg.getStoreHost()), msgClient.getOffsetMsgId(), timeoutMillis);
+        }
+    }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 930785e..d5c75f0 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -273,4 +273,9 @@ public interface MQAdminExt extends MQAdmin {
         final String topic, final int queueId,
         final long index, final int count, final String consumerGroup)
         throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException;
+
+    /**
+     * Requests that checking of the half message with the given id be resumed.
+     *
+     * @param msgId id of the message
+     * @return whether the request was reported as successful
+     */
+    boolean resumeCheckHalfMessage(String msgId)
+            throws RemotingException, MQClientException, InterruptedException, MQBrokerException;
+
+    /**
+     * Requests that checking of the half message with the given id be resumed, using the topic to look
+     * the message up.
+     *
+     * @param topic topic of the message
+     * @param msgId id of the message
+     * @return whether the request was reported as successful
+     */
+    boolean resumeCheckHalfMessage(final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException;
 }
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
index be6b636..e757608 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
@@ -226,5 +226,20 @@ public class QueryMsgByUniqueKeySubCommandTest {
 
     }
 
+    /**
+     * Runs {@code QueryMsgByUniqueKeySubCommand.execute} twice: once with only topic and message id, and
+     * once with producer group and client id added. There are no explicit assertions; the test passes
+     * when neither call throws.
+     */
+    @Test
+    public void testExecute() throws SubCommandException {
+
+        System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876");
 
+        QueryMsgByUniqueKeySubCommand cmd = new QueryMsgByUniqueKeySubCommand();
+        String[] args = new String[]{"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000"};
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new PosixParser());
+        cmd.execute(commandLine, options, null);
+
+        // Second run adds the -g and -d options to the same topic and message id.
+        args = new String[]{"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId"};
+        commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new PosixParser());
+        cmd.execute(commandLine, options, null);
+
+    }
 }