You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 08:13:45 UTC

[20/34] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 6e0e379..42b7a82 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -16,11 +16,40 @@
  */
 package org.apache.rocketmq.client.impl.factory;
 
+import java.io.UnsupportedEncodingException;
+import java.net.DatagramSocket;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.*;
+import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQAdminImpl;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.client.impl.consumer.PullMessageService;
+import org.apache.rocketmq.client.impl.consumer.RebalanceService;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.impl.producer.MQProducerInner;
 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
@@ -36,7 +65,11 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
-import org.apache.rocketmq.common.protocol.heartbeat.*;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -45,19 +78,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.client.impl.consumer.*;
 import org.slf4j.Logger;
 
-import java.io.UnsupportedEncodingException;
-import java.net.DatagramSocket;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-
 public class MQClientInstance {
     private final static long LOCK_TIMEOUT_MILLIS = 3000;
     private final Logger log = ClientLogger.getLog();
@@ -75,7 +97,7 @@ public class MQClientInstance {
     private final Lock lockNamesrv = new ReentrantLock();
     private final Lock lockHeartbeat = new ReentrantLock();
     private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
-            new ConcurrentHashMap<String, HashMap<Long, String>>();
+        new ConcurrentHashMap<String, HashMap<Long, String>>();
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
         @Override
         public Thread newThread(Runnable r) {
@@ -92,12 +114,10 @@ public class MQClientInstance {
     private DatagramSocket datagramSocket;
     private Random random = new Random();
 
-
     public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId) {
         this(clientConfig, instanceIndex, clientId, null);
     }
 
-
     public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
         this.clientConfig = clientConfig;
         this.instanceIndex = instanceIndex;
@@ -125,10 +145,74 @@ public class MQClientInstance {
         this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
 
         log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}", //
-                this.instanceIndex, //
-                this.clientId, //
-                this.clientConfig, //
-                MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
+            this.instanceIndex, //
+            this.clientId, //
+            this.clientConfig, //
+            MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
+    }
+
+    public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
+        TopicPublishInfo info = new TopicPublishInfo();
+        info.setTopicRouteData(route);
+        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
+            String[] brokers = route.getOrderTopicConf().split(";");
+            for (String broker : brokers) {
+                String[] item = broker.split(":");
+                int nums = Integer.parseInt(item[1]);
+                for (int i = 0; i < nums; i++) {
+                    MessageQueue mq = new MessageQueue(topic, item[0], i);
+                    info.getMessageQueueList().add(mq);
+                }
+            }
+
+            info.setOrderTopic(true);
+        } else {
+            List<QueueData> qds = route.getQueueDatas();
+            Collections.sort(qds);
+            for (QueueData qd : qds) {
+                if (PermName.isWriteable(qd.getPerm())) {
+                    BrokerData brokerData = null;
+                    for (BrokerData bd : route.getBrokerDatas()) {
+                        if (bd.getBrokerName().equals(qd.getBrokerName())) {
+                            brokerData = bd;
+                            break;
+                        }
+                    }
+
+                    if (null == brokerData) {
+                        continue;
+                    }
+
+                    if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
+                        continue;
+                    }
+
+                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {
+                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
+                        info.getMessageQueueList().add(mq);
+                    }
+                }
+            }
+
+            info.setOrderTopic(false);
+        }
+
+        return info;
+    }
+
+    public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
+        Set<MessageQueue> mqList = new HashSet<MessageQueue>();
+        List<QueueData> qds = route.getQueueDatas();
+        for (QueueData qd : qds) {
+            if (PermName.isReadable(qd.getPerm())) {
+                for (int i = 0; i < qd.getReadQueueNums(); i++) {
+                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
+                    mqList.add(mq);
+                }
+            }
+        }
+
+        return mqList;
     }
 
     public void start() throws MQClientException {
@@ -166,7 +250,6 @@ public class MQClientInstance {
         }
     }
 
-
     private void startScheduledTask() {
         if (null == this.clientConfig.getNamesrvAddr()) {
             this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@@ -353,7 +436,7 @@ public class MQClientInstance {
             if (impl != null) {
                 try {
                     if (impl instanceof DefaultMQPushConsumerImpl) {
-                        DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl) impl;
+                        DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl)impl;
                         dmq.adjustThreadPool();
                     }
                 } catch (Exception e) {
@@ -420,7 +503,7 @@ public class MQClientInstance {
                                 log.error("send heart beat to broker exception", e);
                             } else {
                                 log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
-                                        id, addr);
+                                    id, addr);
                             }
                         }
                     }
