You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/28 10:02:43 UTC

[36/50] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 84ee7db..2dd9200 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -16,6 +16,17 @@
  */
 package org.apache.rocketmq.client.impl;
 
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
@@ -37,21 +48,97 @@ import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.*;
-import org.apache.rocketmq.common.protocol.header.*;
+import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.GetConsumerStatusBody;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
+import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
+import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
+import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
+import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
+import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
+import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
+import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
+import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader;
+import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.*;
+import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetKVListByNamespaceRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.exception.*;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
@@ -60,17 +147,11 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.slf4j.Logger;
 
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
 public class MQClientAPIImpl {
 
     private final static Logger log = ClientLogger.getLog();
     public static boolean sendSmartMsg =
-            Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
+        Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
 
     static {
         System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
@@ -83,7 +164,7 @@ public class MQClientAPIImpl {
     private ClientConfig clientConfig;
 
     public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor,
-                           RPCHook rpcHook, final ClientConfig clientConfig) {
+        RPCHook rpcHook, final ClientConfig clientConfig) {
         this.clientConfig = clientConfig;
         topAddressing = new TopAddressing(MixAll.WS_ADDR, clientConfig.getUnitName());
         this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
@@ -149,14 +230,14 @@ public class MQClientAPIImpl {
     }
 
     public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
 
         byte[] body = RemotingSerializable.encode(config);
         request.setBody(body);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -171,7 +252,7 @@ public class MQClientAPIImpl {
     }
 
     public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
         requestHeader.setTopic(topicConfig.getTopicName());
         requestHeader.setDefaultTopic(defaultTopic);
@@ -185,7 +266,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -199,31 +280,31 @@ public class MQClientAPIImpl {
     }
 
     public SendResult sendMessage(//
-                                  final String addr, // 1
-                                  final String brokerName, // 2
-                                  final Message msg, // 3
-                                  final SendMessageRequestHeader requestHeader, // 4
-                                  final long timeoutMillis, // 5
-                                  final CommunicationMode communicationMode, // 6
-                                  final SendMessageContext context, // 7
-                                  final DefaultMQProducerImpl producer // 8
+        final String addr, // 1
+        final String brokerName, // 2
+        final Message msg, // 3
+        final SendMessageRequestHeader requestHeader, // 4
+        final long timeoutMillis, // 5
+        final CommunicationMode communicationMode, // 6
+        final SendMessageContext context, // 7
+        final DefaultMQProducerImpl producer // 8
     ) throws RemotingException, MQBrokerException, InterruptedException {
         return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
     }
 
     public SendResult sendMessage(//
-                                  final String addr, // 1
-                                  final String brokerName, // 2
-                                  final Message msg, // 3
-                                  final SendMessageRequestHeader requestHeader, // 4
-                                  final long timeoutMillis, // 5
-                                  final CommunicationMode communicationMode, // 6
-                                  final SendCallback sendCallback, // 7
-                                  final TopicPublishInfo topicPublishInfo, // 8
-                                  final MQClientInstance instance, // 9
-                                  final int retryTimesWhenSendFailed, // 10
-                                  final SendMessageContext context, // 11
-                                  final DefaultMQProducerImpl producer // 12
+        final String addr, // 1
+        final String brokerName, // 2
+        final Message msg, // 3
+        final SendMessageRequestHeader requestHeader, // 4
+        final long timeoutMillis, // 5
+        final CommunicationMode communicationMode, // 6
+        final SendCallback sendCallback, // 7
+        final TopicPublishInfo topicPublishInfo, // 8
+        final MQClientInstance instance, // 9
+        final int retryTimesWhenSendFailed, // 10
+        final SendMessageContext context, // 11
+        final DefaultMQProducerImpl producer // 12
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = null;
         if (sendSmartMsg) {
@@ -242,7 +323,7 @@ public class MQClientAPIImpl {
             case ASYNC:
                 final AtomicInteger times = new AtomicInteger();
                 this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
-                        retryTimesWhenSendFailed, times, context, producer);
+                    retryTimesWhenSendFailed, times, context, producer);
                 return null;
             case SYNC:
                 return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
@@ -255,11 +336,11 @@ public class MQClientAPIImpl {
     }
 
     private SendResult sendMessageSync(//
-                                       final String addr, //
-                                       final String brokerName, //
-                                       final Message msg, //
-                                       final long timeoutMillis, //
-                                       final RemotingCommand request//
+        final String addr, //
+        final String brokerName, //
+        final Message msg, //
+        final long timeoutMillis, //
+        final RemotingCommand request//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
         assert response != null;
@@ -267,18 +348,18 @@ public class MQClientAPIImpl {
     }
 
     private void sendMessageAsync(//
-                                  final String addr, //
-                                  final String brokerName, //
-                                  final Message msg, //
-                                  final long timeoutMillis, //
-                                  final RemotingCommand request, //
-                                  final SendCallback sendCallback, //
-                                  final TopicPublishInfo topicPublishInfo, //
-                                  final MQClientInstance instance, //
-                                  final int retryTimesWhenSendFailed, //
-                                  final AtomicInteger times, //
-                                  final SendMessageContext context, //
-                                  final DefaultMQProducerImpl producer //
+        final String addr, //
+        final String brokerName, //
+        final Message msg, //
+        final long timeoutMillis, //
+        final RemotingCommand request, //
+        final SendCallback sendCallback, //
+        final TopicPublishInfo topicPublishInfo, //
+        final MQClientInstance instance, //
+        final int retryTimesWhenSendFailed, //
+        final AtomicInteger times, //
+        final SendMessageContext context, //
+        final DefaultMQProducerImpl producer //
     ) throws InterruptedException, RemotingException {
         this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
             @Override
@@ -318,68 +399,67 @@ public class MQClientAPIImpl {
                     } catch (Exception e) {
                         producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                         onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
-                                retryTimesWhenSendFailed, times, e, context, false, producer);
+                            retryTimesWhenSendFailed, times, e, context, false, producer);
                     }
                 } else {
                     producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                     if (!responseFuture.isSendRequestOK()) {
                         MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                         onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
-                                retryTimesWhenSendFailed, times, ex, context, true, producer);
+                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                     } else if (responseFuture.isTimeout()) {
                         MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
-                                responseFuture.getCause());
+                            responseFuture.getCause());
                         onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
-                                retryTimesWhenSendFailed, times, ex, context, true, producer);
+                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                     } else {
                         MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                         onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
-                                retryTimesWhenSendFailed, times, ex, context, true, producer);
+                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                     }
                 }
             }
         });
     }
 
