You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:17 UTC

[26/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
new file mode 100644
index 0000000..99204b0
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -0,0 +1,1996 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.namesrv.TopAddressing;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.*;
+import org.apache.rocketmq.common.protocol.header.*;
+import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.*;
+import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.*;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.slf4j.Logger;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MQClientAPIImpl {
+
+    private final static Logger log = ClientLogger.getLog();
+    public static boolean sendSmartMsg =
+            Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
+
+    static {
+        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+    }
+
+    private final RemotingClient remotingClient;
+    private final TopAddressing topAddressing;
+    private final ClientRemotingProcessor clientRemotingProcessor;
+    private String nameSrvAddr = null;
+    private ClientConfig clientConfig;
+
+    public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor,
+                           RPCHook rpcHook, final ClientConfig clientConfig) {
+        this.clientConfig = clientConfig;
+        topAddressing = new TopAddressing(MixAll.WS_ADDR, clientConfig.getUnitName());
+        this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
+        this.clientRemotingProcessor = clientRemotingProcessor;
+
+        this.remotingClient.registerRPCHook(rpcHook);
+        this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);
+
+        this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);
+
+        this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);
+
+        this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);
+
+        this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
+
+        this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
+    }
+
+    public List<String> getNameServerAddressList() {
+        return this.remotingClient.getNameServerAddressList();
+    }
+
+    public RemotingClient getRemotingClient() {
+        return remotingClient;
+    }
+
+    public String fetchNameServerAddr() {
+        try {
+            String addrs = this.topAddressing.fetchNSAddr();
+            if (addrs != null) {
+                if (!addrs.equals(this.nameSrvAddr)) {
+                    log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + addrs);
+                    this.updateNameServerAddressList(addrs);
+                    this.nameSrvAddr = addrs;
+                    return nameSrvAddr;
+                }
+            }
+        } catch (Exception e) {
+            log.error("fetchNameServerAddr Exception", e);
+        }
+        return nameSrvAddr;
+    }
+
+    public void updateNameServerAddressList(final String addrs) {
+        List<String> lst = new ArrayList<String>();
+        String[] addrArray = addrs.split(";");
+        if (addrArray != null) {
+            for (String addr : addrArray) {
+                lst.add(addr);
+            }
+
+            this.remotingClient.updateNameServerAddressList(lst);
+        }
+    }
+
+    public void start() {
+        this.remotingClient.start();
+    }
+
+    public void shutdown() {
+        this.remotingClient.shutdown();
+    }
+
+    public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config, final long timeoutMillis)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
+
+        byte[] body = RemotingSerializable.encode(config);
+        request.setBody(body);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return;
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+
+    }
+
+    public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
+        requestHeader.setTopic(topicConfig.getTopicName());
+        requestHeader.setDefaultTopic(defaultTopic);
+        requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
+        requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
+        requestHeader.setPerm(topicConfig.getPerm());
+        requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
+        requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
+        requestHeader.setOrder(topicConfig.isOrder());
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return;
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+    public SendResult sendMessage(//
+                                  final String addr, // 1
+                                  final String brokerName, // 2
+                                  final Message msg, // 3
+                                  final SendMessageRequestHeader requestHeader, // 4
+                                  final long timeoutMillis, // 5
+                                  final CommunicationMode communicationMode, // 6
+                                  final SendMessageContext context, // 7
+                                  final DefaultMQProducerImpl producer // 8
+    ) throws RemotingException, MQBrokerException, InterruptedException {
+        return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
+    }
+
+    public SendResult sendMessage(//
+                                  final String addr, // 1
+                                  final String brokerName, // 2
+                                  final Message msg, // 3
+                                  final SendMessageRequestHeader requestHeader, // 4
+                                  final long timeoutMillis, // 5
+                                  final CommunicationMode communicationMode, // 6
+                                  final SendCallback sendCallback, // 7
+                                  final TopicPublishInfo topicPublishInfo, // 8
+                                  final MQClientInstance instance, // 9
+                                  final int retryTimesWhenSendFailed, // 10
+                                  final SendMessageContext context, // 11
+                                  final DefaultMQProducerImpl producer // 12
+    ) throws RemotingException, MQBrokerException, InterruptedException {
+        RemotingCommand request = null;
+        if (sendSmartMsg) {
+            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
+            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
+        } else {
+            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
+        }
+
+        request.setBody(msg.getBody());
+
+        switch (communicationMode) {
+            case ONEWAY:
+                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
+                return null;
+            case ASYNC:
+                final AtomicInteger times = new AtomicInteger();
+                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
+                        retryTimesWhenSendFailed, times, context, producer);
+                return null;
+            case SYNC:
+                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
+            default:
+                assert false;
+                break;
+        }
+
+        return null;
+    }
+
+    private SendResult sendMessageSync(//
+                                       final String addr, //
+                                       final String brokerName, //
+                                       final Message msg, //
+                                       final long timeoutMillis, //
+                                       final RemotingCommand request//
+    ) throws RemotingException, MQBrokerException, InterruptedException {
+        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
+        assert response != null;
+        return this.processSendResponse(brokerName, msg, response);
+    }
+
+    private void sendMessageAsync(//
+                                  final String addr, //
+                                  final String brokerName, //
+                                  final Message msg, //
+                                  final long timeoutMillis, //
+                                  final RemotingCommand request, //
+                                  final SendCallback sendCallback, //
+                                  final TopicPublishInfo topicPublishInfo, //
+                                  final MQClientInstance instance, //
+                                  final int retryTimesWhenSendFailed, //
+                                  final AtomicInteger times, //
+                                  final SendMessageContext context, //
+                                  final DefaultMQProducerImpl producer //
+    ) throws InterruptedException, RemotingException {
+        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
+            @Override
+            public void operationComplete(ResponseFuture responseFuture) {
+                RemotingCommand response = responseFuture.getResponseCommand();
+                if (null == sendCallback && response != null) {
+
+                    try {
+                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
+                        if (context != null && sendResult != null) {
+                            context.setSendResult(sendResult);
+                            context.getProducer().executeSendMessageHookAfter(context);
+                        }
+                    } catch (Throwable e) {
+                        //
+                    }
+
+                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
+                    return;
+                }
+
+                if (response != null) {
+                    try {
+                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
+                        assert sendResult != null;
+                        if (context != null) {
+                            context.setSendResult(sendResult);
+                            context.getProducer().executeSendMessageHookAfter(context);
+                        }
+
+                        try {
+                            sendCallback.onSuccess(sendResult);
+                        } catch (Throwable e) {
+                        }
+
+                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
+                    } catch (Exception e) {
+                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
+                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
+                                retryTimesWhenSendFailed, times, e, context, false, producer);
+                    }
+                } else {
+                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
+                    if (!responseFuture.isSendRequestOK()) {
+                        MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
+                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
+                                retryTimesWhenSendFailed, times, ex, context, true, producer);
+                    } else if (responseFuture.isTimeout()) {
+                        MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
+                                responseFuture.getCause());
+                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
+                                retryTimesWhenSendFailed, times, ex, context, true, producer);
+                    } else {
+                        MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
+                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
+                                retryTimesWhenSendFailed, times, ex, context, true, producer);
+                    }
+                }
+            }
+        });
+    }
+
+
+    private void onExceptionImpl(final String brokerName, //
+                                 final Message msg, //
+                                 final long timeoutMillis, //
+                                 final RemotingCommand request, //
+                                 final SendCallback sendCallback, //
+                                 final TopicPublishInfo topicPublishInfo, //
+                                 final MQClientInstance instance, //
+                                 final int timesTotal, //
+                                 final AtomicInteger curTimes, //
+                                 final Exception e, //
+                                 final SendMessageContext context, //
+                                 final boolean needRetry, //
+                                 final DefaultMQProducerImpl producer // 12
+    ) {
+        int tmp = curTimes.incrementAndGet();
+        if (needRetry && tmp <= timesTotal) {
+            MessageQueue tmpmq = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
+            String addr = instance.findBrokerAddressInPublish(tmpmq.getBrokerName());
+            log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
+                    tmpmq.getBrokerName());
+            try {
+                request.setOpaque(RemotingCommand.createNewRequestId());
+                sendMessageAsync(addr, tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
+                        timesTotal, curTimes, context, producer);
+            } catch (InterruptedException e1) {
+                onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
+                        context, false, producer);
+            } catch (RemotingConnectException e1) {
+                producer.updateFaultItem(brokerName, 3000, true);
+                onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
+                        context, true, producer);
+            } catch (RemotingTooMuchRequestException e1) {
+                onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
+                        context, false, producer);
+            } catch (RemotingException e1) {
+                producer.updateFaultItem(brokerName, 3000, true);
+                onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
+                        context, true, producer);
+            }
+        } else {
+            if (context != null) {
+                context.setException(e);
+                context.getProducer().executeSendMessageHookAfter(context);
+            }
+            try {
+                sendCallback.onException(e);
+            } catch (Exception e2) {
+            }
+        }
+    }
+
+
+    private SendResult processSendResponse(//
+                                           final String brokerName, //
+                                           final Message msg, //
+                                           final RemotingCommand response//
+    ) throws MQBrokerException, RemotingCommandException {
+        switch (response.getCode()) {
+            case ResponseCode.FLUSH_DISK_TIMEOUT:
+            case ResponseCode.FLUSH_SLAVE_TIMEOUT:
+            case ResponseCode.SLAVE_NOT_AVAILABLE: {
+                // TODO LOG
+            }
+            case ResponseCode.SUCCESS: {
+                SendStatus sendStatus = SendStatus.SEND_OK;
+                switch (response.getCode()) {
+                    case ResponseCode.FLUSH_DISK_TIMEOUT:
+                        sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
+                        break;
+                    case ResponseCode.FLUSH_SLAVE_TIMEOUT:
+                        sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
+                        break;
+                    case ResponseCode.SLAVE_NOT_AVAILABLE:
+                        sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
+                        break;
+                    case ResponseCode.SUCCESS:
+                        sendStatus = SendStatus.SEND_OK;
+                        break;
+                    default:
+                        assert false;
+                        break;
+                }
+
+                SendMessageResponseHeader responseHeader =
+                        (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+
+                MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId());
+
+                SendResult sendResult = new SendResult(sendStatus,
+                        MessageClientIDSetter.getUniqID(msg),
+                        responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
+                sendResult.setTransactionId(responseHeader.getTransactionId());
+                String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
+                String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
+                if (regionId == null || regionId.isEmpty()) {
+                    regionId = MixAll.DEFAULT_TRACE_REGION_ID;
+                }
+                if (traceOn != null && traceOn.equals("false")) {
+                    sendResult.setTraceOn(false);
+                } else {
+                    sendResult.setTraceOn(true);
+                }
+                sendResult.setRegionId(regionId);
+                return sendResult;
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public PullResult pullMessage(//
+                                  final String addr, //
+                                  final PullMessageRequestHeader requestHeader, //
+                                  final long timeoutMillis, //
+                                  final CommunicationMode communicationMode, //
+                                  final PullCallback pullCallback//
+    ) throws RemotingException, MQBrokerException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
+
+        switch (communicationMode) {
+            case ONEWAY:
+                assert false;
+                return null;
+            case ASYNC:
+                this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
+                return null;
+            case SYNC:
+                return this.pullMessageSync(addr, request, timeoutMillis);
+            default:
+                assert false;
+                break;
+        }
+
+        return null;
+    }
+
+
+    private void pullMessageAsync(//
+                                  final String addr, // 1
+                                  final RemotingCommand request, //
+                                  final long timeoutMillis, //
+                                  final PullCallback pullCallback//
+    ) throws RemotingException, InterruptedException {
+        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
+            @Override
+            public void operationComplete(ResponseFuture responseFuture) {
+                RemotingCommand response = responseFuture.getResponseCommand();
+                if (response != null) {
+                    try {
+                        PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
+                        assert pullResult != null;
+                        pullCallback.onSuccess(pullResult);
+                    } catch (Exception e) {
+                        pullCallback.onException(e);
+                    }
+                } else {
+                    if (!responseFuture.isSendRequestOK()) {
+                        pullCallback.onException(new MQClientException("send request failed", responseFuture.getCause()));
+                    } else if (responseFuture.isTimeout()) {
+                        pullCallback.onException(new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
+                                responseFuture.getCause()));
+                    } else {
+                        pullCallback.onException(new MQClientException("unknow reseaon", responseFuture.getCause()));
+                    }
+                }
+            }
+        });
+    }
+
+    private PullResult pullMessageSync(//
+                                       final String addr, // 1
+                                       final RemotingCommand request, // 2
+                                       final long timeoutMillis// 3
+    ) throws RemotingException, InterruptedException, MQBrokerException {
+        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
+        assert response != null;
+        return this.processPullResponse(response);
+    }
+
+    private PullResult processPullResponse(final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
+        PullStatus pullStatus = PullStatus.NO_NEW_MSG;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS:
+                pullStatus = PullStatus.FOUND;
+                break;
+            case ResponseCode.PULL_NOT_FOUND:
+                pullStatus = PullStatus.NO_NEW_MSG;
+                break;
+            case ResponseCode.PULL_RETRY_IMMEDIATELY:
+                pullStatus = PullStatus.NO_MATCHED_MSG;
+                break;
+            case ResponseCode.PULL_OFFSET_MOVED:
+                pullStatus = PullStatus.OFFSET_ILLEGAL;
+                break;
+
+            default:
+                throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+
+        PullMessageResponseHeader responseHeader =
+                (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+
+        return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
+                responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
+    }
+
+    public MessageExt viewMessage(final String addr, final long phyoffset, final long timeoutMillis)
+            throws RemotingException, MQBrokerException, InterruptedException {
+        ViewMessageRequestHeader requestHeader = new ViewMessageRequestHeader();
+        requestHeader.setOffset(phyoffset);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
+                MessageExt messageExt = MessageDecoder.clientDecode(byteBuffer, true);
+                return messageExt;
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final long timeoutMillis)
+            throws RemotingException, MQBrokerException, InterruptedException {
+        SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setQueueId(queueId);
+        requestHeader.setTimestamp(timestamp);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                SearchOffsetResponseHeader responseHeader =
+                        (SearchOffsetResponseHeader) response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
+                return responseHeader.getOffset();
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
+            throws RemotingException, MQBrokerException, InterruptedException {
+        GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setQueueId(queueId);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                GetMaxOffsetResponseHeader responseHeader =
+                        (GetMaxOffsetResponseHeader) response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
+
+                return responseHeader.getOffset();
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public List<String> getConsumerIdListByGroup(//
+                                                 final String addr, //
+                                                 final String consumerGroup, //
+                                                 final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+            MQBrokerException, InterruptedException {
+        GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
+        requestHeader.setConsumerGroup(consumerGroup);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                if (response.getBody() != null) {
+                    GetConsumerListByGroupResponseBody body =
+                            GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);
+                    return body.getConsumerIdList();
+                }
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
+            throws RemotingException, MQBrokerException, InterruptedException {
+        GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setQueueId(queueId);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                GetMinOffsetResponseHeader responseHeader =
+                        (GetMinOffsetResponseHeader) response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
+
+                return responseHeader.getOffset();
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId, final long timeoutMillis)
+            throws RemotingException, MQBrokerException, InterruptedException {
+        GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setQueueId(queueId);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                GetEarliestMsgStoretimeResponseHeader responseHeader =
+                        (GetEarliestMsgStoretimeResponseHeader) response.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
+
+                return responseHeader.getTimestamp();
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public long queryConsumerOffset(//
+                                    final String addr, //
+                                    final QueryConsumerOffsetRequestHeader requestHeader, //
+                                    final long timeoutMillis//
+    ) throws RemotingException, MQBrokerException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                QueryConsumerOffsetResponseHeader responseHeader =
+                        (QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
+
+                return responseHeader.getOffset();
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public void updateConsumerOffset(//
+                                     final String addr, //
+                                     final UpdateConsumerOffsetRequestHeader requestHeader, //
+                                     final long timeoutMillis//
+    ) throws RemotingException, MQBrokerException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return;
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public void updateConsumerOffsetOneway(//
+                                           final String addr, //
+                                           final UpdateConsumerOffsetRequestHeader requestHeader, //
+                                           final long timeoutMillis//
+    ) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,
+            InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
+
+        this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
+    }
+
+
+    public void sendHearbeat(//
+                             final String addr, //
+                             final HeartbeatData heartbeatData, //
+                             final long timeoutMillis//
+    ) throws RemotingException, MQBrokerException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
+
+        request.setBody(heartbeatData.encode());
+        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return;
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public void unregisterClient(//
+                                 final String addr, //
+                                 final String clientID, //
+                                 final String producerGroup, //
+                                 final String consumerGroup, //
+                                 final long timeoutMillis//
+    ) throws RemotingException, MQBrokerException, InterruptedException {
+        final UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader();
+        requestHeader.setClientID(clientID);
+        requestHeader.setProducerGroup(producerGroup);
+        requestHeader.setConsumerGroup(consumerGroup);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return;
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public void endTransactionOneway(//
+                                     final String addr, //
+                                     final EndTransactionRequestHeader requestHeader, //
+                                     final String remark, //
+                                     final long timeoutMillis//
+    ) throws RemotingException, MQBrokerException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
+
+        request.setRemark(remark);
+        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
+    }
+
+
+    public void queryMessage(
+            final String addr,
+            final QueryMessageRequestHeader requestHeader,
+            final long timeoutMillis,
+            final InvokeCallback invokeCallback,
+            final Boolean isUnqiueKey
+    ) throws RemotingException, MQBrokerException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, requestHeader);
+        request.addExtField(MixAll.UNIQUE_MSG_QUERY_FLAG, isUnqiueKey.toString());
+        this.remotingClient.invokeAsync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis,
+                invokeCallback);
+    }
+
+
+    public boolean registerClient(final String addr, final HeartbeatData heartbeat, final long timeoutMillis)
+            throws RemotingException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
+
+        request.setBody(heartbeat.encode());
+        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
+        return response.getCode() == ResponseCode.SUCCESS;
+    }
+
+
+    public void consumerSendMessageBack(
+            final String addr,
+            final MessageExt msg,
+            final String consumerGroup,
+            final int delayLevel,
+            final long timeoutMillis,
+            final int maxConsumeRetryTimes
+    ) throws RemotingException, MQBrokerException, InterruptedException {
+        ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
+
+        requestHeader.setGroup(consumerGroup);
+        requestHeader.setOriginTopic(msg.getTopic());
+        requestHeader.setOffset(msg.getCommitLogOffset());
+        requestHeader.setDelayLevel(delayLevel);
+        requestHeader.setOriginMsgId(msg.getMsgId());
+        requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return;
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public Set<MessageQueue> lockBatchMQ(//
+                                         final String addr, //
+                                         final LockBatchRequestBody requestBody, //
+                                         final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
+
+        request.setBody(requestBody.encode());
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
+                Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet();
+                return messageQueues;
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public void unlockBatchMQ(//
+                              final String addr, //
+                              final UnlockBatchRequestBody requestBody, //
+                              final long timeoutMillis, //
+                              final boolean oneway//
+    ) throws RemotingException, MQBrokerException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
+
+        request.setBody(requestBody.encode());
+
+        if (oneway) {
+            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
+        } else {
+            RemotingCommand response = this.remotingClient
+                    .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
+            switch (response.getCode()) {
+                case ResponseCode.SUCCESS: {
+                    return;
+                }
+                default:
+                    break;
+            }
+
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+    }
+
+
+    public TopicStatsTable getTopicStatsInfo(final String addr, final String topic, final long timeoutMillis) throws InterruptedException,
+            RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        GetTopicStatsInfoRequestHeader requestHeader = new GetTopicStatsInfoRequestHeader();
+        requestHeader.setTopic(topic);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                TopicStatsTable topicStatsTable = TopicStatsTable.decode(response.getBody(), TopicStatsTable.class);
+                return topicStatsTable;
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis)
+            throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
+            MQBrokerException {
+        return getConsumeStats(addr, consumerGroup, null, timeoutMillis);
+    }
+
+
+    public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, final long timeoutMillis)
+            throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
+            MQBrokerException {
+        GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader();
+        requestHeader.setConsumerGroup(consumerGroup);
+        requestHeader.setTopic(topic);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                ConsumeStats consumeStats = ConsumeStats.decode(response.getBody(), ConsumeStats.class);
+                return consumeStats;
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup, final long timeoutMillis)
+            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+            MQBrokerException {
+        GetProducerConnectionListRequestHeader requestHeader = new GetProducerConnectionListRequestHeader();
+        requestHeader.setProducerGroup(producerGroup);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_PRODUCER_CONNECTION_LIST, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return ProducerConnection.decode(response.getBody(), ProducerConnection.class);
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup, final long timeoutMillis)
+            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+            MQBrokerException {
+        GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader();
+        requestHeader.setConsumerGroup(consumerGroup);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_CONNECTION_LIST, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                ConsumerConnection consumerConnection = ConsumerConnection.decode(response.getBody(), ConsumerConnection.class);
+                return consumerConnection;
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public KVTable getBrokerRuntimeInfo(final String addr, final long timeoutMillis) throws RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_RUNTIME_INFO, null);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return KVTable.decode(response.getBody(), KVTable.class);
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public void updateBrokerConfig(final String addr, final Properties properties, final long timeoutMillis)
+            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+            MQBrokerException, UnsupportedEncodingException {
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_BROKER_CONFIG, null);
+
+        String str = MixAll.properties2String(properties);
+        if (str != null && str.length() > 0) {
+            request.setBody(str.getBytes(MixAll.DEFAULT_CHARSET));
+            RemotingCommand response = this.remotingClient
+                    .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
+            switch (response.getCode()) {
+                case ResponseCode.SUCCESS: {
+                    return;
+                }
+                default:
+                    break;
+            }
+
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+    }
+
+
+    public Properties getBrokerConfig(final String addr, final long timeoutMillis)
+            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+            MQBrokerException, UnsupportedEncodingException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CONFIG, null);
+
+        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return MixAll.string2Properties(new String(response.getBody(), MixAll.DEFAULT_CHARSET));
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+    public ClusterInfo getBrokerClusterInfo(final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
+            RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
+
+        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                ClusterInfo responseBody = ClusterInfo.decode(response.getBody(), ClusterInfo.class);
+                return responseBody;
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
+            throws RemotingException, MQClientException, InterruptedException {
+        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
+        requestHeader.setTopic(topic);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.TOPIC_NOT_EXIST: {
+                // TODO LOG
+                break;
+            }
+            case ResponseCode.SUCCESS: {
+                byte[] body = response.getBody();
+                if (body != null) {
+                    return TopicRouteData.decode(body, TopicRouteData.class);
+                }
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
+            throws RemotingException, MQClientException, InterruptedException {
+        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
+        requestHeader.setTopic(topic);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.TOPIC_NOT_EXIST: {
+                if (!topic.equals(MixAll.DEFAULT_TOPIC))
+                    log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
+                break;
+            }
+            case ResponseCode.SUCCESS: {
+                byte[] body = response.getBody();
+                if (body != null) {
+                    return TopicRouteData.decode(body, TopicRouteData.class);
+                }
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public TopicList getTopicListFromNameServer(final long timeoutMillis)
+            throws RemotingException, MQClientException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER, null);
+
+        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                byte[] body = response.getBody();
+                if (body != null) {
+                    TopicList topicList = TopicList.decode(body, TopicList.class);
+                    return topicList;
+                }
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName, final long timeoutMillis) throws RemotingCommandException,
+            RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
+        WipeWritePermOfBrokerRequestHeader requestHeader = new WipeWritePermOfBrokerRequestHeader();
+        requestHeader.setBrokerName(brokerName);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.WIPE_WRITE_PERM_OF_BROKER, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                WipeWritePermOfBrokerResponseHeader responseHeader =
+                        (WipeWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class);
+                return responseHeader.getWipeTopicCount();
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public void deleteTopicInBroker(final String addr, final String topic, final long timeoutMillis)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader();
+        requestHeader.setTopic(topic);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return;
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public void deleteTopicInNameServer(final String addr, final String topic, final long timeoutMillis)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader();
+        requestHeader.setTopic(topic);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return;
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public void deleteSubscriptionGroup(final String addr, final String groupName, final long timeoutMillis)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        DeleteSubscriptionGroupRequestHeader requestHeader = new DeleteSubscriptionGroupRequestHeader();
+        requestHeader.setGroupName(groupName);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return;
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public String getKVConfigValue(final String namespace, final String key, final long timeoutMillis)
+            throws RemotingException, MQClientException, InterruptedException {
+        GetKVConfigRequestHeader requestHeader = new GetKVConfigRequestHeader();
+        requestHeader.setNamespace(namespace);
+        requestHeader.setKey(key);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_KV_CONFIG, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                GetKVConfigResponseHeader responseHeader =
+                        (GetKVConfigResponseHeader) response.decodeCommandCustomHeader(GetKVConfigResponseHeader.class);
+                return responseHeader.getValue();
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public void putKVConfigValue(final String namespace, final String key, final String value, final long timeoutMillis)
+            throws RemotingException, MQClientException, InterruptedException {
+        PutKVConfigRequestHeader requestHeader = new PutKVConfigRequestHeader();
+        requestHeader.setNamespace(namespace);
+        requestHeader.setKey(key);
+        requestHeader.setValue(value);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUT_KV_CONFIG, requestHeader);
+
+        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
+        if (nameServerAddressList != null) {
+            RemotingCommand errResponse = null;
+            for (String namesrvAddr : nameServerAddressList) {
+                RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMillis);
+                assert response != null;
+                switch (response.getCode()) {
+                    case ResponseCode.SUCCESS: {
+                        break;
+                    }
+                    default:
+                        errResponse = response;
+                }
+            }
+
+            if (errResponse != null) {
+                throw new MQClientException(errResponse.getCode(), errResponse.getRemark());
+            }
+        }
+    }
+
+
+    public void deleteKVConfigValue(final String namespace, final String key, final long timeoutMillis)
+            throws RemotingException, MQClientException, InterruptedException {
+        DeleteKVConfigRequestHeader requestHeader = new DeleteKVConfigRequestHeader();
+        requestHeader.setNamespace(namespace);
+        requestHeader.setKey(key);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_KV_CONFIG, requestHeader);
+
+        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
+        if (nameServerAddressList != null) {
+            RemotingCommand errResponse = null;
+            for (String namesrvAddr : nameServerAddressList) {
+                RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMillis);
+                assert response != null;
+                switch (response.getCode()) {
+                    case ResponseCode.SUCCESS: {
+                        break;
+                    }
+                    default:
+                        errResponse = response;
+                }
+            }
+            if (errResponse != null) {
+                throw new MQClientException(errResponse.getCode(), errResponse.getRemark());
+            }
+        }
+    }
+
+
+    public KVTable getKVListByNamespace(final String namespace, final long timeoutMillis)
+            throws RemotingException, MQClientException, InterruptedException {
+        GetKVListByNamespaceRequestHeader requestHeader = new GetKVListByNamespaceRequestHeader();
+        requestHeader.setNamespace(namespace);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_KVLIST_BY_NAMESPACE, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return KVTable.decode(response.getBody(), KVTable.class);
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
+                                                             final long timestamp, final boolean isForce, final long timeoutMillis)
+            throws RemotingException, MQClientException, InterruptedException {
+        return invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, false);
+    }
+
+
+    public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
+                                                             final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
+            throws RemotingException, MQClientException, InterruptedException {
+        ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setGroup(group);
+        requestHeader.setTimestamp(timestamp);
+        requestHeader.setForce(isForce);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
+        if (isC) {
+            request.setLanguage(LanguageCode.CPP);
+        }
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                if (response.getBody() != null) {
+                    ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
+                    return body.getOffsetTable();
+                }
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group,
+                                                                                final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+        GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setGroup(group);
+        requestHeader.setClientAddr(clientAddr);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                if (response.getBody() != null) {
+                    GetConsumerStatusBody body = GetConsumerStatusBody.decode(response.getBody(), GetConsumerStatusBody.class);
+                    return body.getConsumerTable();
+                }
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public GroupList queryTopicConsumeByWho(final String addr, final String topic, final long timeoutMillis)
+            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+            MQBrokerException {
+        QueryTopicConsumeByWhoRequestHeader requestHeader = new QueryTopicConsumeByWhoRequestHeader();
+        requestHeader.setTopic(topic);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPIC_CONSUME_BY_WHO, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                GroupList groupList = GroupList.decode(response.getBody(), GroupList.class);
+                return groupList;
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group, final long timeoutMillis)
+            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+            MQBrokerException {
+        QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setGroup(group);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                QueryConsumeTimeSpanBody consumeTimeSpanBody = GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class);
+                return consumeTimeSpanBody.getConsumeTimeSpanSet();
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public TopicList getTopicsByCluster(final String cluster, final long timeoutMillis)
+            throws RemotingException, MQClientException, InterruptedException {
+        GetTopicsByClusterRequestHeader requestHeader = new GetTopicsByClusterRequestHeader();
+        requestHeader.setCluster(cluster);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPICS_BY_CLUSTER, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                byte[] body = response.getBody();
+                if (body != null) {
+                    TopicList topicList = TopicList.decode(body, TopicList.class);
+                    return topicList;
+                }
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public void registerMessageFilterClass(final String addr, //
+                                           final String consumerGroup, //
+                                           final String topic, //
+                                           final String className, //
+                                           final int classCRC, //
+                                           final byte[] classBody, //
+                                           final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+            InterruptedException, MQBrokerException {
+        RegisterMessageFilterClassRequestHeader requestHeader = new RegisterMessageFilterClassRequestHeader();
+        requestHeader.setConsumerGroup(consumerGroup);
+        requestHeader.setClassName(className);
+        requestHeader.setTopic(topic);
+        requestHeader.setClassCRC(classCRC);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_MESSAGE_FILTER_CLASS, requestHeader);
+        request.setBody(classBody);
+        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return;
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
+    public TopicList getSystemTopicList(final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS, null);
+
+        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                byte[] body = response.getBody();
+                if (body != null) {
+                    TopicList topicList = TopicList.decode(response.getBody(), TopicList.class);
+                    if (topicList.getTopicList() != null && !topicList.getTopicList().isEmpty()
+                            && !UtilAll.isBlank(topicList.getBrokerAddr())) {
+                        TopicList tmp = getSystemTopicListFromBroker(topicList.getBrokerAddr(), timeoutMillis);
+                        if (tmp.getTopicList() != null && !tmp.getTopicList().isEmpty()) {
+                            topicList.getTopicList().addAll(tmp.getTopicList());
+                        }
+                    }
+                    return topicList;
+                }
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public TopicList getSystemTopicListFromBroker(final String addr, final long timeoutMillis)
+            throws RemotingException, MQClientException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER, null);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                byte[] body = response.getBody();
+                if (body != null) {
+                    TopicList topicList = TopicList.decode(body, TopicList.class);
+                    return topicList;
+                }
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public boolean cleanExpiredConsumeQueue(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE, null);
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return true;
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public boolean cleanUnusedTopicByAddr(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_UNUSED_TOPIC, null);
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return true;
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+    public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String consumerGroup, String clientId, boolean jstack,
+                                                      final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+        GetConsumerRunningInfoRequestHeader requestHeader = new GetConsumerRunningInfoRequestHeader();
+        requestHeader.setConsumerGroup(consumerGroup);
+        requestHeader.setClientId(clientId);
+        requestHeader.setJstackEnable(jstack);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                byte[] body = response.getBody();
+                if (body != null) {
+                    ConsumerRunningInfo info = ConsumerRunningInfo.decode(body, ConsumerRunningInfo.class);
+                    return info;
+                }
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr, //
+                                                               String consumerGroup, //
+                                                               String clientId, //
+                                                               String msgId, //
+                                                               final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+        ConsumeMessageDirectlyResultRequestHeader requestHeader = new ConsumeMessageDirectlyResultRequestHeader();
+        requestHeader.setConsumerGroup(consumerGroup);
+        requestHeader.setClientId(clientId);
+        requestHeader.setMsgId(msgId);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                byte[] body = response.getBody();
+                if (body != null) {
+                    ConsumeMessageDirectlyResult info = ConsumeMessageDirectlyResult.decode(body, ConsumeMessageDirectlyResult.class);
+                    return info;
+                }
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+    public Map<Integer, Long> queryCorrectionOffset(final String addr, final String topic, final String group, Set<String> filterGroup,
+                                                    long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+            InterruptedException {
+        QueryCorrectionOffsetHeader requestHeader = new QueryCorrectionOffsetHeader();
+        requestHeader.setCompareGroup(group);
+        requestHeader.setTopic(topic);
+        if (filterGroup != null) {
+            StringBuilder sb = new StringBuilder();
+            String splitor = "";
+            for (String s : filterGroup) {
+                sb.append(splitor).append(s);
+                splitor = ",";
+            }
+            requestHeader.setFilterGroups(sb.toString());
+        }
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CORRECTION_OFFSET, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                if (response.getBody() != null) {
+                    QueryCorrectionOffsetBody body = QueryCorrectionOffsetBody.decode(response.getBody(), QueryCorrectionOffsetBody.class);
+                    return body.getCorrectionOffsets();
+                }
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+    public TopicList getUnitTopicList(final boolean containRetry, final long timeoutMillis)
+            throws RemotingException, MQClientException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_UNIT_TOPIC_LIST, null);
+
+        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                byte[] body = response.getBody();
+                if (body != null) {
+                    TopicList topicList = TopicList.decode(response.getBody(), TopicList.class);
+                    if (!containRetry) {
+                        Iterator<String> it = topicList.getTopicList().iterator();
+                        while (it.hasNext()) {
+                            String topic = it.next();
+                            if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))
+                                it.remove();
+                        }
+                    }
+
+                    return topicList;
+                }
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public TopicList getHasUnitSubTopicList(final boolean containRetry, final long timeoutMillis)
+            throws RemotingException, MQClientException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST, null);
+
+        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                byte[] body = response.getBody();
+                if (body != null) {
+                    TopicList topicList = TopicList.decode(response.getBody(), TopicList.class);
+                    if (!containRetry) {
+                        Iterator<String> it = topicList.getTopicList().iterator();
+                        while (it.hasNext()) {
+                            String topic = it.next();
+                            if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))
+                                it.remove();
+                        }
+                    }
+                    return topicList;
+                }
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public TopicList getHasUnitSubUnUnitTopicList(final boolean containRetry, final long timeoutMillis)
+            throws RemotingException, MQClientException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST, null);
+
+        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                byte[] body = response.getBody();
+                if (body != null) {
+                    TopicList topicList = TopicList.decode(response.getBody(), TopicList.class);
+                    if (!containRetry) {
+                        Iterator<String> it = topicList.getTopicList().iterator();
+                        while (it.hasNext()) {
+                            String topic = it.next();
+                            if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))
+                                it.remove();
+                        }
+                    }
+                    return topicList;
+                }
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public void cloneGroupOffset(final String addr, final String srcGroup, final String destGroup, final String topic,
+                                 final boolean isOffline, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+        CloneGroupOffsetRequestHeader requestHeader = new CloneGroupOffsetRequestHeader();
+        requestHeader.setSrcGroup(srcGroup);
+        requestHeader.setDestGroup(destGroup);
+        requestHeader.setTopic(topic);
+        requestHeader.setOffline(isOffline);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLONE_GROUP_OFFSET, requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+                request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return;
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+
+    public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey, long timeoutMillis)
+            throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+            InterruptedException {
+        ViewBrokerStatsDataRequestHeader request

<TRUNCATED>