@@ -460,7 +543,7 @@ public class MQClientInstance {
                     TopicRouteData topicRouteData;
                     if (isDefault && defaultMQProducer != null) {
                         topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
-                                1000 * 3);
+                            1000 * 3);
                         if (topicRouteData != null) {
                             for (QueueData data : topicRouteData.getQueueDatas()) {
                                 int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
@@ -559,7 +642,6 @@ public class MQClientInstance {
             }
         }
 
-
         // Producer
         for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
             MQProducerInner impl = entry.getValue();
@@ -590,7 +672,7 @@ public class MQClientInstance {
     }
 
     private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName, final String topic,
-                                                    final String filterClassSource) throws UnsupportedEncodingException {
+        final String filterClassSource) throws UnsupportedEncodingException {
         byte[] classBody = null;
         int classCRC = 0;
         try {
@@ -598,13 +680,13 @@ public class MQClientInstance {
             classCRC = UtilAll.crc32(classBody);
         } catch (Exception e1) {
             log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}", //
-                    fullClassName, //
-                    RemotingHelper.exceptionSimpleDesc(e1));
+                fullClassName, //
+                RemotingHelper.exceptionSimpleDesc(e1));
         }
 
         TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
         if (topicRouteData != null //
-                && topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) {
+            && topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) {
             Iterator<Entry<String, List<String>>> it = topicRouteData.getFilterServerTable().entrySet().iterator();
             while (it.hasNext()) {
                 Entry<String, List<String>> next = it.next();
@@ -612,10 +694,10 @@ public class MQClientInstance {
                 for (final String fsAddr : value) {
                     try {
                         this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, classCRC, classBody,
-                                5000);
+                            5000);
 
                         log.info("register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {}", fsAddr, consumerGroup,
-                                topic, fullClassName);
+                            topic, fullClassName);
 
                     } catch (Exception e) {
                         log.error("uploadFilterClassToAllFilterServer Exception", e);
@@ -624,7 +706,7 @@ public class MQClientInstance {
             }
         } else {
             log.warn("register message class filter failed, because no filter server, ConsumerGroup: {} Topic: {} ClassName: {}",
-                    consumerGroup, topic, fullClassName);
+                consumerGroup, topic, fullClassName);
         }
     }
 
@@ -668,70 +750,6 @@ public class MQClientInstance {
         return result;
     }
 
-    public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
-        TopicPublishInfo info = new TopicPublishInfo();
-        info.setTopicRouteData(route);
-        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
-            String[] brokers = route.getOrderTopicConf().split(";");
-            for (String broker : brokers) {
-                String[] item = broker.split(":");
-                int nums = Integer.parseInt(item[1]);
-                for (int i = 0; i < nums; i++) {
-                    MessageQueue mq = new MessageQueue(topic, item[0], i);
-                    info.getMessageQueueList().add(mq);
-                }
-            }
-
-            info.setOrderTopic(true);
-        } else {
-            List<QueueData> qds = route.getQueueDatas();
-            Collections.sort(qds);
-            for (QueueData qd : qds) {
-                if (PermName.isWriteable(qd.getPerm())) {
-                    BrokerData brokerData = null;
-                    for (BrokerData bd : route.getBrokerDatas()) {
-                        if (bd.getBrokerName().equals(qd.getBrokerName())) {
-                            brokerData = bd;
-                            break;
-                        }
-                    }
-
-                    if (null == brokerData) {
-                        continue;
-                    }
-
-                    if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
-                        continue;
-                    }
-
-                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {
-                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
-                        info.getMessageQueueList().add(mq);
-                    }
-                }
-            }
-
-            info.setOrderTopic(false);
-        }
-
-        return info;
-    }
-
-    public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
-        Set<MessageQueue> mqList = new HashSet<MessageQueue>();
-        List<QueueData> qds = route.getQueueDatas();
-        for (QueueData qd : qds) {
-            if (PermName.isReadable(qd.getPerm())) {
-                for (int i = 0; i < qd.getReadQueueNums(); i++) {
-                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
-                    mqList.add(mq);
-                }
-            }
-        }
-
-        return mqList;
-    }
-
     public void shutdown() {
         // Consumer
         if (!this.consumerTable.isEmpty())
@@ -824,7 +842,7 @@ public class MQClientInstance {
                         try {
                             this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
                             log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup,
-                                    consumerGroup, brokerName, entry1.getKey(), addr);
+                                consumerGroup, brokerName, entry1.getKey(), addr);
                         } catch (RemotingException e) {
                             log.error("unregister client exception from broker: " + addr, e);
                         } catch (MQBrokerException e) {
@@ -942,9 +960,9 @@ public class MQClientInstance {
     }
 
     public FindBrokerResult findBrokerAddressInSubscribe(//
-                                                         final String brokerName, //
-                                                         final long brokerId, //
-                                                         final boolean onlyThisBroker//
+        final String brokerName, //
+        final long brokerId, //
+        final boolean onlyThisBroker//
     ) {
         String brokerAddr = null;
         boolean slave = false;
@@ -1008,7 +1026,7 @@ public class MQClientInstance {
         try {
             MQConsumerInner impl = this.consumerTable.get(group);
             if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
-                consumer = (DefaultMQPushConsumerImpl) impl;
+                consumer = (DefaultMQPushConsumerImpl)impl;
             } else {
                 log.info("[reset-offset] consumer dose not exist. group={}", group);
                 return;
@@ -1053,10 +1071,10 @@ public class MQClientInstance {
     public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) {
         MQConsumerInner impl = this.consumerTable.get(group);
         if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
-            DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) impl;
+            DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl)impl;
             return consumer.getOffsetStore().cloneOffsetTable(topic);
         } else if (impl != null && impl instanceof DefaultMQPullConsumerImpl) {
-            DefaultMQPullConsumerImpl consumer = (DefaultMQPullConsumerImpl) impl;
+            DefaultMQPullConsumerImpl consumer = (DefaultMQPullConsumerImpl)impl;
             return consumer.getOffsetStore().cloneOffsetTable(topic);
         } else {
             return Collections.EMPTY_MAP;
@@ -1096,11 +1114,11 @@ public class MQClientInstance {
     }
 
     public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, //
-                                                               final String consumerGroup, //
-                                                               final String brokerName) {
+        final String consumerGroup, //
+        final String brokerName) {
         MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
         if (null != mqConsumerInner) {
-            DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
+            DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl)mqConsumerInner;
 
             ConsumeMessageDirectlyResult result = consumer.getConsumeMessageService().consumeMessageDirectly(msg, brokerName);
             return result;
@@ -1109,7 +1127,6 @@ public class MQClientInstance {
         return null;
     }
 
-
     public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup) {
         MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
 
@@ -1128,12 +1145,11 @@ public class MQClientInstance {
         consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_NAMESERVER_ADDR, nsAddr);
         consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CONSUME_TYPE, mqConsumerInner.consumeType().name());
         consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CLIENT_VERSION,
-                MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
+            MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
 
         return consumerRunningInfo;
     }
 
-
     public ConsumerStatsManager getConsumerStatsManager() {
         return consumerStatsManager;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index b53fa19..42bf360 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -16,6 +16,20 @@
  */
 package org.apache.rocketmq.client.impl.producer;
 
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.common.ClientErrorCode;
@@ -30,11 +44,29 @@ import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.latency.MQFaultStrategy;
 import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.producer.TransactionCheckListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.client.producer.TransactionSendResult;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageId;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageType;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
 import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
@@ -45,21 +77,14 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.client.producer.*;
 import org.slf4j.Logger;
 
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.*;
-
-
 public class DefaultMQProducerImpl implements MQProducerInner {
     private final Logger log = ClientLogger.getLog();
     private final Random random = new Random();
     private final DefaultMQProducer defaultMQProducer;
     private final ConcurrentHashMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
-            new ConcurrentHashMap<String, TopicPublishInfo>();
+        new ConcurrentHashMap<String, TopicPublishInfo>();
     private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
     private final RPCHook rpcHook;
     protected BlockingQueue<Runnable> checkRequestQueue;
@@ -71,12 +96,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
 
     private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
 
-
     public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
         this(defaultMQProducer, null);
     }
 
-
     public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
         this.defaultMQProducer = defaultMQProducer;
         this.rpcHook = rpcHook;
@@ -85,18 +108,18 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) {
         this.checkForbiddenHookList.add(checkForbiddenHook);
         log.info("register a new checkForbiddenHook. hookName={}, allHookSize={}", checkForbiddenHook.hookName(),
-                checkForbiddenHookList.size());
+            checkForbiddenHookList.size());
     }
 
     public void initTransactionEnv() {
-        TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
+        TransactionMQProducer producer = (TransactionMQProducer)this.defaultMQProducer;
         this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
         this.checkExecutor = new ThreadPoolExecutor(//
-                producer.getCheckThreadPoolMinSize(), //
-                producer.getCheckThreadPoolMaxSize(), //
-                1000 * 60, //
-                TimeUnit.MILLISECONDS, //
-                this.checkRequestQueue);
+            producer.getCheckThreadPoolMinSize(), //
+            producer.getCheckThreadPoolMaxSize(), //
+            1000 * 60, //
+            TimeUnit.MILLISECONDS, //
+            this.checkRequestQueue);
     }
 
     public void destroyTransactionEnv() {
@@ -130,8 +153,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 if (!registerOK) {
                     this.serviceState = ServiceState.CREATE_JUST;
                     throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
-                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
-                            null);
+                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
+                        null);
                 }
 
                 this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