-
     private void onExceptionImpl(final String brokerName, //
-                                 final Message msg, //
-                                 final long timeoutMillis, //
-                                 final RemotingCommand request, //
-                                 final SendCallback sendCallback, //
-                                 final TopicPublishInfo topicPublishInfo, //
-                                 final MQClientInstance instance, //
-                                 final int timesTotal, //
-                                 final AtomicInteger curTimes, //
-                                 final Exception e, //
-                                 final SendMessageContext context, //
-                                 final boolean needRetry, //
-                                 final DefaultMQProducerImpl producer // 12
+        final Message msg, //
+        final long timeoutMillis, //
+        final RemotingCommand request, //
+        final SendCallback sendCallback, //
+        final TopicPublishInfo topicPublishInfo, //
+        final MQClientInstance instance, //
+        final int timesTotal, //
+        final AtomicInteger curTimes, //
+        final Exception e, //
+        final SendMessageContext context, //
+        final boolean needRetry, //
+        final DefaultMQProducerImpl producer // 12
     ) {
         int tmp = curTimes.incrementAndGet();
         if (needRetry && tmp <= timesTotal) {
             MessageQueue tmpmq = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
             String addr = instance.findBrokerAddressInPublish(tmpmq.getBrokerName());
             log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
-                    tmpmq.getBrokerName());
+                tmpmq.getBrokerName());
             try {
                 request.setOpaque(RemotingCommand.createNewRequestId());
                 sendMessageAsync(addr, tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
-                        timesTotal, curTimes, context, producer);
+                    timesTotal, curTimes, context, producer);
             } catch (InterruptedException e1) {
                 onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
-                        context, false, producer);
+                    context, false, producer);
             } catch (RemotingConnectException e1) {
                 producer.updateFaultItem(brokerName, 3000, true);
                 onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
-                        context, true, producer);
+                    context, true, producer);
             } catch (RemotingTooMuchRequestException e1) {
                 onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
-                        context, false, producer);
+                    context, false, producer);
             } catch (RemotingException e1) {
                 producer.updateFaultItem(brokerName, 3000, true);
                 onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
-                        context, true, producer);
+                    context, true, producer);
             }
         } else {
             if (context != null) {
@@ -393,11 +473,10 @@ public class MQClientAPIImpl {
         }
     }
 
-
     private SendResult processSendResponse(//
-                                           final String brokerName, //
-                                           final Message msg, //
-                                           final RemotingCommand response//
+        final String brokerName, //
+        final Message msg, //
+        final RemotingCommand response//
     ) throws MQBrokerException, RemotingCommandException {
         switch (response.getCode()) {
             case ResponseCode.FLUSH_DISK_TIMEOUT:
@@ -426,13 +505,13 @@ public class MQClientAPIImpl {
                 }
 
                 SendMessageResponseHeader responseHeader =
-                        (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+                    (SendMessageResponseHeader)response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
 
                 MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId());
 
                 SendResult sendResult = new SendResult(sendStatus,
-                        MessageClientIDSetter.getUniqID(msg),
-                        responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
+                    MessageClientIDSetter.getUniqID(msg),
+                    responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
                 sendResult.setTransactionId(responseHeader.getTransactionId());
                 String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
                 String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
@@ -454,13 +533,12 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public PullResult pullMessage(//
-                                  final String addr, //
-                                  final PullMessageRequestHeader requestHeader, //
-                                  final long timeoutMillis, //
-                                  final CommunicationMode communicationMode, //
-                                  final PullCallback pullCallback//
+        final String addr, //
+        final PullMessageRequestHeader requestHeader, //
+        final long timeoutMillis, //
+        final CommunicationMode communicationMode, //
+        final PullCallback pullCallback//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
 
@@ -481,12 +559,11 @@ public class MQClientAPIImpl {
         return null;
     }
 
-
     private void pullMessageAsync(//
-                                  final String addr, // 1
-                                  final RemotingCommand request, //
-                                  final long timeoutMillis, //
-                                  final PullCallback pullCallback//
+        final String addr, // 1
+        final RemotingCommand request, //
+        final long timeoutMillis, //
+        final PullCallback pullCallback//
     ) throws RemotingException, InterruptedException {
         this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
             @Override
@@ -505,7 +582,7 @@ public class MQClientAPIImpl {
                         pullCallback.onException(new MQClientException("send request failed", responseFuture.getCause()));
                     } else if (responseFuture.isTimeout()) {
                         pullCallback.onException(new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
-                                responseFuture.getCause()));
+                            responseFuture.getCause()));
                     } else {
                         pullCallback.onException(new MQClientException("unknow reseaon", responseFuture.getCause()));
                     }
@@ -515,9 +592,9 @@ public class MQClientAPIImpl {
     }
 
     private PullResult pullMessageSync(//
-                                       final String addr, // 1
-                                       final RemotingCommand request, // 2
-                                       final long timeoutMillis// 3
+        final String addr, // 1
+        final RemotingCommand request, // 2
+        final long timeoutMillis// 3
     ) throws RemotingException, InterruptedException, MQBrokerException {
         RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
         assert response != null;
@@ -545,20 +622,20 @@ public class MQClientAPIImpl {
         }
 
         PullMessageResponseHeader responseHeader =
-                (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+            (PullMessageResponseHeader)response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
 
         return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
-                responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
+            responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
     }
 
     public MessageExt viewMessage(final String addr, final long phyoffset, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException {
+        throws RemotingException, MQBrokerException, InterruptedException {
         ViewMessageRequestHeader requestHeader = new ViewMessageRequestHeader();
         requestHeader.setOffset(phyoffset);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -573,9 +650,8 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException {
+        throws RemotingException, MQBrokerException, InterruptedException {
         SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setQueueId(queueId);
@@ -583,12 +659,12 @@ public class MQClientAPIImpl {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 SearchOffsetResponseHeader responseHeader =
-                        (SearchOffsetResponseHeader) response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
+                    (SearchOffsetResponseHeader)response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
                 return responseHeader.getOffset();
             }
             default:
@@ -598,21 +674,20 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException {
+        throws RemotingException, MQBrokerException, InterruptedException {
         GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setQueueId(queueId);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 GetMaxOffsetResponseHeader responseHeader =
-                        (GetMaxOffsetResponseHeader) response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
+                    (GetMaxOffsetResponseHeader)response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
 
                 return responseHeader.getOffset();
             }
@@ -623,24 +698,23 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public List<String> getConsumerIdListByGroup(//
-                                                 final String addr, //
-                                                 final String consumerGroup, //
-                                                 final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-            MQBrokerException, InterruptedException {
+        final String addr, //
+        final String consumerGroup, //
+        final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+        MQBrokerException, InterruptedException {
         GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 if (response.getBody() != null) {
                     GetConsumerListByGroupResponseBody body =
-                            GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);
+                        GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);
                     return body.getConsumerIdList();
                 }
             }
@@ -651,21 +725,20 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException {
+        throws RemotingException, MQBrokerException, InterruptedException {
         GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setQueueId(queueId);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 GetMinOffsetResponseHeader responseHeader =
-                        (GetMinOffsetResponseHeader) response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
+                    (GetMinOffsetResponseHeader)response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
 
                 return responseHeader.getOffset();
             }
@@ -676,21 +749,20 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException {
+        throws RemotingException, MQBrokerException, InterruptedException {
         GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setQueueId(queueId);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 GetEarliestMsgStoretimeResponseHeader responseHeader =
-                        (GetEarliestMsgStoretimeResponseHeader) response.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
+                    (GetEarliestMsgStoretimeResponseHeader)response.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
 
                 return responseHeader.getTimestamp();
             }
@@ -701,21 +773,20 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public long queryConsumerOffset(//
-                                    final String addr, //
-                                    final QueryConsumerOffsetRequestHeader requestHeader, //
-                                    final long timeoutMillis//
+        final String addr, //
+        final QueryConsumerOffsetRequestHeader requestHeader, //
+        final long timeoutMillis//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 QueryConsumerOffsetResponseHeader responseHeader =
-                        (QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
+                    (QueryConsumerOffsetResponseHeader)response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
 
                 return responseHeader.getOffset();
             }
@@ -726,16 +797,15 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public void updateConsumerOffset(//
-                                     final String addr, //
-                                     final UpdateConsumerOffsetRequestHeader requestHeader, //
-                                     final long timeoutMillis//
+        final String addr, //
+        final UpdateConsumerOffsetRequestHeader requestHeader, //
+        final long timeoutMillis//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -748,23 +818,21 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public void updateConsumerOffsetOneway(//
-                                           final String addr, //
-                                           final UpdateConsumerOffsetRequestHeader requestHeader, //
-                                           final long timeoutMillis//
+        final String addr, //
+        final UpdateConsumerOffsetRequestHeader requestHeader, //
+        final long timeoutMillis//
     ) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,
-            InterruptedException {
+        InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
 
         this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
     }
 
-
     public void sendHearbeat(//
-                             final String addr, //
-                             final HeartbeatData heartbeatData, //
-                             final long timeoutMillis//
+        final String addr, //
+        final HeartbeatData heartbeatData, //
+        final long timeoutMillis//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
 
@@ -782,13 +850,12 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public void unregisterClient(//
-                                 final String addr, //
-                                 final String clientID, //
-                                 final String producerGroup, //
-                                 final String consumerGroup, //
-                                 final long timeoutMillis//
+        final String addr, //
+        final String clientID, //
+        final String producerGroup, //
+        final String consumerGroup, //
+        final long timeoutMillis//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         final UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader();
         requestHeader.setClientID(clientID);
@@ -809,12 +876,11 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public void endTransactionOneway(//
-                                     final String addr, //
-                                     final EndTransactionRequestHeader requestHeader, //
-                                     final String remark, //
-                                     final long timeoutMillis//
+        final String addr, //
+        final EndTransactionRequestHeader requestHeader, //
+        final String remark, //
+        final long timeoutMillis//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
 
@@ -822,23 +888,21 @@ public class MQClientAPIImpl {
         this.remotingClient.invokeOneway(addr, request, timeoutMillis);
     }
 
-
     public void queryMessage(
-            final String addr,
-            final QueryMessageRequestHeader requestHeader,
-            final long timeoutMillis,
-            final InvokeCallback invokeCallback,
-            final Boolean isUnqiueKey
+        final String addr,
+        final QueryMessageRequestHeader requestHeader,
+        final long timeoutMillis,
+        final InvokeCallback invokeCallback,
+        final Boolean isUnqiueKey
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, requestHeader);
         request.addExtField(MixAll.UNIQUE_MSG_QUERY_FLAG, isUnqiueKey.toString());
         this.remotingClient.invokeAsync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis,
-                invokeCallback);
+            invokeCallback);
     }
 
-
     public boolean registerClient(final String addr, final HeartbeatData heartbeat, final long timeoutMillis)
-            throws RemotingException, InterruptedException {
+        throws RemotingException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
 
         request.setBody(heartbeat.encode());
@@ -846,14 +910,13 @@ public class MQClientAPIImpl {
         return response.getCode() == ResponseCode.SUCCESS;
     }
 
-
     public void consumerSendMessageBack(
-            final String addr,
-            final MessageExt msg,
-            final String consumerGroup,
-            final int delayLevel,
-            final long timeoutMillis,
-            final int maxConsumeRetryTimes
+        final String addr,
+        final MessageExt msg,
+        final String consumerGroup,
+        final int delayLevel,
+        final long timeoutMillis,
+        final int maxConsumeRetryTimes
     ) throws RemotingException, MQBrokerException, InterruptedException {
         ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
@@ -866,7 +929,7 @@ public class MQClientAPIImpl {
         requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -879,16 +942,15 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public Set<MessageQueue> lockBatchMQ(//
-                                         final String addr, //
-                                         final LockBatchRequestBody requestBody, //
-                                         final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
+        final String addr, //
+        final LockBatchRequestBody requestBody, //
+        final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
 
         request.setBody(requestBody.encode());
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
@@ -902,12 +964,11 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public void unlockBatchMQ(//
-                              final String addr, //
-                              final UnlockBatchRequestBody requestBody, //
-                              final long timeoutMillis, //
-                              final boolean oneway//
+        final String addr, //
+        final UnlockBatchRequestBody requestBody, //
+        final long timeoutMillis, //
+        final boolean oneway//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
 
@@ -917,7 +978,7 @@ public class MQClientAPIImpl {
             this.remotingClient.invokeOneway(addr, request, timeoutMillis);
         } else {
             RemotingCommand response = this.remotingClient
-                    .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
+                .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
             switch (response.getCode()) {
                 case ResponseCode.SUCCESS: {
                     return;
@@ -930,16 +991,15 @@ public class MQClientAPIImpl {
         }
     }
 
-
     public TopicStatsTable getTopicStatsInfo(final String addr, final String topic, final long timeoutMillis) throws InterruptedException,
-            RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         GetTopicStatsInfoRequestHeader requestHeader = new GetTopicStatsInfoRequestHeader();
         requestHeader.setTopic(topic);
 
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 TopicStatsTable topicStatsTable = TopicStatsTable.decode(response.getBody(), TopicStatsTable.class);
@@ -952,17 +1012,15 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis)
-            throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
-            MQBrokerException {
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
+        MQBrokerException {
         return getConsumeStats(addr, consumerGroup, null, timeoutMillis);
     }
 
-
     public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, final long timeoutMillis)
-            throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
-            MQBrokerException {
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
+        MQBrokerException {
         GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
         requestHeader.setTopic(topic);
@@ -970,7 +1028,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 ConsumeStats consumeStats = ConsumeStats.decode(response.getBody(), ConsumeStats.class);
@@ -983,17 +1041,16 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
-            MQBrokerException {
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+        MQBrokerException {
         GetProducerConnectionListRequestHeader requestHeader = new GetProducerConnectionListRequestHeader();
         requestHeader.setProducerGroup(producerGroup);
 
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_PRODUCER_CONNECTION_LIST, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 return ProducerConnection.decode(response.getBody(), ProducerConnection.class);
@@ -1005,17 +1062,16 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
-            MQBrokerException {
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+        MQBrokerException {
         GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
 
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_CONNECTION_LIST, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 ConsumerConnection consumerConnection = ConsumerConnection.decode(response.getBody(), ConsumerConnection.class);
@@ -1028,14 +1084,13 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public KVTable getBrokerRuntimeInfo(final String addr, final long timeoutMillis) throws RemotingConnectException,
-            RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
+        RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
 
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_RUNTIME_INFO, null);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 return KVTable.decode(response.getBody(), KVTable.class);
@@ -1047,10 +1102,9 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public void updateBrokerConfig(final String addr, final Properties properties, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
-            MQBrokerException, UnsupportedEncodingException {
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+        MQBrokerException, UnsupportedEncodingException {
 
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_BROKER_CONFIG, null);
 
@@ -1058,7 +1112,7 @@ public class MQClientAPIImpl {
         if (str != null && str.length() > 0) {
             request.setBody(str.getBytes(MixAll.DEFAULT_CHARSET));
             RemotingCommand response = this.remotingClient
-                    .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
+                .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
             switch (response.getCode()) {
                 case ResponseCode.SUCCESS: {
                     return;
@@ -1071,10 +1125,9 @@ public class MQClientAPIImpl {
         }
     }
 
-
     public Properties getBrokerConfig(final String addr, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
-            MQBrokerException, UnsupportedEncodingException {
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+        MQBrokerException, UnsupportedEncodingException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CONFIG, null);
 
         RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
@@ -1091,7 +1144,7 @@ public class MQClientAPIImpl {
     }
 
     public ClusterInfo getBrokerClusterInfo(final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
-            RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
 
         RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
@@ -1108,9 +1161,8 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
         requestHeader.setTopic(topic);
 
@@ -1136,9 +1188,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
         requestHeader.setTopic(topic);
 
@@ -1165,9 +1216,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public TopicList getTopicListFromNameServer(final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER, null);
 
         RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
@@ -1187,9 +1237,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName, final long timeoutMillis) throws RemotingCommandException,
-            RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
+        RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
         WipeWritePermOfBrokerRequestHeader requestHeader = new WipeWritePermOfBrokerRequestHeader();
         requestHeader.setBrokerName(brokerName);
 
@@ -1200,7 +1249,7 @@ public class MQClientAPIImpl {
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 WipeWritePermOfBrokerResponseHeader responseHeader =
-                        (WipeWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class);
+                    (WipeWritePermOfBrokerResponseHeader)response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class);
                 return responseHeader.getWipeTopicCount();
             }
             default:
@@ -1210,15 +1259,14 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public void deleteTopicInBroker(final String addr, final String topic, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader();
         requestHeader.setTopic(topic);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1231,9 +1279,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public void deleteTopicInNameServer(final String addr, final String topic, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader();
         requestHeader.setTopic(topic);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, requestHeader);
@@ -1251,15 +1298,14 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public void deleteSubscriptionGroup(final String addr, final String groupName, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         DeleteSubscriptionGroupRequestHeader requestHeader = new DeleteSubscriptionGroupRequestHeader();
         requestHeader.setGroupName(groupName);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1272,9 +1318,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public String getKVConfigValue(final String namespace, final String key, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         GetKVConfigRequestHeader requestHeader = new GetKVConfigRequestHeader();
         requestHeader.setNamespace(namespace);
         requestHeader.setKey(key);
@@ -1286,7 +1331,7 @@ public class MQClientAPIImpl {
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 GetKVConfigResponseHeader responseHeader =
-                        (GetKVConfigResponseHeader) response.decodeCommandCustomHeader(GetKVConfigResponseHeader.class);
+                    (GetKVConfigResponseHeader)response.decodeCommandCustomHeader(GetKVConfigResponseHeader.class);
                 return responseHeader.getValue();
             }
             default:
@@ -1296,9 +1341,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public void putKVConfigValue(final String namespace, final String key, final String value, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         PutKVConfigRequestHeader requestHeader = new PutKVConfigRequestHeader();
         requestHeader.setNamespace(namespace);
         requestHeader.setKey(key);
@@ -1327,9 +1371,8 @@ public class MQClientAPIImpl {
         }
     }
 
-
     public void deleteKVConfigValue(final String namespace, final String key, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         DeleteKVConfigRequestHeader requestHeader = new DeleteKVConfigRequestHeader();
         requestHeader.setNamespace(namespace);
         requestHeader.setKey(key);
@@ -1356,9 +1399,8 @@ public class MQClientAPIImpl {
         }
     }
 
-
     public KVTable getKVListByNamespace(final String namespace, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         GetKVListByNamespaceRequestHeader requestHeader = new GetKVListByNamespaceRequestHeader();
         requestHeader.setNamespace(namespace);
 
@@ -1377,17 +1419,15 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
-                                                             final long timestamp, final boolean isForce, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        final long timestamp, final boolean isForce, final long timeoutMillis)
+        throws RemotingException, MQClientException, InterruptedException {
         return invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, false);
     }
 
-
     public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
-                                                             final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
-            throws RemotingException, MQClientException, InterruptedException {
+        final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
+        throws RemotingException, MQClientException, InterruptedException {
         ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setGroup(group);
@@ -1400,7 +1440,7 @@ public class MQClientAPIImpl {
         }
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1416,9 +1456,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group,
-                                                                                final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+        final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
         GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setGroup(group);
@@ -1427,7 +1466,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1443,17 +1482,16 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public GroupList queryTopicConsumeByWho(final String addr, final String topic, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
-            MQBrokerException {
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+        MQBrokerException {
         QueryTopicConsumeByWhoRequestHeader requestHeader = new QueryTopicConsumeByWhoRequestHeader();
         requestHeader.setTopic(topic);
 
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPIC_CONSUME_BY_WHO, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 GroupList groupList = GroupList.decode(response.getBody(), GroupList.class);
@@ -1466,10 +1504,9 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
-            MQBrokerException {
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+        MQBrokerException {
         QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setGroup(group);
@@ -1477,7 +1514,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 QueryConsumeTimeSpanBody consumeTimeSpanBody = GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class);
@@ -1490,9 +1527,8 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public TopicList getTopicsByCluster(final String cluster, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         GetTopicsByClusterRequestHeader requestHeader = new GetTopicsByClusterRequestHeader();
         requestHeader.setCluster(cluster);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPICS_BY_CLUSTER, requestHeader);
@@ -1514,15 +1550,14 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public void registerMessageFilterClass(final String addr, //
-                                           final String consumerGroup, //
-                                           final String topic, //
-                                           final String className, //
-                                           final int classCRC, //
-                                           final byte[] classBody, //
-                                           final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-            InterruptedException, MQBrokerException {
+        final String consumerGroup, //
+        final String topic, //
+        final String className, //
+        final int classCRC, //
+        final byte[] classBody, //
+        final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+        InterruptedException, MQBrokerException {
         RegisterMessageFilterClassRequestHeader requestHeader = new RegisterMessageFilterClassRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
         requestHeader.setClassName(className);
@@ -1543,7 +1578,6 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public TopicList getSystemTopicList(final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS, null);
 
@@ -1555,7 +1589,7 @@ public class MQClientAPIImpl {
                 if (body != null) {
                     TopicList topicList = TopicList.decode(response.getBody(), TopicList.class);
                     if (topicList.getTopicList() != null && !topicList.getTopicList().isEmpty()
-                            && !UtilAll.isBlank(topicList.getBrokerAddr())) {
+                        && !UtilAll.isBlank(topicList.getBrokerAddr())) {
                         TopicList tmp = getSystemTopicListFromBroker(topicList.getBrokerAddr(), timeoutMillis);
                         if (tmp.getTopicList() != null && !tmp.getTopicList().isEmpty()) {
                             topicList.getTopicList().addAll(tmp.getTopicList());
@@ -1571,13 +1605,12 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public TopicList getSystemTopicListFromBroker(final String addr, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER, null);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1594,12 +1627,11 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public boolean cleanExpiredConsumeQueue(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException,
-            RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+        RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE, null);
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 return true;
@@ -1611,12 +1643,11 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public boolean cleanUnusedTopicByAddr(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException,
-            RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+        RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_UNUSED_TOPIC, null);
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 return true;
@@ -1629,7 +1660,7 @@ public class MQClientAPIImpl {
     }
 
     public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String consumerGroup, String clientId, boolean jstack,
-                                                      final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+        final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
         GetConsumerRunningInfoRequestHeader requestHeader = new GetConsumerRunningInfoRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
         requestHeader.setClientId(clientId);
@@ -1638,7 +1669,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1656,10 +1687,10 @@ public class MQClientAPIImpl {
     }
 
     public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr, //
-                                                               String consumerGroup, //
-                                                               String clientId, //
-                                                               String msgId, //
-                                                               final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+        String consumerGroup, //
+        String clientId, //
+        String msgId, //
+        final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
         ConsumeMessageDirectlyResultRequestHeader requestHeader = new ConsumeMessageDirectlyResultRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
         requestHeader.setClientId(clientId);
@@ -1668,7 +1699,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1686,8 +1717,8 @@ public class MQClientAPIImpl {
     }
 
     public Map<Integer, Long> queryCorrectionOffset(final String addr, final String topic, final String group, Set<String> filterGroup,
-                                                    long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-            InterruptedException {
+        long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+        InterruptedException {
         QueryCorrectionOffsetHeader requestHeader = new QueryCorrectionOffsetHeader();
         requestHeader.setCompareGroup(group);
         requestHeader.setTopic(topic);
@@ -1703,7 +1734,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CORRECTION_OFFSET, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1720,7 +1751,7 @@ public class MQClientAPIImpl {
     }
 
     public TopicList getUnitTopicList(final boolean containRetry, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_UNIT_TOPIC_LIST, null);
 
         RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
@@ -1749,9 +1780,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public TopicList getHasUnitSubTopicList(final boolean containRetry, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST, null);
 
         RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
@@ -1779,9 +1809,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public TopicList getHasUnitSubUnUnitTopicList(final boolean containRetry, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST, null);
 
         RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
@@ -1809,9 +1838,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public void cloneGroupOffset(final String addr, final String srcGroup, final String destGroup, final String topic,
-                                 final boolean isOffline, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+        final boolean isOffline, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
         CloneGroupOffsetRequestHeader requestHeader = new CloneGroupOffsetRequestHeader();
         requestHeader.setSrcGroup(srcGroup);
         requestHeader.setDestGroup(destGroup);
@@ -1820,7 +1848,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLONE_GROUP_OFFSET, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1833,10 +1861,9 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey, long timeoutMillis)
-            throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-            InterruptedException {
+        throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+        InterruptedException {
         ViewBrokerStatsDataRequestHeader requestHeader = new ViewBrokerStatsDataRequestHeader();
         requestHeader.setStatsName(statsName);
         requestHeader.setStatsKey(statsKey);
@@ -1844,7 +1871,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.VIEW_BROKER_STATS_DATA, requestHeader);
 
         RemotingCommand response = this.remotingClient
-                .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
+            .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1860,23 +1887,21 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public Set<String> getClusterList(String topic, long timeoutMillis) throws MQClientException, RemotingConnectException,
-            RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+        RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
         // todo:jodie
         return Collections.EMPTY_SET;
     }
 
-
     public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder, long timeoutMillis) throws MQClientException,
-            RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+        RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
         GetConsumeStatsInBrokerHeader requestHeader = new GetConsumeStatsInBrokerHeader();
         requestHeader.setIsOrder(isOrder);
 
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CONSUME_STATS, requestHeader);
 
         RemotingCommand response = this.remotingClient
-                .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
+            .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1892,12 +1917,11 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException,
-            RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
         RemotingCommand response = this.remotingClient
-                .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
+            .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1909,13 +1933,12 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
   

<TRUNCATED>