You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:44 UTC

[27/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
new file mode 100644
index 0000000..b82cde9
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -0,0 +1,1080 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.impl.producer;
+
+import com.alibaba.rocketmq.client.QueryResult;
+import com.alibaba.rocketmq.client.Validators;
+import com.alibaba.rocketmq.client.common.ClientErrorCode;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.hook.CheckForbiddenContext;
+import com.alibaba.rocketmq.client.hook.CheckForbiddenHook;
+import com.alibaba.rocketmq.client.hook.SendMessageContext;
+import com.alibaba.rocketmq.client.hook.SendMessageHook;
+import com.alibaba.rocketmq.client.impl.CommunicationMode;
+import com.alibaba.rocketmq.client.impl.MQClientManager;
+import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
+import com.alibaba.rocketmq.client.latency.MQFaultStrategy;
+import com.alibaba.rocketmq.client.log.ClientLogger;
+import com.alibaba.rocketmq.client.producer.*;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.ServiceState;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.help.FAQUrl;
+import com.alibaba.rocketmq.common.message.*;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.*;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultMQProducerImpl implements MQProducerInner {
+    private final Logger log = ClientLogger.getLog();
+    private final Random random = new Random();
+    private final DefaultMQProducer defaultMQProducer;
+    private final ConcurrentHashMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
+            new ConcurrentHashMap<String, TopicPublishInfo>();
+    private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
+    private final RPCHook rpcHook;
+    protected BlockingQueue<Runnable> checkRequestQueue;
+    protected ExecutorService checkExecutor;
+    private ServiceState serviceState = ServiceState.CREATE_JUST;
+    private MQClientInstance mQClientFactory;
+    private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
+    private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
+
+    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
+
+
+    public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
+        this(defaultMQProducer, null);
+    }
+
+
+    public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
+        this.defaultMQProducer = defaultMQProducer;
+        this.rpcHook = rpcHook;
+    }
+
+    public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) {
+        this.checkForbiddenHookList.add(checkForbiddenHook);
+        log.info("register a new checkForbiddenHook. hookName={}, allHookSize={}", checkForbiddenHook.hookName(),
+                checkForbiddenHookList.size());
+    }
+
+    public void initTransactionEnv() {
+        TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
+        this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
+        this.checkExecutor = new ThreadPoolExecutor(//
+                producer.getCheckThreadPoolMinSize(), //
+                producer.getCheckThreadPoolMaxSize(), //
+                1000 * 60, //
+                TimeUnit.MILLISECONDS, //
+                this.checkRequestQueue);
+    }
+
+    public void destroyTransactionEnv() {
+        this.checkExecutor.shutdown();
+        this.checkRequestQueue.clear();
+    }
+
+    public void registerSendMessageHook(final SendMessageHook hook) {
+        this.sendMessageHookList.add(hook);
+        log.info("register sendMessage Hook, {}", hook.hookName());
+    }
+
+    public void start() throws MQClientException {
+        this.start(true);
+    }
+
+    public void start(final boolean startFactory) throws MQClientException {
+        switch (this.serviceState) {
+            case CREATE_JUST:
+                this.serviceState = ServiceState.START_FAILED;
+
+                this.checkConfig();
+
+                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
+                    this.defaultMQProducer.changeInstanceNameToPID();
+                }
+
+                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
+
+                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
+                if (!registerOK) {
+                    this.serviceState = ServiceState.CREATE_JUST;
+                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
+                            null);
+                }
+
+                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
+
+                if (startFactory) {
+                    mQClientFactory.start();
+                }
+
+                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
+                        this.defaultMQProducer.isSendMessageWithVIPChannel());
+                this.serviceState = ServiceState.RUNNING;
+                break;
+            case RUNNING:
+            case START_FAILED:
+            case SHUTDOWN_ALREADY:
+                throw new MQClientException("The producer service state not OK, maybe started once, "//
+                        + this.serviceState//
+                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+                        null);
+            default:
+                break;
+        }
+
+        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+    }
+
+    private void checkConfig() throws MQClientException {
+        Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
+
+        if (null == this.defaultMQProducer.getProducerGroup()) {
+            throw new MQClientException("producerGroup is null", null);
+        }
+
+        if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
+            throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
+                    null);
+        }
+    }
+
+    public void shutdown() {
+        this.shutdown(true);
+    }
+
+    public void shutdown(final boolean shutdownFactory) {
+        switch (this.serviceState) {
+            case CREATE_JUST:
+                break;
+            case RUNNING:
+                this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());
+                if (shutdownFactory) {
+                    this.mQClientFactory.shutdown();
+                }
+
+                log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
+                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
+                break;
+            case SHUTDOWN_ALREADY:
+                break;
+            default:
+                break;
+        }
+    }
+
+    @Override
+    public Set<String> getPublishTopicList() {
+        Set<String> topicList = new HashSet<String>();
+        for (String key : this.topicPublishInfoTable.keySet()) {
+            topicList.add(key);
+        }
+
+        return topicList;
+    }
+
+    @Override
+    public boolean isPublishTopicNeedUpdate(String topic) {
+        TopicPublishInfo prev = this.topicPublishInfoTable.get(topic);
+
+        return null == prev || !prev.ok();
+    }
+
+    @Override
+    public TransactionCheckListener checkListener() {
+        if (this.defaultMQProducer instanceof TransactionMQProducer) {
+            TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
+            return producer.getTransactionCheckListener();
+        }
+
+        return null;
+    }
+
+    @Override
+    public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) {
+        Runnable request = new Runnable() {
+            private final String brokerAddr = addr;
+            private final MessageExt message = msg;
+            private final CheckTransactionStateRequestHeader checkRequestHeader = header;
+            private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
+
+
+            @Override
+            public void run() {
+                TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
+                if (transactionCheckListener != null) {
+                    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
+                    Throwable exception = null;
+                    try {
+                        localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
+                    } catch (Throwable e) {
+                        log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
+                        exception = e;
+                    }
+
+                    this.processTransactionState(//
+                            localTransactionState, //
+                            group, //
+                            exception);
+                } else {
+                    log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
+                }
+            }
+
+
+            private void processTransactionState(//
+                                                 final LocalTransactionState localTransactionState, //
+                                                 final String producerGroup, //
+                                                 final Throwable exception) {
+                final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
+                thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
+                thisHeader.setProducerGroup(producerGroup);
+                thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
+                thisHeader.setFromTransactionCheck(true);
+
+                String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+                if (uniqueKey == null) {
+                    uniqueKey = message.getMsgId();
+                }
+                thisHeader.setMsgId(uniqueKey);
+                thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
+                switch (localTransactionState) {
+                    case COMMIT_MESSAGE:
+                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
+                        break;
+                    case ROLLBACK_MESSAGE:
+                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
+                        log.warn("when broker check, client rollback this transaction, {}", thisHeader);
+                        break;
+                    case UNKNOW:
+                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
+                        log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
+                        break;
+                    default:
+                        break;
+                }
+
+                String remark = null;
+                if (exception != null) {
+                    remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
+                }
+
+                try {
+                    DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
+                            3000);
+                } catch (Exception e) {
+                    log.error("endTransactionOneway exception", e);
+                }
+            }
+        };
+
+        this.checkExecutor.submit(request);
+    }
+
+    @Override
+    public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) {
+        if (info != null && topic != null) {
+            TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info);
+            if (prev != null) {
+                log.info("updateTopicPublishInfo prev is not null, " + prev.toString());
+            }
+        }
+    }
+
+    @Override
+    public boolean isUnitMode() {
+        return this.defaultMQProducer.isUnitMode();
+    }
+
+    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+        createTopic(key, newTopic, queueNum, 0);
+    }
+
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+        this.makeSureStateOK();
+        Validators.checkTopic(newTopic);
+
+        this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
+    }
+
+    private void makeSureStateOK() throws MQClientException {
+        if (this.serviceState != ServiceState.RUNNING) {
+            throw new MQClientException("The producer service state not OK, "//
+                    + this.serviceState//
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+                    null);
+        }
+    }
+
+    public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
+        this.makeSureStateOK();
+        return this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic);
+    }
+
+    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+        this.makeSureStateOK();
+        return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
+    }
+
+    public long maxOffset(MessageQueue mq) throws MQClientException {
+        this.makeSureStateOK();
+        return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
+    }
+
+    public long minOffset(MessageQueue mq) throws MQClientException {
+        this.makeSureStateOK();
+        return this.mQClientFactory.getMQAdminImpl().minOffset(mq);
+    }
+
+    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+        this.makeSureStateOK();
+        return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
+    }
+
+    public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        this.makeSureStateOK();
+
+        return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
+    }
+
+    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
+            throws MQClientException, InterruptedException {
+        this.makeSureStateOK();
+        return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
+    }
+
+    public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
+            throws MQClientException, InterruptedException {
+        this.makeSureStateOK();
+        return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
+    }
+
+    /**
+     * DEFAULT ASYNC -------------------------------------------------------
+     */
+    public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
+        send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
+    }
+
+    public void send(Message msg, SendCallback sendCallback, long timeout)
+            throws MQClientException, RemotingException, InterruptedException {
+        try {
+            this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
+        } catch (MQBrokerException e) {
+            throw new MQClientException("unknownn exception", e);
+        }
+    }
+
+    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
+        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
+    }
+
+    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
+        this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
+    }
+
+    private SendResult sendDefaultImpl(//
+                                       Message msg, //
+                                       final CommunicationMode communicationMode, //
+                                       final SendCallback sendCallback, //
+                                       final long timeout//
+    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        this.makeSureStateOK();
+        Validators.checkMessage(msg, this.defaultMQProducer);
+
+        final long invokeID = random.nextLong();
+        long beginTimestampFirst = System.currentTimeMillis();
+        long beginTimestampPrev = beginTimestampFirst;
+        long endTimestamp = beginTimestampFirst;
+        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
+        if (topicPublishInfo != null && topicPublishInfo.ok()) {
+            MessageQueue mq = null;
+            Exception exception = null;
+            SendResult sendResult = null;
+            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
+            int times = 0;
+            String[] brokersSent = new String[timesTotal];
+            for (; times < timesTotal; times++) {
+                String lastBrokerName = null == mq ? null : mq.getBrokerName();
+                MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
+                if (tmpmq != null) {
+                    mq = tmpmq;
+                    brokersSent[times] = mq.getBrokerName();
+                    try {
+                        beginTimestampPrev = System.currentTimeMillis();
+                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
+                        endTimestamp = System.currentTimeMillis();
+                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
+                        switch (communicationMode) {
+                            case ASYNC:
+                                return null;
+                            case ONEWAY:
+                                return null;
+                            case SYNC:
+                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
+                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
+                                        continue;
+                                    }
+                                }
+
+                                return sendResult;
+                            default:
+                                break;
+                        }
+                    } catch (RemotingException e) {
+                        endTimestamp = System.currentTimeMillis();
+                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
+                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
+                        log.warn(msg.toString());
+                        exception = e;
+                        continue;
+                    } catch (MQClientException e) {
+                        endTimestamp = System.currentTimeMillis();
+                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
+                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
+                        log.warn(msg.toString());
+                        exception = e;
+                        continue;
+                    } catch (MQBrokerException e) {
+                        endTimestamp = System.currentTimeMillis();
+                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
+                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
+                        log.warn(msg.toString());
+                        exception = e;
+                        switch (e.getResponseCode()) {
+                            case ResponseCode.TOPIC_NOT_EXIST:
+                            case ResponseCode.SERVICE_NOT_AVAILABLE:
+                            case ResponseCode.SYSTEM_ERROR:
+                            case ResponseCode.NO_PERMISSION:
+                            case ResponseCode.NO_BUYER_ID:
+                            case ResponseCode.NOT_IN_CURRENT_UNIT:
+                                continue;
+                            default:
+                                if (sendResult != null) {
+                                    return sendResult;
+                                }
+
+                                throw e;
+                        }
+                    } catch (InterruptedException e) {
+                        endTimestamp = System.currentTimeMillis();
+                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
+                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
+                        log.warn(msg.toString());
+
+                        log.warn("sendKernelImpl exception", e);
+                        log.warn(msg.toString());
+                        throw e;
+                    }
+                } else {
+                    break;
+                }
+            }
+
+            if (sendResult != null) {
+                return sendResult;
+            }
+
+            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
+                    times,
+                    System.currentTimeMillis() - beginTimestampFirst,
+                    msg.getTopic(),
+                    Arrays.toString(brokersSent));
+
+            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
+
+            MQClientException mqClientException = new MQClientException(info, exception);
+            if (exception instanceof MQBrokerException) {
+                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
+            } else if (exception instanceof RemotingConnectException) {
+                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
+            } else if (exception instanceof RemotingTimeoutException) {
+                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
+            } else if (exception instanceof MQClientException) {
+                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
+            }
+
+            throw mqClientException;
+        }
+
+        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
+        if (null == nsList || nsList.isEmpty()) {
+            throw new MQClientException(
+                    "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
+        }
+
+        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
+                null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
+    }
+
+    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
+        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
+        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
+            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
+            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+            topicPublishInfo = this.topicPublishInfoTable.get(topic);
+        }
+
+        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
+            return topicPublishInfo;
+        } else {
+            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
+            topicPublishInfo = this.topicPublishInfoTable.get(topic);
+            return topicPublishInfo;
+        }
+    }
+
+    private SendResult sendKernelImpl(final Message msg, //
+                                      final MessageQueue mq, //
+                                      final CommunicationMode communicationMode, //
+                                      final SendCallback sendCallback, //
+                                      final TopicPublishInfo topicPublishInfo, //
+                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
+        if (null == brokerAddr) {
+            tryToFindTopicPublishInfo(mq.getTopic());
+            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
+        }
+
+        SendMessageContext context = null;
+        if (brokerAddr != null) {
+            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
+
+            byte[] prevBody = msg.getBody();
+            try {
+
+                MessageClientIDSetter.setUniqID(msg);
+
+                int sysFlag = 0;
+                if (this.tryToCompressMessage(msg)) {
+                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
+                }
+
+                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
+                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
+                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
+                }
+
+                if (hasCheckForbiddenHook()) {
+                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
+                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
+                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
+                    checkForbiddenContext.setCommunicationMode(communicationMode);
+                    checkForbiddenContext.setBrokerAddr(brokerAddr);
+                    checkForbiddenContext.setMessage(msg);
+                    checkForbiddenContext.setMq(mq);
+                    checkForbiddenContext.setUnitMode(this.isUnitMode());
+                    this.executeCheckForbiddenHook(checkForbiddenContext);
+                }
+
+                if (this.hasSendMessageHook()) {
+                    context = new SendMessageContext();
+                    context.setProducer(this);
+                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
+                    context.setCommunicationMode(communicationMode);
+                    context.setBornHost(this.defaultMQProducer.getClientIP());
+                    context.setBrokerAddr(brokerAddr);
+                    context.setMessage(msg);
+                    context.setMq(mq);
+                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
+                    if (isTrans != null && isTrans.equals("true")) {
+                        context.setMsgType(MessageType.Trans_Msg_Half);
+                    }
+
+                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
+                        context.setMsgType(MessageType.Delay_Msg);
+                    }
+                    this.executeSendMessageHookBefore(context);
+                }
+
+                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
+                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
+                requestHeader.setTopic(msg.getTopic());
+                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
+                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
+                requestHeader.setQueueId(mq.getQueueId());
+                requestHeader.setSysFlag(sysFlag);
+                requestHeader.setBornTimestamp(System.currentTimeMillis());
+                requestHeader.setFlag(msg.getFlag());
+                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
+                requestHeader.setReconsumeTimes(0);
+                requestHeader.setUnitMode(this.isUnitMode());
+                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
+                    if (reconsumeTimes != null) {
+                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
+                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
+                    }
+
+                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
+                    if (maxReconsumeTimes != null) {
+                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
+                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
+                    }
+                }
+
+                SendResult sendResult = null;
+                switch (communicationMode) {
+                    case ASYNC:
+                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
+                                brokerAddr, // 1
+                                mq.getBrokerName(), // 2
+                                msg, // 3
+                                requestHeader, // 4
+                                timeout, // 5
+                                communicationMode, // 6
+                                sendCallback, // 7
+                                topicPublishInfo, // 8
+                                this.mQClientFactory, // 9
+                                this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
+                                context, //
+                                this);
+                        break;
+                    case ONEWAY:
+                    case SYNC:
+                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
+                                brokerAddr,
+                                mq.getBrokerName(),
+                                msg,
+                                requestHeader,
+                                timeout,
+                                communicationMode,
+                                context,
+                                this);
+                        break;
+                    default:
+                        assert false;
+                        break;
+                }
+
+                if (this.hasSendMessageHook()) {
+                    context.setSendResult(sendResult);
+                    this.executeSendMessageHookAfter(context);
+                }
+
+                return sendResult;
+            } catch (RemotingException e) {
+                if (this.hasSendMessageHook()) {
+                    context.setException(e);
+                    this.executeSendMessageHookAfter(context);
+                }
+                throw e;
+            } catch (MQBrokerException e) {
+                if (this.hasSendMessageHook()) {
+                    context.setException(e);
+                    this.executeSendMessageHookAfter(context);
+                }
+                throw e;
+            } catch (InterruptedException e) {
+                if (this.hasSendMessageHook()) {
+                    context.setException(e);
+                    this.executeSendMessageHookAfter(context);
+                }
+                throw e;
+            } finally {
+                msg.setBody(prevBody);
+            }
+        }
+
+        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+    }
+
+    public MQClientInstance getmQClientFactory() {
+        return mQClientFactory;
+    }
+
+    private boolean tryToCompressMessage(final Message msg) {
+        byte[] body = msg.getBody();
+        if (body != null) {
+            if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
+                try {
+                    byte[] data = UtilAll.compress(body, zipCompressLevel);
+                    if (data != null) {
+                        msg.setBody(data);
+                        return true;
+                    }
+                } catch (IOException e) {
+                    log.error("tryToCompressMessage exception", e);
+                    log.warn(msg.toString());
+                }
+            }
+        }
+
+        return false;
+    }
+
+    public boolean hasCheckForbiddenHook() {
+        return !checkForbiddenHookList.isEmpty();
+    }
+
+    public void executeCheckForbiddenHook(final CheckForbiddenContext context) throws MQClientException {
+        if (hasCheckForbiddenHook()) {
+            for (CheckForbiddenHook hook : checkForbiddenHookList) {
+                hook.checkForbidden(context);
+            }
+        }
+    }
+
+    public boolean hasSendMessageHook() {
+        return !this.sendMessageHookList.isEmpty();
+    }
+
+    public void executeSendMessageHookBefore(final SendMessageContext context) {
+        if (!this.sendMessageHookList.isEmpty()) {
+            for (SendMessageHook hook : this.sendMessageHookList) {
+                try {
+                    hook.sendMessageBefore(context);
+                } catch (Throwable e) {
+                    log.warn("failed to executeSendMessageHookBefore", e);
+                }
+            }
+        }
+    }
+
+    public void executeSendMessageHookAfter(final SendMessageContext context) {
+        if (!this.sendMessageHookList.isEmpty()) {
+            for (SendMessageHook hook : this.sendMessageHookList) {
+                try {
+                    hook.sendMessageAfter(context);
+                } catch (Throwable e) {
+                    log.warn("failed to executeSendMessageHookAfter", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * DEFAULT ONEWAY -------------------------------------------------------
+     */
+    public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
+        try {
+            this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());
+        } catch (MQBrokerException e) {
+            throw new MQClientException("unknown exception", e);
+        }
+    }
+
+    /**
+     * KERNEL SYNC -------------------------------------------------------
+     */
+    public SendResult send(Message msg, MessageQueue mq)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return send(msg, mq, this.defaultMQProducer.getSendMsgTimeout());
+    }
+
+    public SendResult send(Message msg, MessageQueue mq, long timeout)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        this.makeSureStateOK();
+        Validators.checkMessage(msg, this.defaultMQProducer);
+
+        if (!msg.getTopic().equals(mq.getTopic())) {
+            throw new MQClientException("message's topic not equal mq's topic", null);
+        }
+
+        return this.sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, null, timeout);
+    }
+
+    /**
+     * KERNEL ASYNC -------------------------------------------------------
+     */
+    public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
+            throws MQClientException, RemotingException, InterruptedException {
+        send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
+    }
+
+    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.makeSureStateOK();
+        Validators.checkMessage(msg, this.defaultMQProducer);
+
+        if (!msg.getTopic().equals(mq.getTopic())) {
+            throw new MQClientException("message's topic not equal mq's topic", null);
+        }
+
+        try {
+            this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, timeout);
+        } catch (MQBrokerException e) {
+            throw new MQClientException("unknown exception", e);
+        }
+    }
+
+    /**
+     * KERNEL ONEWAY -------------------------------------------------------
+     */
+    public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
+        this.makeSureStateOK();
+        Validators.checkMessage(msg, this.defaultMQProducer);
+
+        try {
+            this.sendKernelImpl(msg, mq, CommunicationMode.ONEWAY, null, null, this.defaultMQProducer.getSendMsgTimeout());
+        } catch (MQBrokerException e) {
+            throw new MQClientException("unknown exception", e);
+        }
+    }
+
+    /**
+     * SELECT SYNC -------------------------------------------------------
+     */
+    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout());
+    }
+
+    public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
+    }
+
+    private SendResult sendSelectImpl(//
+                                      Message msg, //
+                                      MessageQueueSelector selector, //
+                                      Object arg, //
+                                      final CommunicationMode communicationMode, //
+                                      final SendCallback sendCallback, final long timeout//
+    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        this.makeSureStateOK();
+        Validators.checkMessage(msg, this.defaultMQProducer);
+
+        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
+        if (topicPublishInfo != null && topicPublishInfo.ok()) {
+            MessageQueue mq = null;
+            try {
+                mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
+            } catch (Throwable e) {
+                throw new MQClientException("select message queue throwed exception.", e);
+            }
+
+            if (mq != null) {
+                return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
+            } else {
+                throw new MQClientException("select message queue return null.", null);
+            }
+        }
+
+        throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
+    }
+
+    /**
+     * SELECT ASYNC -------------------------------------------------------
+     */
+    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
+            throws MQClientException, RemotingException, InterruptedException {
+        send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
+    }
+
+    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
+            throws MQClientException, RemotingException, InterruptedException {
+        try {
+            this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout);
+        } catch (MQBrokerException e) {
+            throw new MQClientException("unknownn exception", e);
+        }
+    }
+
+    /**
+     * SELECT ONEWAY -------------------------------------------------------
+     */
+    public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
+            throws MQClientException, RemotingException, InterruptedException {
+        try {
+            this.sendSelectImpl(msg, selector, arg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());
+        } catch (MQBrokerException e) {
+            throw new MQClientException("unknown exception", e);
+        }
+    }
+
+    public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
+            throws MQClientException {
+        if (null == tranExecuter) {
+            throw new MQClientException("tranExecutor is null", null);
+        }
+        Validators.checkMessage(msg, this.defaultMQProducer);
+
+        SendResult sendResult = null;
+        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
+        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
+        try {
+            sendResult = this.send(msg);
+        } catch (Exception e) {
+            throw new MQClientException("send message Exception", e);
+        }
+
+        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
+        Throwable localException = null;
+        switch (sendResult.getSendStatus()) {
+            case SEND_OK: {
+                try {
+                    if (sendResult.getTransactionId() != null) {
+                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
+                    }
+                    localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
+                    if (null == localTransactionState) {
+                        localTransactionState = LocalTransactionState.UNKNOW;
+                    }
+
+                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
+                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
+                        log.info(msg.toString());
+                    }
+                } catch (Throwable e) {
+                    log.info("executeLocalTransactionBranch exception", e);
+                    log.info(msg.toString());
+                    localException = e;
+                }
+            }
+            break;
+            case FLUSH_DISK_TIMEOUT:
+            case FLUSH_SLAVE_TIMEOUT:
+            case SLAVE_NOT_AVAILABLE:
+                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
+                break;
+            default:
+                break;
+        }
+
+        try {
+            this.endTransaction(sendResult, localTransactionState, localException);
+        } catch (Exception e) {
+            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
+        }
+
+        TransactionSendResult transactionSendResult = new TransactionSendResult();
+        transactionSendResult.setSendStatus(sendResult.getSendStatus());
+        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
+        transactionSendResult.setMsgId(sendResult.getMsgId());
+        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
+        transactionSendResult.setTransactionId(sendResult.getTransactionId());
+        transactionSendResult.setLocalTransactionState(localTransactionState);
+        return transactionSendResult;
+    }
+
+    /**
+     * DEFAULT SYNC -------------------------------------------------------
+     */
+    public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
+    }
+
+    public void endTransaction(//
+                               final SendResult sendResult, //
+                               final LocalTransactionState localTransactionState, //
+                               final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
+        final MessageId id;
+        if (sendResult.getOffsetMsgId() != null) {
+            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
+        } else {
+            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
+        }
+        String transactionId = sendResult.getTransactionId();
+        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
+        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
+        requestHeader.setTransactionId(transactionId);
+        requestHeader.setCommitLogOffset(id.getOffset());
+        switch (localTransactionState) {
+            case COMMIT_MESSAGE:
+                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
+                break;
+            case ROLLBACK_MESSAGE:
+                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
+                break;
+            case UNKNOW:
+                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
+                break;
+            default:
+                break;
+        }
+
+        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
+        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
+        requestHeader.setMsgId(sendResult.getMsgId());
+        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
+        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
+                this.defaultMQProducer.getSendMsgTimeout());
+    }
+
+    public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
+    }
+
+    public ConcurrentHashMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
+        return topicPublishInfoTable;
+    }
+
+    public int getZipCompressLevel() {
+        return zipCompressLevel;
+    }
+
+
+    public void setZipCompressLevel(int zipCompressLevel) {
+        this.zipCompressLevel = zipCompressLevel;
+    }
+
+
+    public ServiceState getServiceState() {
+        return serviceState;
+    }
+
+
+    public void setServiceState(ServiceState serviceState) {
+        this.serviceState = serviceState;
+    }
+
+    public long[] getNotAvailableDuration() {
+        return this.mqFaultStrategy.getNotAvailableDuration();
+    }
+
+    public void setNotAvailableDuration(final long[] notAvailableDuration) {
+        this.mqFaultStrategy.setNotAvailableDuration(notAvailableDuration);
+    }
+
+    public long[] getLatencyMax() {
+        return this.mqFaultStrategy.getLatencyMax();
+    }
+
+    public void setLatencyMax(final long[] latencyMax) {
+        this.mqFaultStrategy.setLatencyMax(latencyMax);
+    }
+
+    public boolean isSendLatencyFaultEnable() {
+        return this.mqFaultStrategy.isSendLatencyFaultEnable();
+    }
+
+    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
+        this.mqFaultStrategy.setSendLatencyFaultEnable(sendLatencyFaultEnable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java
new file mode 100644
index 0000000..e2837e2
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.impl.producer;
+
+import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
+
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public interface MQProducerInner {
+    Set<String> getPublishTopicList();
+
+
+    boolean isPublishTopicNeedUpdate(final String topic);
+
+
+    TransactionCheckListener checkListener();
+
+
+    void checkTransactionState(//
+                               final String addr, //
+                               final MessageExt msg, //
+                               final CheckTransactionStateRequestHeader checkRequestHeader);
+
+
+    void updateTopicPublishInfo(final String topic, final TopicPublishInfo info);
+
+
+    boolean isUnitMode();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java
new file mode 100644
index 0000000..2f7de22
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.impl.producer;
+
+import com.alibaba.rocketmq.client.common.ThreadLocalIndex;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.route.QueueData;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class TopicPublishInfo {
+    private boolean orderTopic = false;
+    private boolean haveTopicRouterInfo = false;
+    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
+    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0);
+    private TopicRouteData topicRouteData;
+
+
+    public boolean isOrderTopic() {
+        return orderTopic;
+    }
+
+    public void setOrderTopic(boolean orderTopic) {
+        this.orderTopic = orderTopic;
+    }
+
+    public boolean ok() {
+        return null != this.messageQueueList && !this.messageQueueList.isEmpty();
+    }
+
+    public List<MessageQueue> getMessageQueueList() {
+        return messageQueueList;
+    }
+
+
+    public void setMessageQueueList(List<MessageQueue> messageQueueList) {
+        this.messageQueueList = messageQueueList;
+    }
+
+
+    public ThreadLocalIndex getSendWhichQueue() {
+        return sendWhichQueue;
+    }
+
+
+    public void setSendWhichQueue(ThreadLocalIndex sendWhichQueue) {
+        this.sendWhichQueue = sendWhichQueue;
+    }
+
+
+    public boolean isHaveTopicRouterInfo() {
+        return haveTopicRouterInfo;
+    }
+
+
+    public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) {
+        this.haveTopicRouterInfo = haveTopicRouterInfo;
+    }
+
+
+    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
+        if (lastBrokerName == null) {
+            return selectOneMessageQueue();
+        } else {
+            int index = this.sendWhichQueue.getAndIncrement();
+            for (int i = 0; i < this.messageQueueList.size(); i++) {
+                int pos = Math.abs(index++) % this.messageQueueList.size();
+                if (pos < 0)
+                    pos = 0;
+                MessageQueue mq = this.messageQueueList.get(pos);
+                if (!mq.getBrokerName().equals(lastBrokerName)) {
+                    return mq;
+                }
+            }
+            return selectOneMessageQueue();
+        }
+    }
+
+
+    public MessageQueue selectOneMessageQueue() {
+        int index = this.sendWhichQueue.getAndIncrement();
+        int pos = Math.abs(index) % this.messageQueueList.size();
+        if (pos < 0)
+            pos = 0;
+        return this.messageQueueList.get(pos);
+    }
+
+    public int getQueueIdByBroker(final String brokerName) {
+        for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) {
+            final QueueData queueData = this.topicRouteData.getQueueDatas().get(i);
+            if (queueData.getBrokerName().equals(brokerName)) {
+                return queueData.getWriteQueueNums();
+            }
+        }
+
+        return -1;
+    }
+
+
+    @Override
+    public String toString() {
+        return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList
+                + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]";
+    }
+
+    public TopicRouteData getTopicRouteData() {
+        return topicRouteData;
+    }
+
+    public void setTopicRouteData(final TopicRouteData topicRouteData) {
+        this.topicRouteData = topicRouteData;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java
new file mode 100644
index 0000000..e6152e4
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.client.latency;
+
+/**
+ * @author shijia.wxr
+ */
+public interface LatencyFaultTolerance<T> {
+    void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
+
+    boolean isAvailable(final T name);
+
+    void remove(final T name);
+
+    T pickOneAtLeast();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java
new file mode 100644
index 0000000..8a86449
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.client.latency;
+
+import com.alibaba.rocketmq.client.common.ThreadLocalIndex;
+
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author shijia.wxr
+ */
+public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
+    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
+
+    private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(0);
+
+    @Override
+    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
+        FaultItem old = this.faultItemTable.get(name);
+        if (null == old) {
+            final FaultItem faultItem = new FaultItem(name);
+            faultItem.setCurrentLatency(currentLatency);
+            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
+
+            old = this.faultItemTable.putIfAbsent(name, faultItem);
+            if (old != null) {
+                old.setCurrentLatency(currentLatency);
+                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
+            }
+        } else {
+            old.setCurrentLatency(currentLatency);
+            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
+        }
+    }
+
+    @Override
+    public boolean isAvailable(final String name) {
+        final FaultItem faultItem = this.faultItemTable.get(name);
+        if (faultItem != null) {
+            return faultItem.isAvailable();
+        }
+        return true;
+    }
+
+    @Override
+    public void remove(final String name) {
+        this.faultItemTable.remove(name);
+    }
+
+    @Override
+    public String pickOneAtLeast() {
+        final Enumeration<FaultItem> elements = this.faultItemTable.elements();
+        List<FaultItem> tmpList = new LinkedList<FaultItem>();
+        while (elements.hasMoreElements()) {
+            final FaultItem faultItem = elements.nextElement();
+            tmpList.add(faultItem);
+        }
+
+        if (!tmpList.isEmpty()) {
+            Collections.shuffle(tmpList);
+
+            Collections.sort(tmpList);
+
+            final int half = tmpList.size() / 2;
+            if (half <= 0) {
+                return tmpList.get(0).getName();
+            } else {
+                final int i = this.whichItemWorst.getAndIncrement() % half;
+                return tmpList.get(i).getName();
+            }
+        }
+
+        return null;
+    }
+
+    class FaultItem implements Comparable<FaultItem> {
+        private final String name;
+        private volatile long currentLatency;
+        private volatile long startTimestamp;
+
+        public FaultItem(final String name) {
+            this.name = name;
+        }
+
+        @Override
+        public int compareTo(final FaultItem other) {
+            if (this.isAvailable() != other.isAvailable()) {
+                if (this.isAvailable()) return -1;
+
+                if (other.isAvailable()) return 1;
+            }
+
+            if (this.currentLatency < other.currentLatency)
+                return -1;
+            else if (this.currentLatency > other.currentLatency) {
+                return 1;
+            }
+
+            if (this.startTimestamp < other.startTimestamp)
+                return -1;
+            else if (this.startTimestamp > other.startTimestamp) {
+                return 1;
+            }
+
+            return 0;
+        }
+
+        public boolean isAvailable() {
+            return (System.currentTimeMillis() - startTimestamp) >= 0;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = getName() != null ? getName().hashCode() : 0;
+            result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));
+            result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));
+            return result;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (!(o instanceof FaultItem)) return false;
+
+            final FaultItem faultItem = (FaultItem) o;
+
+            if (getCurrentLatency() != faultItem.getCurrentLatency()) return false;
+            if (getStartTimestamp() != faultItem.getStartTimestamp()) return false;
+            return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;
+
+        }
+
+        @Override
+        public String toString() {
+            return "FaultItem{" +
+                    "name='" + name + '\'' +
+                    ", currentLatency=" + currentLatency +
+                    ", startTimestamp=" + startTimestamp +
+                    '}';
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public long getCurrentLatency() {
+            return currentLatency;
+        }
+
+        public void setCurrentLatency(final long currentLatency) {
+            this.currentLatency = currentLatency;
+        }
+
+        public long getStartTimestamp() {
+            return startTimestamp;
+        }
+
+        public void setStartTimestamp(final long startTimestamp) {
+            this.startTimestamp = startTimestamp;
+        }
+
+
+    }
+
+    @Override
+    public String toString() {
+        return "LatencyFaultToleranceImpl{" +
+                "faultItemTable=" + faultItemTable +
+                ", whichItemWorst=" + whichItemWorst +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java
new file mode 100644
index 0000000..b323f04
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.client.latency;
+
+import com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+/**
+ * @author shijia.wxr
+ */
+public class MQFaultStrategy {
+    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
+
+    private boolean sendLatencyFaultEnable = false;
+
+    private long[] latencyMax =             {50L,   100L,   550L,       1000L,  2000L,      3000L,      15000L};
+    private long[] notAvailableDuration =   {0L,    0L,     30000L,     60000L, 120000L,    180000L,    600000L};
+
+    public long[] getNotAvailableDuration() {
+        return notAvailableDuration;
+    }
+
+    public void setNotAvailableDuration(final long[] notAvailableDuration) {
+        this.notAvailableDuration = notAvailableDuration;
+    }
+
+    public long[] getLatencyMax() {
+        return latencyMax;
+    }
+
+    public void setLatencyMax(final long[] latencyMax) {
+        this.latencyMax = latencyMax;
+    }
+
+    public boolean isSendLatencyFaultEnable() {
+        return sendLatencyFaultEnable;
+    }
+
+    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
+        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
+    }
+
+    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
+        if (this.sendLatencyFaultEnable) {
+            try {
+                int index = tpInfo.getSendWhichQueue().getAndIncrement();
+                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
+                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
+                    if (pos < 0)
+                        pos = 0;
+                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
+                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
+                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
+                            return mq;
+                    }
+                }
+
+                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
+                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
+                if (writeQueueNums > 0) {
+                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
+                    if (notBestBroker != null) {
+                        mq.setBrokerName(notBestBroker);
+                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
+                    }
+                    return mq;
+                } else {
+                    latencyFaultTolerance.remove(notBestBroker);
+                }
+            } catch (Exception e) {
+            }
+
+            return tpInfo.selectOneMessageQueue();
+        }
+
+        return tpInfo.selectOneMessageQueue(lastBrokerName);
+    }
+
+    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
+        if (this.sendLatencyFaultEnable) {
+            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
+            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
+        }
+    }
+
+    private long computeNotAvailableDuration(final long currentLatency) {
+        for (int i = latencyMax.length - 1; i >= 0; i--) {
+            if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i];
+        }
+
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java
new file mode 100644
index 0000000..02af207
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.log;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import org.slf4j.ILoggerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ClientLogger {
+    private static Logger log;
+    public static final String CLIENT_LOG_ROOT = "rocketmq.client.logRoot";
+    public static final String CLIENT_LOG_MAXINDEX = "rocketmq.client.logFileMaxIndex";
+    public static final String CLIENT_LOG_LEVEL = "rocketmq.client.logLevel";
+
+    static {
+        log = createLogger(LoggerName.CLIENT_LOGGER_NAME);
+    }
+
+
+    private static Logger createLogger(final String loggerName) {
+        String logConfigFilePath =
+                System.getProperty("rocketmq.client.log.configFile",
+                        System.getenv("ROCKETMQ_CLIENT_LOG_CONFIGFILE"));
+        Boolean isloadconfig =
+                Boolean.parseBoolean(System.getProperty("rocketmq.client.log.loadconfig", "true"));
+
+        final String log4JResourceFile =
+                System.getProperty("rocketmq.client.log4j.resource.fileName", "log4j_rocketmq_client.xml");
+
+        final String logbackResourceFile =
+                System.getProperty("rocketmq.client.logback.resource.fileName", "logback_rocketmq_client.xml");
+
+        String clientLogRoot = System.getProperty(CLIENT_LOG_ROOT, "${user.home}/logs/rocketmqlogs");
+        System.setProperty("client.logRoot", clientLogRoot);
+        String clientLogLevel = System.getProperty(CLIENT_LOG_LEVEL, "INFO");
+        System.setProperty("client.logLevel", clientLogLevel);
+        String clientLogMaxIndex = System.getProperty(CLIENT_LOG_MAXINDEX, "10");
+        System.setProperty("client.logFileMaxIndex", clientLogMaxIndex);
+
+        if (isloadconfig) {
+            try {
+                ILoggerFactory iLoggerFactory = LoggerFactory.getILoggerFactory();
+                Class classType = iLoggerFactory.getClass();
+                if (classType.getName().equals("org.slf4j.impl.Log4jLoggerFactory")) {
+                    Class<?> domconfigurator;
+                    Object domconfiguratorobj;
+                    domconfigurator = Class.forName("org.apache.log4j.xml.DOMConfigurator");
+                    domconfiguratorobj = domconfigurator.newInstance();
+                    if (null == logConfigFilePath) {
+                        Method configure = domconfiguratorobj.getClass().getMethod("configure", URL.class);
+                        URL url = ClientLogger.class.getClassLoader().getResource(log4JResourceFile);
+                        configure.invoke(domconfiguratorobj, url);
+                    } else {
+                        Method configure = domconfiguratorobj.getClass().getMethod("configure", String.class);
+                        configure.invoke(domconfiguratorobj, logConfigFilePath);
+                    }
+
+                } else if (classType.getName().equals("ch.qos.logback.classic.LoggerContext")) {
+                    Class<?> joranConfigurator;
+                    Class<?> context = Class.forName("ch.qos.logback.core.Context");
+                    Object joranConfiguratoroObj;
+                    joranConfigurator = Class.forName("ch.qos.logback.classic.joran.JoranConfigurator");
+                    joranConfiguratoroObj = joranConfigurator.newInstance();
+                    Method setContext = joranConfiguratoroObj.getClass().getMethod("setContext", context);
+                    setContext.invoke(joranConfiguratoroObj, iLoggerFactory);
+                    if (null == logConfigFilePath) {
+                        URL url = ClientLogger.class.getClassLoader().getResource(logbackResourceFile);
+                        Method doConfigure =
+                                joranConfiguratoroObj.getClass().getMethod("doConfigure", URL.class);
+                        doConfigure.invoke(joranConfiguratoroObj, url);
+                    } else {
+                        Method doConfigure =
+                                joranConfiguratoroObj.getClass().getMethod("doConfigure", String.class);
+                        doConfigure.invoke(joranConfiguratoroObj, logConfigFilePath);
+                    }
+
+                }
+            } catch (Exception e) {
+                System.err.println(e);
+            }
+        }
+        return LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME);
+    }
+
+
+    public static Logger getLog() {
+        return log;
+    }
+
+
+    public static void setLog(Logger log) {
+        ClientLogger.log = log;
+    }
+}