@@ -141,16 +164,16 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 }
 
                 log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
-                        this.defaultMQProducer.isSendMessageWithVIPChannel());
+                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                 this.serviceState = ServiceState.RUNNING;
                 break;
             case RUNNING:
             case START_FAILED:
             case SHUTDOWN_ALREADY:
                 throw new MQClientException("The producer service state not OK, maybe started once, "//
-                        + this.serviceState//
-                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
-                        null);
+                    + this.serviceState//
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+                    null);
             default:
                 break;
         }
@@ -167,7 +190,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
 
         if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
             throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
-                    null);
+                null);
         }
     }
 
@@ -215,7 +238,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     @Override
     public TransactionCheckListener checkListener() {
         if (this.defaultMQProducer instanceof TransactionMQProducer) {
-            TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
+            TransactionMQProducer producer = (TransactionMQProducer)defaultMQProducer;
             return producer.getTransactionCheckListener();
         }
 
@@ -230,7 +253,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
             private final CheckTransactionStateRequestHeader checkRequestHeader = header;
             private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
 
-
             @Override
             public void run() {
                 TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
@@ -245,19 +267,18 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                     }
 
                     this.processTransactionState(//
-                            localTransactionState, //
-                            group, //
-                            exception);
+                        localTransactionState, //
+                        group, //
+                        exception);
                 } else {
                     log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
                 }
             }
 
-
             private void processTransactionState(//
-                                                 final LocalTransactionState localTransactionState, //
-                                                 final String producerGroup, //
-                                                 final Throwable exception) {
+                final LocalTransactionState localTransactionState, //
+                final String producerGroup, //
+                final Throwable exception) {
                 final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
                 thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
                 thisHeader.setProducerGroup(producerGroup);
@@ -293,7 +314,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
 
                 try {
                     DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
-                            3000);
+                        3000);
                 } catch (Exception e) {
                     log.error("endTransactionOneway exception", e);
                 }
@@ -332,9 +353,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     private void makeSureStateOK() throws MQClientException {
         if (this.serviceState != ServiceState.RUNNING) {
             throw new MQClientException("The producer service state not OK, "//
-                    + this.serviceState//
-                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
-                    null);
+                + this.serviceState//
+                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+                null);
         }
     }
 
@@ -370,13 +391,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     }
 
     public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
-            throws MQClientException, InterruptedException {
+        throws MQClientException, InterruptedException {
         this.makeSureStateOK();
         return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
     }
 
     public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
-            throws MQClientException, InterruptedException {
+        throws MQClientException, InterruptedException {
         this.makeSureStateOK();
         return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
     }
@@ -389,7 +410,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     }
 
     public void send(Message msg, SendCallback sendCallback, long timeout)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         try {
             this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
         } catch (MQBrokerException e) {
@@ -406,10 +427,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     }
 
     private SendResult sendDefaultImpl(//
-                                       Message msg, //
-                                       final CommunicationMode communicationMode, //
-                                       final SendCallback sendCallback, //
-                                       final long timeout//
+        Message msg, //
+        final CommunicationMode communicationMode, //
+        final SendCallback sendCallback, //
+        final long timeout//
     ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         this.makeSureStateOK();
         Validators.checkMessage(msg, this.defaultMQProducer);
@@ -508,16 +529,16 @@ public class DefaultMQProducerImpl implements MQProducerInner {
             }
 
             String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
-                    times,
-                    System.currentTimeMillis() - beginTimestampFirst,
-                    msg.getTopic(),
-                    Arrays.toString(brokersSent));
+                times,
+                System.currentTimeMillis() - beginTimestampFirst,
+                msg.getTopic(),
+                Arrays.toString(brokersSent));
 
             info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
 
             MQClientException mqClientException = new MQClientException(info, exception);
             if (exception instanceof MQBrokerException) {
-                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
+                mqClientException.setResponseCode(((MQBrokerException)exception).getResponseCode());
             } else if (exception instanceof RemotingConnectException) {
                 mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
             } else if (exception instanceof RemotingTimeoutException) {
@@ -532,11 +553,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
         if (null == nsList || nsList.isEmpty()) {
             throw new MQClientException(
-                    "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
+                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
         }
 
         throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
-                null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
+            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
     }
 
     private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
@@ -557,11 +578,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     }
 
     private SendResult sendKernelImpl(final Message msg, //
-                                      final MessageQueue mq, //
-                                      final CommunicationMode communicationMode, //
-                                      final SendCallback sendCallback, //
-                                      final TopicPublishInfo topicPublishInfo, //
-                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        final MessageQueue mq, //
+        final CommunicationMode communicationMode, //
+        final SendCallback sendCallback, //
+        final TopicPublishInfo topicPublishInfo, //
+        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
         if (null == brokerAddr) {
             tryToFindTopicPublishInfo(mq.getTopic());
@@ -649,30 +670,30 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 switch (communicationMode) {
                     case ASYNC:
                         sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
-                                brokerAddr, // 1
-                                mq.getBrokerName(), // 2
-                                msg, // 3
-                                requestHeader, // 4
-                                timeout, // 5
-                                communicationMode, // 6
-                                sendCallback, // 7
-                                topicPublishInfo, // 8
-                                this.mQClientFactory, // 9
-                                this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
-                                context, //
-                                this);
+                            brokerAddr, // 1
+                            mq.getBrokerName(), // 2
+                            msg, // 3
+                            requestHeader, // 4
+                            timeout, // 5
+                            communicationMode, // 6
+                            sendCallback, // 7
+                            topicPublishInfo, // 8
+                            this.mQClientFactory, // 9
+                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
+                            context, //
+                            this);
                         break;
                     case ONEWAY:
                     case SYNC:
                         sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
-                                brokerAddr,
-                                mq.getBrokerName(),
-                                msg,
-                                requestHeader,
-                                timeout,
-                                communicationMode,
-                                context,
-                                this);
+                            brokerAddr,
+                            mq.getBrokerName(),
+                            msg,
+                            requestHeader,
+                            timeout,
+                            communicationMode,
+                            context,
+                            this);
                         break;
                     default:
                         assert false;
@@ -790,12 +811,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
      * KERNEL SYNC -------------------------------------------------------
      */
     public SendResult send(Message msg, MessageQueue mq)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return send(msg, mq, this.defaultMQProducer.getSendMsgTimeout());
     }
 
     public SendResult send(Message msg, MessageQueue mq, long timeout)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         this.makeSureStateOK();
         Validators.checkMessage(msg, this.defaultMQProducer);
 
@@ -810,12 +831,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
      * KERNEL ASYNC -------------------------------------------------------
      */
     public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
     }
 
     public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         this.makeSureStateOK();
         Validators.checkMessage(msg, this.defaultMQProducer);
 
@@ -848,21 +869,21 @@ public class DefaultMQProducerImpl implements MQProducerInner {
      * SELECT SYNC -------------------------------------------------------
      */
     public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout());
     }
 
     public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
     }
 
     private SendResult sendSelectImpl(//
-                                      Message msg, //
-                                      MessageQueueSelector selector, //
-                                      Object arg, //
-                                      final CommunicationMode communicationMode, //
-                                      final SendCallback sendCallback, final long timeout//
+        Message msg, //
+        MessageQueueSelector selector, //
+        Object arg, //
+        final CommunicationMode communicationMode, //
+        final SendCallback sendCallback, final long timeout//
     ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         this.makeSureStateOK();
         Validators.checkMessage(msg, this.defaultMQProducer);
@@ -890,12 +911,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
      * SELECT ASYNC -------------------------------------------------------
      */
     public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
     }
 
     public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         try {
             this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout);
         } catch (MQBrokerException e) {
@@ -907,7 +928,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
      * SELECT ONEWAY -------------------------------------------------------
      */
     public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         try {
             this.sendSelectImpl(msg, selector, arg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());
         } catch (MQBrokerException e) {
@@ -916,7 +937,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     }
 
     public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
-            throws MQClientException {
+        throws MQClientException {
         if (null == tranExecuter) {
             throw new MQClientException("tranExecutor is null", null);
         }
@@ -988,9 +1009,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     }
 
     public void endTransaction(//
-                               final SendResult sendResult, //
-                               final LocalTransactionState localTransactionState, //
-                               final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
+        final SendResult sendResult, //
+        final LocalTransactionState localTransactionState, //
+        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
         final MessageId id;
         if (sendResult.getOffsetMsgId() != null) {
             id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
@@ -1021,7 +1042,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         requestHeader.setMsgId(sendResult.getMsgId());
         String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
         this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
-                this.defaultMQProducer.getSendMsgTimeout());
+            this.defaultMQProducer.getSendMsgTimeout());
     }
 
     public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
@@ -1036,17 +1057,14 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         return zipCompressLevel;
     }
 
-
     public void setZipCompressLevel(int zipCompressLevel) {
         this.zipCompressLevel = zipCompressLevel;
     }
 
-
     public ServiceState getServiceState() {
         return serviceState;
     }
 
-
     public void setServiceState(ServiceState serviceState) {
         this.serviceState = serviceState;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java
index cac77ae..cf61326 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java
@@ -6,41 +6,34 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.impl.producer;
 
+import java.util.Set;
 import org.apache.rocketmq.client.producer.TransactionCheckListener;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
 
-import java.util.Set;
-
-
 public interface MQProducerInner {
     Set<String> getPublishTopicList();
 
-
     boolean isPublishTopicNeedUpdate(final String topic);
 
-
     TransactionCheckListener checkListener();
 
-
     void checkTransactionState(//
-                               final String addr, //
-                               final MessageExt msg, //
-                               final CheckTransactionStateRequestHeader checkRequestHeader);
-
+        final String addr, //
+        final MessageExt msg, //
+        final CheckTransactionStateRequestHeader checkRequestHeader);
 
     void updateTopicPublishInfo(final String topic, final TopicPublishInfo info);
 
-
     boolean isUnitMode();
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index dca20cb..c6f9d45 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -16,15 +16,13 @@
  */
 package org.apache.rocketmq.client.impl.producer;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.rocketmq.client.common.ThreadLocalIndex;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 
-import java.util.ArrayList;
-import java.util.List;
-
-
 public class TopicPublishInfo {
     private boolean orderTopic = false;
     private boolean haveTopicRouterInfo = false;
@@ -32,7 +30,6 @@ public class TopicPublishInfo {
     private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0);
     private TopicRouteData topicRouteData;
 
-
     public boolean isOrderTopic() {
         return orderTopic;
     }
@@ -49,32 +46,26 @@ public class TopicPublishInfo {
         return messageQueueList;
     }
 
-
     public void setMessageQueueList(List<MessageQueue> messageQueueList) {
         this.messageQueueList = messageQueueList;
     }
 
-
     public ThreadLocalIndex getSendWhichQueue() {
         return sendWhichQueue;
     }
 
-
     public void setSendWhichQueue(ThreadLocalIndex sendWhichQueue) {
         this.sendWhichQueue = sendWhichQueue;
     }
 
-
     public boolean isHaveTopicRouterInfo() {
         return haveTopicRouterInfo;
     }
 
-
     public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) {
         this.haveTopicRouterInfo = haveTopicRouterInfo;
     }
 
-
     public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
         if (lastBrokerName == null) {
             return selectOneMessageQueue();
@@ -93,7 +84,6 @@ public class TopicPublishInfo {
         }
     }
 
-
     public MessageQueue selectOneMessageQueue() {
         int index = this.sendWhichQueue.getAndIncrement();
         int pos = Math.abs(index) % this.messageQueueList.size();
@@ -113,11 +103,10 @@ public class TopicPublishInfo {
         return -1;
     }
 
-
     @Override
     public String toString() {
         return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList
-                + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]";
+            + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]";
     }
 
     public TopicRouteData getTopicRouteData() {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
index 12dac4b..b61d855 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
@@ -17,13 +17,12 @@
 
 package org.apache.rocketmq.client.latency;
 
-import org.apache.rocketmq.client.common.ThreadLocalIndex;
-
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.client.common.ThreadLocalIndex;
 
 public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
     private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
@@ -89,6 +88,14 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
         return null;
     }
 
+    @Override
+    public String toString() {
+        return "LatencyFaultToleranceImpl{" +
+            "faultItemTable=" + faultItemTable +
+            ", whichItemWorst=" + whichItemWorst +
+            '}';
+    }
+
     class FaultItem implements Comparable<FaultItem> {
         private final String name;
         private volatile long currentLatency;
@@ -101,9 +108,11 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
         @Override
         public int compareTo(final FaultItem other) {
             if (this.isAvailable() != other.isAvailable()) {
-                if (this.isAvailable()) return -1;
+                if (this.isAvailable())
+                    return -1;
 
-                if (other.isAvailable()) return 1;
+                if (other.isAvailable())
+                    return 1;
             }
 
             if (this.currentLatency < other.currentLatency)
@@ -128,20 +137,24 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
         @Override
         public int hashCode() {
             int result = getName() != null ? getName().hashCode() : 0;
-            result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));
-            result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));
+            result = 31 * result + (int)(getCurrentLatency() ^ (getCurrentLatency() >>> 32));
+            result = 31 * result + (int)(getStartTimestamp() ^ (getStartTimestamp() >>> 32));
             return result;
         }
 
         @Override
         public boolean equals(final Object o) {
-            if (this == o) return true;
-            if (!(o instanceof FaultItem)) return false;
+            if (this == o)
+                return true;
+            if (!(o instanceof FaultItem))
+                return false;
 
-            final FaultItem faultItem = (FaultItem) o;
+            final FaultItem faultItem = (FaultItem)o;
 
-            if (getCurrentLatency() != faultItem.getCurrentLatency()) return false;
-            if (getStartTimestamp() != faultItem.getStartTimestamp()) return false;
+            if (getCurrentLatency() != faultItem.getCurrentLatency())
+                return false;
+            if (getStartTimestamp() != faultItem.getStartTimestamp())
+                return false;
             return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;
 
         }
@@ -149,10 +162,10 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
         @Override
         public String toString() {
             return "FaultItem{" +
-                    "name='" + name + '\'' +
-                    ", currentLatency=" + currentLatency +
-                    ", startTimestamp=" + startTimestamp +
-                    '}';
+                "name='" + name + '\'' +
+                ", currentLatency=" + currentLatency +
+                ", startTimestamp=" + startTimestamp +
+                '}';
         }
 
         public String getName() {
@@ -175,14 +188,5 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
             this.startTimestamp = startTimestamp;
         }
 
-
-    }
-
-    @Override
-    public String toString() {
-        return "LatencyFaultToleranceImpl{" +
-                "faultItemTable=" + faultItemTable +
-                ", whichItemWorst=" + whichItemWorst +
-                '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
index cdfd5d1..70758dc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
@@ -25,8 +25,8 @@ public class MQFaultStrategy {
 
     private boolean sendLatencyFaultEnable = false;
 
-    private long[] latencyMax =             {50L,   100L,   550L,       1000L,  2000L,      3000L,      15000L};
-    private long[] notAvailableDuration =   {0L,    0L,     30000L,     60000L, 120000L,    180000L,    600000L};
+    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
+    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
 
     public long[] getNotAvailableDuration() {
         return notAvailableDuration;
@@ -97,7 +97,8 @@ public class MQFaultStrategy {
 
     private long computeNotAvailableDuration(final long currentLatency) {
         for (int i = latencyMax.length - 1; i >= 0; i--) {
-            if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i];
+            if (currentLatency >= latencyMax[i])
+                return this.notAvailableDuration[i];
         }
 
         return 0;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java b/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java
index 3055119..7a05e76 100644
--- a/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java
+++ b/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java
@@ -16,38 +16,35 @@
  */
 package org.apache.rocketmq.client.log;
 
+import java.lang.reflect.Method;
+import java.net.URL;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.slf4j.ILoggerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Method;
-import java.net.URL;
-
-
 public class ClientLogger {
-    private static Logger log;
     public static final String CLIENT_LOG_ROOT = "rocketmq.client.logRoot";
     public static final String CLIENT_LOG_MAXINDEX = "rocketmq.client.logFileMaxIndex";
     public static final String CLIENT_LOG_LEVEL = "rocketmq.client.logLevel";
+    private static Logger log;
 
     static {
         log = createLogger(LoggerName.CLIENT_LOGGER_NAME);
     }
 
-
     private static Logger createLogger(final String loggerName) {
         String logConfigFilePath =
-                System.getProperty("rocketmq.client.log.configFile",
-                        System.getenv("ROCKETMQ_CLIENT_LOG_CONFIGFILE"));
+            System.getProperty("rocketmq.client.log.configFile",
+                System.getenv("ROCKETMQ_CLIENT_LOG_CONFIGFILE"));
         Boolean isloadconfig =
-                Boolean.parseBoolean(System.getProperty("rocketmq.client.log.loadconfig", "true"));
+            Boolean.parseBoolean(System.getProperty("rocketmq.client.log.loadconfig", "true"));
 
         final String log4JResourceFile =
-                System.getProperty("rocketmq.client.log4j.resource.fileName", "log4j_rocketmq_client.xml");
+            System.getProperty("rocketmq.client.log4j.resource.fileName", "log4j_rocketmq_client.xml");
 
         final String logbackResourceFile =
-                System.getProperty("rocketmq.client.logback.resource.fileName", "logback_rocketmq_client.xml");
+            System.getProperty("rocketmq.client.logback.resource.fileName", "logback_rocketmq_client.xml");
 
         String clientLogRoot = System.getProperty(CLIENT_LOG_ROOT, "${user.home}/logs/rocketmqlogs");
         System.setProperty("client.logRoot", clientLogRoot);
@@ -85,11 +82,11 @@ public class ClientLogger {
                     if (null == logConfigFilePath) {
                         URL url = ClientLogger.class.getClassLoader().getResource(logbackResourceFile);
                         Method doConfigure =
-                                joranConfiguratoroObj.getClass().getMethod("doConfigure", URL.class);
+                            joranConfiguratoroObj.getClass().getMethod("doConfigure", URL.class);
                         doConfigure.invoke(joranConfiguratoroObj, url);
                     } else {
                         Method doConfigure =
-                                joranConfiguratoroObj.getClass().getMethod("doConfigure", String.class);
+                            joranConfiguratoroObj.getClass().getMethod("doConfigure", String.class);
                         doConfigure.invoke(joranConfiguratoroObj, logConfigFilePath);
                     }
 
@@ -101,12 +98,10 @@ public class ClientLogger {
         return LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME);
     }
 
-
     public static Logger getLog() {
         return log;
     }
 
-
     public static void setLog(Logger log) {
         ClientLogger.log = log;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 340b1ff..736aa15 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -6,28 +6,30 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.producer;
 
+import java.util.List;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageId;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.common.message.*;
-
-import java.util.List;
-
 
 public class DefaultMQProducer extends ClientConfig implements MQProducer {
     protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
@@ -44,27 +46,24 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
 
     private boolean retryAnotherBrokerWhenNotStoreOK = false;
     private int maxMessageSize = 1024 * 1024 * 4; // 4M
+
     public DefaultMQProducer() {
         this(MixAll.DEFAULT_PRODUCER_GROUP, null);
     }
 
-
     public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
         this.producerGroup = producerGroup;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
     }
 
-
     public DefaultMQProducer(final String producerGroup) {
         this(producerGroup, null);
     }
 
-
     public DefaultMQProducer(RPCHook rpcHook) {
         this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook);
     }
 
-
     @Override
     public void start() throws MQClientException {
         this.defaultMQProducerImpl.start();
@@ -75,169 +74,143 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
         this.defaultMQProducerImpl.shutdown();
     }
 
-
     @Override
     public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
         return this.defaultMQProducerImpl.fetchPublishMessageQueues(topic);
     }
 
-
     @Override
     public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return this.defaultMQProducerImpl.send(msg);
     }
 
-
     @Override
     public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return this.defaultMQProducerImpl.send(msg, timeout);
     }
 
-
     @Override
     public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
         this.defaultMQProducerImpl.send(msg, sendCallback);
     }
 
-
     @Override
     public void send(Message msg, SendCallback sendCallback, long timeout)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
     }
 
-
     @Override
     public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
         this.defaultMQProducerImpl.sendOneway(msg);
     }
 
-
     @Override
     public SendResult send(Message msg, MessageQueue mq)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return this.defaultMQProducerImpl.send(msg, mq);
     }
 
-
     @Override
     public SendResult send(Message msg, MessageQueue mq, long timeout)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return this.defaultMQProducerImpl.send(msg, mq, timeout);
     }
 
-
     @Override
     public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         this.defaultMQProducerImpl.send(msg, mq, sendCallback);
     }
 
-
     @Override
     public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
     }
 
-
     @Override
     public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
         this.defaultMQProducerImpl.sendOneway(msg, mq);
     }
 
-
     @Override
     public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return this.defaultMQProducerImpl.send(msg, selector, arg);
     }
 
-
     @Override
     public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
     }
 
-
     @Override
     public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
     }
 
-
     @Override
     public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
     }
 
-
     @Override
     public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
     }
 
-
     @Override
     public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg)
-            throws MQClientException {
+        throws MQClientException {
         throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
     }
 
-
     @Override
     public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
         createTopic(key, newTopic, queueNum, 0);
     }
 
-
     @Override
     public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
         this.defaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
     }
 
-
     @Override
     public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
         return this.defaultMQProducerImpl.searchOffset(mq, timestamp);
     }
 
-
     @Override
     public long maxOffset(MessageQueue mq) throws MQClientException {
         return this.defaultMQProducerImpl.maxOffset(mq);
     }
 
-
     @Override
     public long minOffset(MessageQueue mq) throws MQClientException {
         return this.defaultMQProducerImpl.minOffset(mq);
     }
 
-
     @Override
     public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
         return this.defaultMQProducerImpl.earliestMsgStoreTime(mq);
     }
 
-
     @Override
     public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         return this.defaultMQProducerImpl.viewMessage(offsetMsgId);
     }
 
-
     @Override
     public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
-            throws MQClientException, InterruptedException {
+        throws MQClientException, InterruptedException {
         return this.defaultMQProducerImpl.queryMessage(topic, key, maxNum, begin, end);
     }
 
-
     @Override
     public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         try {
@@ -252,97 +225,78 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
         return producerGroup;
     }
 
-
     public void setProducerGroup(String producerGroup) {
         this.producerGroup = producerGroup;
     }
 
-
     public String getCreateTopicKey() {
         return createTopicKey;
     }
 
-
     public void setCreateTopicKey(String createTopicKey) {
         this.createTopicKey = createTopicKey;
     }
 
-
     public int getSendMsgTimeout() {
         return sendMsgTimeout;
     }
 
-
     public void setSendMsgTimeout(int sendMsgTimeout) {
         this.sendMsgTimeout = sendMsgTimeout;
     }
 
-
     public int getCompressMsgBodyOverHowmuch() {
         return compressMsgBodyOverHowmuch;
     }
 
-
     public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) {
         this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
     }
 
-
     public DefaultMQProducerImpl getDefaultMQProducerImpl() {
         return defaultMQProducerImpl;
     }
 
-
     public boolean isRetryAnotherBrokerWhenNotStoreOK() {
         return retryAnotherBrokerWhenNotStoreOK;
     }
 
-
     public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) {
         this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;
     }
 
-
     public int getMaxMessageSize() {
         return maxMessageSize;
     }
 
-
     public void setMaxMessageSize(int maxMessageSize) {
         this.maxMessageSize = maxMessageSize;
     }
 
-
     public int getDefaultTopicQueueNums() {
         return defaultTopicQueueNums;
     }
 
-
     public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
         this.defaultTopicQueueNums = defaultTopicQueueNums;
     }
 
-
     public int getRetryTimesWhenSendFailed() {
         return retryTimesWhenSendFailed;
     }
 
-
     public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
         this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
     }
 
-
     public boolean isSendMessageWithVIPChannel() {
         return isVipChannelEnabled();
     }
 
-
     public void setSendMessageWithVIPChannel(final boolean sendMessageWithVIPChannel) {
         this.setVipChannelEnabled(sendMessageWithVIPChannel);
     }
 
-
     public long[] getNotAvailableDuration() {
         return this.defaultMQProducerImpl.getNotAvailableDuration();
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
index a7246e0..1083f9b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
@@ -6,19 +6,18 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.producer;
 
 import org.apache.rocketmq.common.message.Message;
 
-
 public interface LocalTransactionExecuter {
     public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java
index b907f81..209619a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.producer;
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index 492604e..b53652a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -6,16 +6,17 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.producer;
 
+import java.util.List;
 import org.apache.rocketmq.client.MQAdmin;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -23,81 +24,61 @@ import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
-import java.util.List;
-
-
 public interface MQProducer extends MQAdmin {
     void start() throws MQClientException;
 
     void shutdown();
 
-
     List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
 
-
     SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
-            InterruptedException;
-
+        InterruptedException;
 
     SendResult send(final Message msg, final long timeout) throws MQClientException,
-            RemotingException, MQBrokerException, InterruptedException;
-
+        RemotingException, MQBrokerException, InterruptedException;
 
     void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
-            RemotingException, InterruptedException;
-
+        RemotingException, InterruptedException;
 
     void send(final Message msg, final SendCallback sendCallback, final long timeout)
-            throws MQClientException, RemotingException, InterruptedException;
-
+        throws MQClientException, RemotingException, InterruptedException;
 
     void sendOneway(final Message msg) throws MQClientException, RemotingException,
-            InterruptedException;
-
+        InterruptedException;
 
     SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
-            RemotingException, MQBrokerException, InterruptedException;
-
+        RemotingException, MQBrokerException, InterruptedException;
 
     SendResult send(final Message msg, final MessageQueue mq, final long timeout)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
 
     void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
-            throws MQClientException, RemotingException, InterruptedException;
-
+        throws MQClientException, RemotingException, InterruptedException;
 
     void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout)
-            throws MQClientException, RemotingException, InterruptedException;
-
+        throws MQClientException, RemotingException, InterruptedException;
 
     void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,
-            RemotingException, InterruptedException;
-
+        RemotingException, InterruptedException;
 
     SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
 
     SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg,
-                    final long timeout) throws MQClientException, RemotingException, MQBrokerException,
-            InterruptedException;
-
+        final long timeout) throws MQClientException, RemotingException, MQBrokerException,
+        InterruptedException;
 
     void send(final Message msg, final MessageQueueSelector selector, final Object arg,
-              final SendCallback sendCallback) throws MQClientException, RemotingException,
-            InterruptedException;
-
+        final SendCallback sendCallback) throws MQClientException, RemotingException,
+        InterruptedException;
 
     void send(final Message msg, final MessageQueueSelector selector, final Object arg,
-              final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
-            InterruptedException;
-
+        final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
+        InterruptedException;
 
     void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
-            throws MQClientException, RemotingException, InterruptedException;
-
+        throws MQClientException, RemotingException, InterruptedException;
 
     TransactionSendResult sendMessageInTransaction(final Message msg,
-                                                   final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
+        final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java b/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java
index 47956bb..761f45e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java
@@ -6,22 +6,20 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.producer;
 
+import java.util.List;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
 
-import java.util.List;
-
-
 public interface MessageQueueSelector {
     MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java
index f599d83..178e79a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java
@@ -6,19 +6,18 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.producer;
 
 public interface SendCallback {
     public void onSuccess(final SendResult sendResult);
 
-
     public void onException(final Throwable e);
 }