You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:27 UTC

[31/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
deleted file mode 100644
index b82cde9..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ /dev/null
@@ -1,1080 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.producer;
-
-import com.alibaba.rocketmq.client.QueryResult;
-import com.alibaba.rocketmq.client.Validators;
-import com.alibaba.rocketmq.client.common.ClientErrorCode;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.hook.CheckForbiddenContext;
-import com.alibaba.rocketmq.client.hook.CheckForbiddenHook;
-import com.alibaba.rocketmq.client.hook.SendMessageContext;
-import com.alibaba.rocketmq.client.hook.SendMessageHook;
-import com.alibaba.rocketmq.client.impl.CommunicationMode;
-import com.alibaba.rocketmq.client.impl.MQClientManager;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.latency.MQFaultStrategy;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.client.producer.*;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.ServiceState;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-import com.alibaba.rocketmq.common.message.*;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.EndTransactionRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
-import com.alibaba.rocketmq.remoting.RPCHook;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.*;
-
-
-/**
- * @author shijia.wxr
- */
-public class DefaultMQProducerImpl implements MQProducerInner {
-    private final Logger log = ClientLogger.getLog();
-    // Generates the invoke ID that tags the log lines of one send call.
-    private final Random random = new Random();
-    private final DefaultMQProducer defaultMQProducer;
-    // Publish info per topic; start() seeds it with the producer's create-topic key.
-    private final ConcurrentHashMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
-            new ConcurrentHashMap<String, TopicPublishInfo>();
-    private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
-    private final RPCHook rpcHook;
-    // Both stay unassigned (null) until initTransactionEnv() creates them.
-    protected BlockingQueue<Runnable> checkRequestQueue;
-    protected ExecutorService checkExecutor;
-    private ServiceState serviceState = ServiceState.CREATE_JUST;
-    // Assigned in start(boolean).
-    private MQClientInstance mQClientFactory;
-    private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
-    // Parsed from the system property named by MixAll.MESSAGE_COMPRESS_LEVEL, defaulting to "5".
-    private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
-
-    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
-
-
-    /**
-     * Creates the implementation without an RPC hook by delegating to the
-     * two-argument constructor with a {@code null} hook.
-     *
-     * @param defaultMQProducer the producer whose configuration this implementation reads
-     */
-    public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
-        this(defaultMQProducer, null);
-    }
-
-
-    /**
-     * Creates the implementation for the given producer.
-     *
-     * @param defaultMQProducer the producer whose configuration this implementation reads
-     * @param rpcHook           hook handed to the client instance in {@code start}; may be {@code null}
-     */
-    public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
-        this.rpcHook = rpcHook;
-        this.defaultMQProducer = defaultMQProducer;
-    }
-
-    /**
-     * Appends a check-forbidden hook to the hook list and logs the registration.
-     *
-     * @param checkForbiddenHook the hook to register
-     */
-    public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) {
-        checkForbiddenHookList.add(checkForbiddenHook);
-        final String hookName = checkForbiddenHook.hookName();
-        final int hookCount = checkForbiddenHookList.size();
-        log.info("register a new checkForbiddenHook. hookName={}, allHookSize={}", hookName, hookCount);
-    }
-
-    /**
-     * Creates the bounded request queue and the thread pool that run transaction
-     * state checks. The wrapped producer must be a {@link TransactionMQProducer};
-     * its hold-max and pool-size settings size the queue and the pool.
-     */
-    public void initTransactionEnv() {
-        final TransactionMQProducer txProducer = (TransactionMQProducer) this.defaultMQProducer;
-        final int holdMax = txProducer.getCheckRequestHoldMax();
-        this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(holdMax);
-
-        final int corePoolSize = txProducer.getCheckThreadPoolMinSize();
-        final int maxPoolSize = txProducer.getCheckThreadPoolMaxSize();
-        // Idle threads above the core size are kept for one minute.
-        this.checkExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 1000 * 60, TimeUnit.MILLISECONDS,
-                this.checkRequestQueue);
-    }
-
-    /**
-     * Shuts down the transaction check executor and discards any queued check
-     * requests. Safe to call when {@link #initTransactionEnv()} was never
-     * invoked: both fields are only assigned there, so each is null-checked
-     * instead of throwing a NullPointerException.
-     */
-    public void destroyTransactionEnv() {
-        if (this.checkExecutor != null) {
-            this.checkExecutor.shutdown();
-        }
-        if (this.checkRequestQueue != null) {
-            this.checkRequestQueue.clear();
-        }
-    }
-
-    /**
-     * Appends a send-message hook to the hook list and logs its name.
-     *
-     * @param hook the hook to register
-     */
-    public void registerSendMessageHook(final SendMessageHook hook) {
-        sendMessageHookList.add(hook);
-        final String hookName = hook.hookName();
-        log.info("register sendMessage Hook, {}", hookName);
-    }
-
-    /**
-     * Starts the producer and also starts the underlying client instance.
-     *
-     * @throws MQClientException if {@code start(true)} fails
-     */
-    public void start() throws MQClientException {
-        start(true);
-    }
-
-    /**
-     * Starts the producer. Only a producer in the CREATE_JUST state can be started;
-     * every other state is rejected with an exception.
-     *
-     * @param startFactory whether to also start the client instance obtained from MQClientManager
-     * @throws MQClientException if the configuration check fails, the producer group is already
-     *                           registered on the client instance, or the state is not CREATE_JUST
-     */
-    public void start(final boolean startFactory) throws MQClientException {
-        switch (this.serviceState) {
-            case CREATE_JUST:
-                // Marked as failed up front: an exception thrown before RUNNING is set below
-                // leaves the state at START_FAILED (the duplicate-group path resets it instead).
-                this.serviceState = ServiceState.START_FAILED;
-
-                this.checkConfig();
-
-                // Every group except the client-inner producer group gets its instance name changed.
-                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
-                    this.defaultMQProducer.changeInstanceNameToPID();
-                }
-
-                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
-
-                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
-                if (!registerOK) {
-                    // Registration was refused: go back to CREATE_JUST so start() may be retried.
-                    this.serviceState = ServiceState.CREATE_JUST;
-                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
-                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
-                            null);
-                }
-
-                // Seed the publish table with an empty entry for the create-topic key.
-                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
-
-                if (startFactory) {
-                    mQClientFactory.start();
-                }
-
-                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
-                        this.defaultMQProducer.isSendMessageWithVIPChannel());
-                this.serviceState = ServiceState.RUNNING;
-                break;
-            case RUNNING:
-            case START_FAILED:
-            case SHUTDOWN_ALREADY:
-                throw new MQClientException("The producer service state not OK, maybe started once, "//
-                        + this.serviceState//
-                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
-                        null);
-            default:
-                break;
-        }
-
-        // Reached only after a successful start (or the default branch).
-        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
-    }
-
-    private void checkConfig() throws MQClientException {
-        Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
-
-        if (null == this.defaultMQProducer.getProducerGroup()) {
-            throw new MQClientException("producerGroup is null", null);
-        }
-
-        if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
-            throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
-                    null);
-        }
-    }
-
-    /**
-     * Shuts the producer down and also shuts down the underlying client instance.
-     */
-    public void shutdown() {
-        shutdown(true);
-    }
-
-    /**
-     * Shuts the producer down. Only a RUNNING producer does any work: it is
-     * unregistered from the client instance and moved to SHUTDOWN_ALREADY.
-     * Every other state is a no-op.
-     *
-     * @param shutdownFactory whether to also shut down the client instance
-     */
-    public void shutdown(final boolean shutdownFactory) {
-        switch (this.serviceState) {
-            case RUNNING:
-                final String producerGroup = this.defaultMQProducer.getProducerGroup();
-                this.mQClientFactory.unregisterProducer(producerGroup);
-                if (shutdownFactory) {
-                    this.mQClientFactory.shutdown();
-                }
-
-                log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
-                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
-                break;
-            case CREATE_JUST:
-            case SHUTDOWN_ALREADY:
-            default:
-                // Nothing to release in these states.
-                break;
-        }
-    }
-
-    /**
-     * Returns a fresh, independent set holding every topic currently present in
-     * the publish-info table.
-     *
-     * @return a new mutable set of topic names
-     */
-    @Override
-    public Set<String> getPublishTopicList() {
-        return new HashSet<String>(this.topicPublishInfoTable.keySet());
-    }
-
-    /**
-     * Reports whether the publish info for a topic is missing or not usable.
-     *
-     * @param topic the topic to look up
-     * @return {@code true} if there is no entry for the topic or its {@code ok()} is false
-     */
-    @Override
-    public boolean isPublishTopicNeedUpdate(String topic) {
-        final TopicPublishInfo current = this.topicPublishInfoTable.get(topic);
-        if (current == null) {
-            return true;
-        }
-
-        return !current.ok();
-    }
-
-    /**
-     * Returns the transaction check listener of the wrapped producer.
-     *
-     * @return the listener, or {@code null} if the wrapped producer is not a
-     *         {@link TransactionMQProducer}
-     */
-    @Override
-    public TransactionCheckListener checkListener() {
-        if (!(this.defaultMQProducer instanceof TransactionMQProducer)) {
-            return null;
-        }
-
-        return ((TransactionMQProducer) this.defaultMQProducer).getTransactionCheckListener();
-    }
-
-    /**
-     * Handles a transaction state check: wraps the check in a task, submits it to
-     * the check executor, and the task reports the resulting state with a one-way
-     * end-transaction request.
-     *
-     * @param addr   address the end-transaction request is sent to
-     * @param msg    the message whose local transaction state is checked
-     * @param header the check request; its offsets and transaction id are copied into the reply
-     */
-    @Override
-    public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) {
-        Runnable request = new Runnable() {
-            private final String brokerAddr = addr;
-            private final MessageExt message = msg;
-            private final CheckTransactionStateRequestHeader checkRequestHeader = header;
-            private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
-
-
-            @Override
-            public void run() {
-                TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
-                if (transactionCheckListener != null) {
-                    // Stays UNKNOW if the listener throws; the throwable is forwarded as a remark.
-                    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
-                    Throwable exception = null;
-                    try {
-                        localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
-                    } catch (Throwable e) {
-                        log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
-                        exception = e;
-                    }
-
-                    this.processTransactionState(//
-                            localTransactionState, //
-                            group, //
-                            exception);
-                } else {
-                    log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
-                }
-            }
-
-
-            // Builds the end-transaction header from the check request and sends it one-way.
-            private void processTransactionState(//
-                                                 final LocalTransactionState localTransactionState, //
-                                                 final String producerGroup, //
-                                                 final Throwable exception) {
-                final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
-                thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
-                thisHeader.setProducerGroup(producerGroup);
-                thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
-                thisHeader.setFromTransactionCheck(true);
-
-                // Prefer the unique client message id property; fall back to the message id.
-                String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
-                if (uniqueKey == null) {
-                    uniqueKey = message.getMsgId();
-                }
-                thisHeader.setMsgId(uniqueKey);
-                thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
-                switch (localTransactionState) {
-                    case COMMIT_MESSAGE:
-                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
-                        break;
-                    case ROLLBACK_MESSAGE:
-                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
-                        log.warn("when broker check, client rollback this transaction, {}", thisHeader);
-                        break;
-                    case UNKNOW:
-                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
-                        log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
-                        break;
-                    default:
-                        break;
-                }
-
-                String remark = null;
-                if (exception != null) {
-                    remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
-                }
-
-                try {
-                    DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
-                            3000);
-                } catch (Exception e) {
-                    // Failure to send the reply is logged and otherwise ignored.
-                    log.error("endTransactionOneway exception", e);
-                }
-            }
-        };
-
-        // NOTE(review): checkExecutor is only assigned in initTransactionEnv(); this call would
-        // throw a NullPointerException if that method was never invoked - confirm against callers.
-        this.checkExecutor.submit(request);
-    }
-
-    /**
-     * Stores new publish info for a topic, logging the entry it replaces, if any.
-     * Does nothing when either argument is {@code null}.
-     *
-     * @param topic the topic whose publish info is replaced
-     * @param info  the new publish info
-     */
-    @Override
-    public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) {
-        if (info == null || topic == null) {
-            return;
-        }
-
-        final TopicPublishInfo replaced = this.topicPublishInfoTable.put(topic, info);
-        if (replaced == null) {
-            return;
-        }
-
-        log.info("updateTopicPublishInfo prev is not null, " + replaced.toString());
-    }
-
-    /**
-     * Returns the unit-mode flag of the wrapped producer.
-     *
-     * @return the wrapped producer's {@code isUnitMode()} value
-     */
-    @Override
-    public boolean isUnitMode() {
-        final boolean unitMode = defaultMQProducer.isUnitMode();
-        return unitMode;
-    }
-
-    /**
-     * Creates a topic with a topic system flag of 0.
-     *
-     * @param key      passed through to the four-argument overload
-     * @param newTopic name of the topic to create
-     * @param queueNum number of queues
-     * @throws MQClientException if the four-argument overload fails
-     */
-    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
-        this.createTopic(key, newTopic, queueNum, 0);
-    }
-
-    /**
-     * Creates a topic through the admin implementation of the client instance,
-     * after checking that the producer is running and the topic name is valid.
-     *
-     * @param key          passed through to the admin implementation
-     * @param newTopic     name of the topic to create
-     * @param queueNum     number of queues
-     * @param topicSysFlag topic system flag
-     * @throws MQClientException if the producer is not running, the topic name is
-     *                           rejected, or the admin call fails
-     */
-    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
-        makeSureStateOK();
-        Validators.checkTopic(newTopic);
-
-        mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
-    }
-
-    private void makeSureStateOK() throws MQClientException {
-        if (this.serviceState != ServiceState.RUNNING) {
-            throw new MQClientException("The producer service state not OK, "//
-                    + this.serviceState//
-                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
-                    null);
-        }
-    }
-
-    /**
-     * Fetches the message queues a topic can be published to.
-     *
-     * @param topic the topic to look up
-     * @return the queues reported by the admin implementation
-     * @throws MQClientException if the producer is not running or the lookup fails
-     */
-    public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
-        makeSureStateOK();
-        final List<MessageQueue> queues = mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic);
-        return queues;
-    }
-
-    /**
-     * Looks up the offset in a queue for a timestamp via the admin implementation.
-     *
-     * @param mq        the queue to search
-     * @param timestamp the timestamp to search for
-     * @return the offset reported by the admin implementation
-     * @throws MQClientException if the producer is not running or the lookup fails
-     */
-    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
-        makeSureStateOK();
-        final long offset = mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
-        return offset;
-    }
-
-    /**
-     * Returns the maximum offset of a queue via the admin implementation.
-     *
-     * @param mq the queue to inspect
-     * @return the maximum offset reported by the admin implementation
-     * @throws MQClientException if the producer is not running or the lookup fails
-     */
-    public long maxOffset(MessageQueue mq) throws MQClientException {
-        makeSureStateOK();
-        final long offset = mQClientFactory.getMQAdminImpl().maxOffset(mq);
-        return offset;
-    }
-
-    /**
-     * Returns the minimum offset of a queue via the admin implementation.
-     *
-     * @param mq the queue to inspect
-     * @return the minimum offset reported by the admin implementation
-     * @throws MQClientException if the producer is not running or the lookup fails
-     */
-    public long minOffset(MessageQueue mq) throws MQClientException {
-        makeSureStateOK();
-        final long offset = mQClientFactory.getMQAdminImpl().minOffset(mq);
-        return offset;
-    }
-
-    /**
-     * Returns the earliest message store time of a queue via the admin implementation.
-     *
-     * @param mq the queue to inspect
-     * @return the store time reported by the admin implementation
-     * @throws MQClientException if the producer is not running or the lookup fails
-     */
-    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
-        makeSureStateOK();
-        final long storeTime = mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
-        return storeTime;
-    }
-
-    /**
-     * Fetches a message by its id via the admin implementation.
-     *
-     * @param msgId id of the message to fetch
-     * @return the message returned by the admin implementation
-     * @throws MQClientException if the producer is not running
-     */
-    public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        makeSureStateOK();
-        final MessageExt found = mQClientFactory.getMQAdminImpl().viewMessage(msgId);
-        return found;
-    }
-
-    /**
-     * Queries messages of a topic by key via the admin implementation.
-     *
-     * @param topic  the topic to query
-     * @param key    the key to match
-     * @param maxNum passed through as the maximum number of results
-     * @param begin  passed through as the start of the query range
-     * @param end    passed through as the end of the query range
-     * @return the query result returned by the admin implementation
-     * @throws MQClientException if the producer is not running or the query fails
-     */
-    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
-            throws MQClientException, InterruptedException {
-        makeSureStateOK();
-        final QueryResult result = mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
-        return result;
-    }
-
-    /**
-     * Looks up a single message of a topic by its unique key via the admin implementation.
-     *
-     * @param topic   the topic to query
-     * @param uniqKey the unique key of the message
-     * @return the message returned by the admin implementation
-     * @throws MQClientException if the producer is not running or the query fails
-     */
-    public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
-            throws MQClientException, InterruptedException {
-        makeSureStateOK();
-        final MessageExt found = mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
-        return found;
-    }
-
-    /**
-     * DEFAULT ASYNC -------------------------------------------------------
-     *
-     * Sends a message asynchronously using the producer's configured send timeout.
-     *
-     * @param msg          the message to send
-     * @param sendCallback callback handed to the asynchronous send
-     */
-    public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
-        final long timeout = this.defaultMQProducer.getSendMsgTimeout();
-        this.send(msg, sendCallback, timeout);
-    }
-
-    /**
-     * Sends a message asynchronously with an explicit timeout.
-     *
-     * A broker exception raised by the send is rethrown wrapped in an
-     * {@link MQClientException}, with the original kept as the cause.
-     *
-     * @param msg          the message to send
-     * @param sendCallback callback handed to the asynchronous send
-     * @param timeout      timeout passed through to the send implementation
-     * @throws MQClientException if the send fails on the client side or the broker reports an error
-     */
-    public void send(Message msg, SendCallback sendCallback, long timeout)
-            throws MQClientException, RemotingException, InterruptedException {
-        try {
-            this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
-        } catch (MQBrokerException e) {
-            throw new MQClientException("unknown exception", e);
-        }
-    }
-
-    /**
-     * Picks a queue for the next send attempt by delegating to the fault strategy.
-     *
-     * @param tpInfo         publish info of the topic being sent to
-     * @param lastBrokerName broker name used by the previous attempt, or {@code null}
-     * @return the queue chosen by the fault strategy
-     */
-    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
-        final MessageQueue chosen = mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
-        return chosen;
-    }
-
-    /**
-     * Reports the outcome of a send attempt to the fault strategy.
-     *
-     * @param brokerName     broker the attempt was sent to
-     * @param currentLatency elapsed time of the attempt in milliseconds
-     * @param isolation      isolation flag passed through to the fault strategy
-     */
-    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
-        mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
-    }
-
-    /**
-     * Core send routine shared by the send variants: selects a queue, sends, and
-     * retries on failure.
-     *
-     * SYNC mode makes up to {@code 1 + retryTimesWhenSendFailed} attempts; the other
-     * modes make one. Each attempt reports its latency to the fault strategy. If no
-     * attempt yields a result, the last exception is wrapped in an
-     * {@link MQClientException} whose response code reflects its type.
-     *
-     * @param msg               the message to send
-     * @param communicationMode SYNC, ASYNC or ONEWAY
-     * @param sendCallback      callback passed through to the kernel send
-     * @param timeout           timeout passed through to every attempt
-     * @return the send result in SYNC mode; {@code null} in ASYNC and ONEWAY mode
-     * @throws MQClientException    if the producer is not running, the message is invalid,
-     *                              no route or name server is available, or all attempts failed
-     * @throws MQBrokerException    if the broker answers with a response code that is not retried
-     * @throws InterruptedException if the sending thread is interrupted
-     */
-    private SendResult sendDefaultImpl(//
-                                       Message msg, //
-                                       final CommunicationMode communicationMode, //
-                                       final SendCallback sendCallback, //
-                                       final long timeout//
-    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        this.makeSureStateOK();
-        Validators.checkMessage(msg, this.defaultMQProducer);
-
-        // Random ID that ties together the log lines of this call.
-        final long invokeID = random.nextLong();
-        long beginTimestampFirst = System.currentTimeMillis();
-        long beginTimestampPrev = beginTimestampFirst;
-        long endTimestamp = beginTimestampFirst;
-        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
-        if (topicPublishInfo != null && topicPublishInfo.ok()) {
-            MessageQueue mq = null;
-            Exception exception = null;
-            SendResult sendResult = null;
-            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
-            int times = 0;
-            // Broker name of every attempt, reported in the final failure message.
-            String[] brokersSent = new String[timesTotal];
-            for (; times < timesTotal; times++) {
-                // Pass the previous broker so the selection can take it into account.
-                String lastBrokerName = null == mq ? null : mq.getBrokerName();
-                MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
-                if (tmpmq != null) {
-                    mq = tmpmq;
-                    brokersSent[times] = mq.getBrokerName();
-                    try {
-                        beginTimestampPrev = System.currentTimeMillis();
-                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
-                        endTimestamp = System.currentTimeMillis();
-                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
-                        switch (communicationMode) {
-                            case ASYNC:
-                                return null;
-                            case ONEWAY:
-                                return null;
-                            case SYNC:
-                                // A non-OK status is retried only when the producer is configured for it.
-                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
-                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
-                                        continue;
-                                    }
-                                }
-
-                                return sendResult;
-                            default:
-                                break;
-                        }
-                    } catch (RemotingException e) {
-                        endTimestamp = System.currentTimeMillis();
-                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
-                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
-                        log.warn(msg.toString());
-                        exception = e;
-                        continue;
-                    } catch (MQClientException e) {
-                        endTimestamp = System.currentTimeMillis();
-                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
-                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
-                        log.warn(msg.toString());
-                        exception = e;
-                        continue;
-                    } catch (MQBrokerException e) {
-                        endTimestamp = System.currentTimeMillis();
-                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
-                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
-                        log.warn(msg.toString());
-                        exception = e;
-                        // Only the listed response codes are retried; any other code ends the loop.
-                        switch (e.getResponseCode()) {
-                            case ResponseCode.TOPIC_NOT_EXIST:
-                            case ResponseCode.SERVICE_NOT_AVAILABLE:
-                            case ResponseCode.SYSTEM_ERROR:
-                            case ResponseCode.NO_PERMISSION:
-                            case ResponseCode.NO_BUYER_ID:
-                            case ResponseCode.NOT_IN_CURRENT_UNIT:
-                                continue;
-                            default:
-                                if (sendResult != null) {
-                                    return sendResult;
-                                }
-
-                                throw e;
-                        }
-                    } catch (InterruptedException e) {
-                        // Interruption is never retried; log once and propagate.
-                        endTimestamp = System.currentTimeMillis();
-                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
-                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
-                        log.warn(msg.toString());
-                        throw e;
-                    }
-                } else {
-                    break;
-                }
-            }
-
-            if (sendResult != null) {
-                return sendResult;
-            }
-
-            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
-                    times,
-                    System.currentTimeMillis() - beginTimestampFirst,
-                    msg.getTopic(),
-                    Arrays.toString(brokersSent));
-
-            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
-
-            // Map the last failure onto a response code for the caller.
-            MQClientException mqClientException = new MQClientException(info, exception);
-            if (exception instanceof MQBrokerException) {
-                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
-            } else if (exception instanceof RemotingConnectException) {
-                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
-            } else if (exception instanceof RemotingTimeoutException) {
-                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
-            } else if (exception instanceof MQClientException) {
-                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
-            }
-
-            throw mqClientException;
-        }
-
-        // No usable publish info: distinguish a missing name server from a missing route.
-        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
-        if (null == nsList || nsList.isEmpty()) {
-            throw new MQClientException(
-                    "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
-        }
-
-        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
-                null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
-    }
-
-    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
-        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
-        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
-            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
-            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
-            topicPublishInfo = this.topicPublishInfoTable.get(topic);
-        }
-
-        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
-            return topicPublishInfo;
-        } else {
-            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
-            topicPublishInfo = this.topicPublishInfoTable.get(topic);
-            return topicPublishInfo;
-        }
-    }
-
-    private SendResult sendKernelImpl(final Message msg, //
-                                      final MessageQueue mq, //
-                                      final CommunicationMode communicationMode, //
-                                      final SendCallback sendCallback, //
-                                      final TopicPublishInfo topicPublishInfo, //
-                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
-        if (null == brokerAddr) {
-            tryToFindTopicPublishInfo(mq.getTopic());
-            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
-        }
-
-        SendMessageContext context = null;
-        if (brokerAddr != null) {
-            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
-
-            byte[] prevBody = msg.getBody();
-            try {
-
-                MessageClientIDSetter.setUniqID(msg);
-
-                int sysFlag = 0;
-                if (this.tryToCompressMessage(msg)) {
-                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
-                }
-
-                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
-                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
-                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
-                }
-
-                if (hasCheckForbiddenHook()) {
-                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
-                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
-                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
-                    checkForbiddenContext.setCommunicationMode(communicationMode);
-                    checkForbiddenContext.setBrokerAddr(brokerAddr);
-                    checkForbiddenContext.setMessage(msg);
-                    checkForbiddenContext.setMq(mq);
-                    checkForbiddenContext.setUnitMode(this.isUnitMode());
-                    this.executeCheckForbiddenHook(checkForbiddenContext);
-                }
-
-                if (this.hasSendMessageHook()) {
-                    context = new SendMessageContext();
-                    context.setProducer(this);
-                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
-                    context.setCommunicationMode(communicationMode);
-                    context.setBornHost(this.defaultMQProducer.getClientIP());
-                    context.setBrokerAddr(brokerAddr);
-                    context.setMessage(msg);
-                    context.setMq(mq);
-                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
-                    if (isTrans != null && isTrans.equals("true")) {
-                        context.setMsgType(MessageType.Trans_Msg_Half);
-                    }
-
-                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
-                        context.setMsgType(MessageType.Delay_Msg);
-                    }
-                    this.executeSendMessageHookBefore(context);
-                }
-
-                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
-                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
-                requestHeader.setTopic(msg.getTopic());
-                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
-                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
-                requestHeader.setQueueId(mq.getQueueId());
-                requestHeader.setSysFlag(sysFlag);
-                requestHeader.setBornTimestamp(System.currentTimeMillis());
-                requestHeader.setFlag(msg.getFlag());
-                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
-                requestHeader.setReconsumeTimes(0);
-                requestHeader.setUnitMode(this.isUnitMode());
-                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
-                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
-                    if (reconsumeTimes != null) {
-                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
-                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
-                    }
-
-                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
-                    if (maxReconsumeTimes != null) {
-                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
-                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
-                    }
-                }
-
-                SendResult sendResult = null;
-                switch (communicationMode) {
-                    case ASYNC:
-                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
-                                brokerAddr, // 1
-                                mq.getBrokerName(), // 2
-                                msg, // 3
-                                requestHeader, // 4
-                                timeout, // 5
-                                communicationMode, // 6
-                                sendCallback, // 7
-                                topicPublishInfo, // 8
-                                this.mQClientFactory, // 9
-                                this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
-                                context, //
-                                this);
-                        break;
-                    case ONEWAY:
-                    case SYNC:
-                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
-                                brokerAddr,
-                                mq.getBrokerName(),
-                                msg,
-                                requestHeader,
-                                timeout,
-                                communicationMode,
-                                context,
-                                this);
-                        break;
-                    default:
-                        assert false;
-                        break;
-                }
-
-                if (this.hasSendMessageHook()) {
-                    context.setSendResult(sendResult);
-                    this.executeSendMessageHookAfter(context);
-                }
-
-                return sendResult;
-            } catch (RemotingException e) {
-                if (this.hasSendMessageHook()) {
-                    context.setException(e);
-                    this.executeSendMessageHookAfter(context);
-                }
-                throw e;
-            } catch (MQBrokerException e) {
-                if (this.hasSendMessageHook()) {
-                    context.setException(e);
-                    this.executeSendMessageHookAfter(context);
-                }
-                throw e;
-            } catch (InterruptedException e) {
-                if (this.hasSendMessageHook()) {
-                    context.setException(e);
-                    this.executeSendMessageHookAfter(context);
-                }
-                throw e;
-            } finally {
-                msg.setBody(prevBody);
-            }
-        }
-
-        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
-    }
-
-    /**
-     * Returns the client instance currently held in the {@code mQClientFactory} field.
-     *
-     * @return the {@code mQClientFactory} reference, exactly as stored
-     */
-    public MQClientInstance getmQClientFactory() {
-        return this.mQClientFactory;
-    }
-
-    private boolean tryToCompressMessage(final Message msg) {
-        byte[] body = msg.getBody();
-        if (body != null) {
-            if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
-                try {
-                    byte[] data = UtilAll.compress(body, zipCompressLevel);
-                    if (data != null) {
-                        msg.setBody(data);
-                        return true;
-                    }
-                } catch (IOException e) {
-                    log.error("tryToCompressMessage exception", e);
-                    log.warn(msg.toString());
-                }
-            }
-        }
-
-        return false;
-    }
-
-    /**
-     * Tells whether at least one entry is present in {@code checkForbiddenHookList}.
-     *
-     * @return {@code true} when the hook list is not empty
-     */
-    public boolean hasCheckForbiddenHook() {
-        final boolean noHooks = this.checkForbiddenHookList.isEmpty();
-        return !noHooks;
-    }
-
-    /**
-     * Invokes {@code checkForbidden} on every hook in {@code checkForbiddenHookList}, in list order.
-     * Exceptions thrown by a hook are not caught here, so the first failure ends the iteration.
-     *
-     * @param context context object handed unchanged to each hook
-     * @throws MQClientException declared for the hooks' {@code checkForbidden} calls
-     */
-    public void executeCheckForbiddenHook(final CheckForbiddenContext context) throws MQClientException {
-        if (!hasCheckForbiddenHook()) {
-            return;
-        }
-        for (CheckForbiddenHook forbiddenHook : checkForbiddenHookList) {
-            forbiddenHook.checkForbidden(context);
-        }
-    }
-
-    /**
-     * Tells whether at least one entry is present in {@code sendMessageHookList}.
-     *
-     * @return {@code true} when the hook list is not empty
-     */
-    public boolean hasSendMessageHook() {
-        final boolean noHooks = this.sendMessageHookList.isEmpty();
-        return !noHooks;
-    }
-
-    /**
-     * Calls {@code sendMessageBefore} on every hook in {@code sendMessageHookList}, in list order.
-     * Anything a hook throws is caught and logged at warn level, so one failing hook does not
-     * prevent the remaining hooks from running.
-     *
-     * @param context context object handed unchanged to each hook
-     */
-    public void executeSendMessageHookBefore(final SendMessageContext context) {
-        if (this.sendMessageHookList.isEmpty()) {
-            return;
-        }
-        for (SendMessageHook sendHook : this.sendMessageHookList) {
-            try {
-                sendHook.sendMessageBefore(context);
-            } catch (Throwable hookFailure) {
-                log.warn("failed to executeSendMessageHookBefore", hookFailure);
-            }
-        }
-    }
-
-    /**
-     * Calls {@code sendMessageAfter} on every hook in {@code sendMessageHookList}, in list order.
-     * Anything a hook throws is caught and logged at warn level, so one failing hook does not
-     * prevent the remaining hooks from running.
-     *
-     * @param context context object handed unchanged to each hook
-     */
-    public void executeSendMessageHookAfter(final SendMessageContext context) {
-        if (this.sendMessageHookList.isEmpty()) {
-            return;
-        }
-        for (SendMessageHook sendHook : this.sendMessageHookList) {
-            try {
-                sendHook.sendMessageAfter(context);
-            } catch (Throwable hookFailure) {
-                log.warn("failed to executeSendMessageHookAfter", hookFailure);
-            }
-        }
-    }
-
-    /**
-     * DEFAULT ONEWAY -------------------------------------------------------
-     * <p>
-     * Sends {@code msg} through {@code sendDefaultImpl} in {@code CommunicationMode.ONEWAY},
-     * with no callback and the producer's configured send timeout.
-     *
-     * @param msg message to send
-     * @throws MQClientException propagated from the send, or wrapping an {@code MQBrokerException}
-     * raised by it
-     * @throws RemotingException propagated from the send
-     * @throws InterruptedException propagated from the send
-     */
-    public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
-        final long sendTimeout = this.defaultMQProducer.getSendMsgTimeout();
-        try {
-            this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, sendTimeout);
-        } catch (MQBrokerException brokerFailure) {
-            // The oneway signature does not declare MQBrokerException, so it is wrapped.
-            throw new MQClientException("unknown exception", brokerFailure);
-        }
-    }
-
-    /**
-     * KERNEL SYNC -------------------------------------------------------
-     * <p>
-     * Sends {@code msg} to {@code mq} using the producer's configured send timeout; delegates to
-     * the overload that takes an explicit timeout.
-     *
-     * @param msg message to send
-     * @param mq target message queue
-     * @return the result returned by the delegated send
-     */
-    public SendResult send(Message msg, MessageQueue mq)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        final long sendTimeout = this.defaultMQProducer.getSendMsgTimeout();
-        return send(msg, mq, sendTimeout);
-    }
-
-    /**
-     * Sends {@code msg} to {@code mq} through {@code sendKernelImpl} in
-     * {@code CommunicationMode.SYNC}, after checking the producer state, validating the message
-     * and verifying that the message and the queue name the same topic.
-     *
-     * @param msg message to send
-     * @param mq target message queue
-     * @param timeout timeout value forwarded to {@code sendKernelImpl}
-     * @return the result returned by {@code sendKernelImpl}
-     * @throws MQClientException if the topic of {@code msg} differs from the topic of {@code mq},
-     * or propagated from the state check, validation or send
-     */
-    public SendResult send(Message msg, MessageQueue mq, long timeout)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        this.makeSureStateOK();
-        Validators.checkMessage(msg, this.defaultMQProducer);
-
-        final boolean sameTopic = msg.getTopic().equals(mq.getTopic());
-        if (sameTopic) {
-            return this.sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, null, timeout);
-        }
-        throw new MQClientException("message's topic not equal mq's topic", null);
-    }
-
-    /**
-     * KERNEL ASYNC -------------------------------------------------------
-     * <p>
-     * Sends {@code msg} to {@code mq} with a callback, using the producer's configured send
-     * timeout; delegates to the overload that takes an explicit timeout.
-     *
-     * @param msg message to send
-     * @param mq target message queue
-     * @param sendCallback callback forwarded to the delegated send
-     */
-    public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
-            throws MQClientException, RemotingException, InterruptedException {
-        final long sendTimeout = this.defaultMQProducer.getSendMsgTimeout();
-        send(msg, mq, sendCallback, sendTimeout);
-    }
-
-    /**
-     * Sends {@code msg} to {@code mq} through {@code sendKernelImpl} in
-     * {@code CommunicationMode.ASYNC}, after checking the producer state, validating the message
-     * and verifying that the message and the queue name the same topic.
-     *
-     * @param msg message to send
-     * @param mq target message queue
-     * @param sendCallback callback forwarded to {@code sendKernelImpl}
-     * @param timeout timeout value forwarded to {@code sendKernelImpl}
-     * @throws MQClientException if the topic of {@code msg} differs from the topic of {@code mq};
-     * also used to wrap an {@code MQBrokerException} raised by the send
-     */
-    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
-            throws MQClientException, RemotingException, InterruptedException {
-        this.makeSureStateOK();
-        Validators.checkMessage(msg, this.defaultMQProducer);
-
-        final boolean sameTopic = msg.getTopic().equals(mq.getTopic());
-        if (!sameTopic) {
-            throw new MQClientException("message's topic not equal mq's topic", null);
-        }
-
-        try {
-            this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, timeout);
-        } catch (MQBrokerException brokerFailure) {
-            // This signature does not declare MQBrokerException, so it is wrapped.
-            throw new MQClientException("unknown exception", brokerFailure);
-        }
-    }
-
-    /**
-     * KERNEL ONEWAY -------------------------------------------------------
-     * <p>
-     * Sends {@code msg} to {@code mq} through {@code sendKernelImpl} in
-     * {@code CommunicationMode.ONEWAY}, with no callback and the producer's configured send
-     * timeout, after checking the producer state and validating the message.
-     * <p>
-     * NOTE(review): unlike the kernel SYNC and ASYNC overloads in this class, this method does
-     * not compare the topic of {@code msg} with the topic of {@code mq} before sending.
-     *
-     * @param msg message to send
-     * @param mq target message queue
-     * @throws MQClientException propagated from the checks or the send, or wrapping an
-     * {@code MQBrokerException} raised by the send
-     */
-    public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
-        this.makeSureStateOK();
-        Validators.checkMessage(msg, this.defaultMQProducer);
-
-        try {
-            final long sendTimeout = this.defaultMQProducer.getSendMsgTimeout();
-            this.sendKernelImpl(msg, mq, CommunicationMode.ONEWAY, null, null, sendTimeout);
-        } catch (MQBrokerException brokerFailure) {
-            throw new MQClientException("unknown exception", brokerFailure);
-        }
-    }
-
-    /**
-     * SELECT SYNC -------------------------------------------------------
-     * <p>
-     * Sends {@code msg} to the queue chosen by {@code selector}, using the producer's configured
-     * send timeout; delegates to the overload that takes an explicit timeout.
-     *
-     * @param msg message to send
-     * @param selector queue selector forwarded to the delegated send
-     * @param arg opaque argument forwarded to the delegated send
-     * @return the result returned by the delegated send
-     */
-    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        final long sendTimeout = this.defaultMQProducer.getSendMsgTimeout();
-        return send(msg, selector, arg, sendTimeout);
-    }
-
-    /**
-     * Sends {@code msg} to the queue chosen by {@code selector} by delegating to
-     * {@code sendSelectImpl} in {@code CommunicationMode.SYNC} with no callback.
-     *
-     * @param msg message to send
-     * @param selector queue selector forwarded to {@code sendSelectImpl}
-     * @param arg opaque argument forwarded to {@code sendSelectImpl}
-     * @param timeout timeout value forwarded to {@code sendSelectImpl}
-     * @return the result returned by {@code sendSelectImpl}
-     */
-    public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        final SendResult result = this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
-        return result;
-    }
-
-    private SendResult sendSelectImpl(//
-                                      Message msg, //
-                                      MessageQueueSelector selector, //
-                                      Object arg, //
-                                      final CommunicationMode communicationMode, //
-                                      final SendCallback sendCallback, final long timeout//
-    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        this.makeSureStateOK();
-        Validators.checkMessage(msg, this.defaultMQProducer);
-
-        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
-        if (topicPublishInfo != null && topicPublishInfo.ok()) {
-            MessageQueue mq = null;
-            try {
-                mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
-            } catch (Throwable e) {
-                throw new MQClientException("select message queue throwed exception.", e);
-            }
-
-            if (mq != null) {
-                return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
-            } else {
-                throw new MQClientException("select message queue return null.", null);
-            }
-        }
-
-        throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
-    }
-
-    /**
-     * SELECT ASYNC -------------------------------------------------------
-     * <p>
-     * Sends {@code msg} with a callback to the queue chosen by {@code selector}, using the
-     * producer's configured send timeout; delegates to the overload that takes an explicit
-     * timeout.
-     *
-     * @param msg message to send
-     * @param selector queue selector forwarded to the delegated send
-     * @param arg opaque argument forwarded to the delegated send
-     * @param sendCallback callback forwarded to the delegated send
-     */
-    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
-            throws MQClientException, RemotingException, InterruptedException {
-        final long sendTimeout = this.defaultMQProducer.getSendMsgTimeout();
-        send(msg, selector, arg, sendCallback, sendTimeout);
-    }
-
-    /**
-     * Sends {@code msg} with a callback to the queue chosen by {@code selector} by delegating to
-     * {@code sendSelectImpl} in {@code CommunicationMode.ASYNC}.
-     *
-     * @param msg message to send
-     * @param selector queue selector forwarded to {@code sendSelectImpl}
-     * @param arg opaque argument forwarded to {@code sendSelectImpl}
-     * @param sendCallback callback forwarded to {@code sendSelectImpl}
-     * @param timeout timeout value forwarded to {@code sendSelectImpl}
-     * @throws MQClientException propagated from {@code sendSelectImpl}, or wrapping an
-     * {@code MQBrokerException} raised by it
-     * @throws RemotingException propagated from {@code sendSelectImpl}
-     * @throws InterruptedException propagated from {@code sendSelectImpl}
-     */
-    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
-            throws MQClientException, RemotingException, InterruptedException {
-        try {
-            this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout);
-        } catch (MQBrokerException e) {
-            // Same wording as the other wrappers in this class; the text was misspelled "unknownn".
-            throw new MQClientException("unknown exception", e);
-        }
-    }
-
-    /**
-     * SELECT ONEWAY -------------------------------------------------------
-     * <p>
-     * Sends {@code msg} to the queue chosen by {@code selector} by delegating to
-     * {@code sendSelectImpl} in {@code CommunicationMode.ONEWAY}, with no callback and the
-     * producer's configured send timeout.
-     *
-     * @param msg message to send
-     * @param selector queue selector forwarded to {@code sendSelectImpl}
-     * @param arg opaque argument forwarded to {@code sendSelectImpl}
-     * @throws MQClientException propagated from {@code sendSelectImpl}, or wrapping an
-     * {@code MQBrokerException} raised by it
-     */
-    public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
-            throws MQClientException, RemotingException, InterruptedException {
-        try {
-            final long sendTimeout = this.defaultMQProducer.getSendMsgTimeout();
-            this.sendSelectImpl(msg, selector, arg, CommunicationMode.ONEWAY, null, sendTimeout);
-        } catch (MQBrokerException brokerFailure) {
-            throw new MQClientException("unknown exception", brokerFailure);
-        }
-    }
-
-    /**
-     * Sends {@code msg} flagged as transaction-prepared, runs the caller's local transaction
-     * branch depending on the send status, reports the resulting state through
-     * {@link #endTransaction}, and returns a {@code TransactionSendResult} copied from the send
-     * result plus the local transaction state.
-     * <ul>
-     * <li>{@code SEND_OK}: {@code tranExecuter} is run; a {@code null} return is treated as
-     * {@code UNKNOW}; anything it throws is logged and kept as the local exception.</li>
-     * <li>{@code FLUSH_DISK_TIMEOUT}, {@code FLUSH_SLAVE_TIMEOUT}, {@code SLAVE_NOT_AVAILABLE}:
-     * the state becomes {@code ROLLBACK_MESSAGE} and the executer is not run.</li>
-     * <li>Any other status: the state stays {@code UNKNOW}.</li>
-     * </ul>
-     * A failure of {@code endTransaction} is only logged; the result is still returned.
-     *
-     * @param msg message to send; receives the transaction-prepared and producer-group properties
-     * and, when the send result carries a transaction id, the {@code __transactionId__} user
-     * property
-     * @param tranExecuter local transaction logic; must not be {@code null}
-     * @param arg opaque argument passed through to {@code tranExecuter}
-     * @return a result combining the send result's fields with the local transaction state
-     * @throws MQClientException if {@code tranExecuter} is {@code null}, if validation fails, or
-     * wrapping any exception raised by the send
-     */
-    public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
-            throws MQClientException {
-        if (tranExecuter == null) {
-            throw new MQClientException("tranExecutor is null", null);
-        }
-        Validators.checkMessage(msg, this.defaultMQProducer);
-
-        // Flag the message as transaction-prepared and stamp the producer group before sending.
-        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
-        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
-
-        final SendResult preparedResult;
-        try {
-            preparedResult = this.send(msg);
-        } catch (Exception sendFailure) {
-            throw new MQClientException("send message Exception", sendFailure);
-        }
-
-        LocalTransactionState branchState = LocalTransactionState.UNKNOW;
-        Throwable branchFailure = null;
-        switch (preparedResult.getSendStatus()) {
-            case SEND_OK:
-                try {
-                    final String transactionId = preparedResult.getTransactionId();
-                    if (transactionId != null) {
-                        msg.putUserProperty("__transactionId__", transactionId);
-                    }
-
-                    branchState = tranExecuter.executeLocalTransactionBranch(msg, arg);
-                    if (branchState == null) {
-                        branchState = LocalTransactionState.UNKNOW;
-                    }
-
-                    if (branchState != LocalTransactionState.COMMIT_MESSAGE) {
-                        log.info("executeLocalTransactionBranch return {}", branchState);
-                        log.info(msg.toString());
-                    }
-                } catch (Throwable executerFailure) {
-                    // The failure is remembered and later forwarded to endTransaction.
-                    log.info("executeLocalTransactionBranch exception", executerFailure);
-                    log.info(msg.toString());
-                    branchFailure = executerFailure;
-                }
-                break;
-            case FLUSH_DISK_TIMEOUT:
-            case FLUSH_SLAVE_TIMEOUT:
-            case SLAVE_NOT_AVAILABLE:
-                branchState = LocalTransactionState.ROLLBACK_MESSAGE;
-                break;
-            default:
-                break;
-        }
-
-        try {
-            this.endTransaction(preparedResult, branchState, branchFailure);
-        } catch (Exception endFailure) {
-            log.warn("local transaction execute " + branchState + ", but end broker transaction failed", endFailure);
-        }
-
-        final TransactionSendResult outcome = new TransactionSendResult();
-        outcome.setSendStatus(preparedResult.getSendStatus());
-        outcome.setMessageQueue(preparedResult.getMessageQueue());
-        outcome.setMsgId(preparedResult.getMsgId());
-        outcome.setQueueOffset(preparedResult.getQueueOffset());
-        outcome.setTransactionId(preparedResult.getTransactionId());
-        outcome.setLocalTransactionState(branchState);
-        return outcome;
-    }
-
-    /**
-     * DEFAULT SYNC -------------------------------------------------------
-     * <p>
-     * Sends {@code msg} using the producer's configured send timeout; delegates to the overload
-     * that takes an explicit timeout.
-     *
-     * @param msg message to send
-     * @return the result returned by the delegated send
-     */
-    public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        final long sendTimeout = this.defaultMQProducer.getSendMsgTimeout();
-        return send(msg, sendTimeout);
-    }
-
-    /**
-     * Builds an {@code EndTransactionRequestHeader} from {@code sendResult} and
-     * {@code localTransactionState} and sends it with {@code endTransactionOneway} to the broker
-     * address looked up for the send result's message queue.
-     * <p>
-     * The commit log offset is decoded from the offset message id when the send result has one,
-     * otherwise from its message id.
-     *
-     * @param sendResult result of the earlier send; supplies message ids, transaction id, queue
-     * offset and the queue whose broker is addressed
-     * @param localTransactionState mapped to the header's commit-or-rollback flag
-     * ({@code COMMIT_MESSAGE}, {@code ROLLBACK_MESSAGE} or {@code UNKNOW}; other values leave
-     * the flag unset)
-     * @param localException when not {@code null}, its {@code toString()} is sent as the remark
-     */
-    public void endTransaction(
-            final SendResult sendResult,
-            final LocalTransactionState localTransactionState,
-            final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
-        final String offsetMsgId = sendResult.getOffsetMsgId();
-        final String idToDecode = offsetMsgId != null ? offsetMsgId : sendResult.getMsgId();
-        final MessageId id = MessageDecoder.decodeMessageId(idToDecode);
-
-        final String transactionId = sendResult.getTransactionId();
-        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
-
-        final EndTransactionRequestHeader header = new EndTransactionRequestHeader();
-        header.setTransactionId(transactionId);
-        header.setCommitLogOffset(id.getOffset());
-        switch (localTransactionState) {
-            case COMMIT_MESSAGE:
-                header.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
-                break;
-            case ROLLBACK_MESSAGE:
-                header.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
-                break;
-            case UNKNOW:
-                header.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
-                break;
-            default:
-                break;
-        }
-        header.setProducerGroup(this.defaultMQProducer.getProducerGroup());
-        header.setTranStateTableOffset(sendResult.getQueueOffset());
-        header.setMsgId(sendResult.getMsgId());
-
-        String remark = null;
-        if (localException != null) {
-            remark = "executeLocalTransactionBranch exception: " + localException.toString();
-        }
-
-        final long sendTimeout = this.defaultMQProducer.getSendMsgTimeout();
-        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, header, remark, sendTimeout);
-    }
-
-    /**
-     * Sends {@code msg} by delegating to {@code sendDefaultImpl} in
-     * {@code CommunicationMode.SYNC} with no callback.
-     *
-     * @param msg message to send
-     * @param timeout timeout value forwarded to {@code sendDefaultImpl}
-     * @return the result returned by {@code sendDefaultImpl}
-     */
-    public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        final SendResult result = this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
-        return result;
-    }
-
-    /**
-     * Returns the {@code topicPublishInfoTable} map itself, not a copy.
-     *
-     * @return the map held in {@code topicPublishInfoTable}
-     */
-    public ConcurrentHashMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
-        return this.topicPublishInfoTable;
-    }
-
-    /**
-     * Returns the compression level stored in {@code zipCompressLevel}.
-     *
-     * @return the current value of {@code zipCompressLevel}
-     */
-    public int getZipCompressLevel() {
-        return this.zipCompressLevel;
-    }
-
-
-    /**
-     * Stores a new value in {@code zipCompressLevel}; the value is not validated here.
-     *
-     * @param zipCompressLevel value to store
-     */
-    public void setZipCompressLevel(int zipCompressLevel) {
-        this.zipCompressLevel = zipCompressLevel;
-    }
-
-
-    /**
-     * Returns the value held in {@code serviceState}.
-     *
-     * @return the current {@code serviceState}
-     */
-    public ServiceState getServiceState() {
-        return this.serviceState;
-    }
-
-
-    /**
-     * Stores a new value in {@code serviceState}; no transition checks are made here.
-     *
-     * @param serviceState value to store
-     */
-    public void setServiceState(ServiceState serviceState) {
-        this.serviceState = serviceState;
-    }
-
-    /**
-     * Delegates to {@code mqFaultStrategy.getNotAvailableDuration()}.
-     *
-     * @return the array returned by the fault strategy
-     */
-    public long[] getNotAvailableDuration() {
-        return mqFaultStrategy.getNotAvailableDuration();
-    }
-
-    /**
-     * Delegates to {@code mqFaultStrategy.setNotAvailableDuration(...)}.
-     *
-     * @param notAvailableDuration array handed unchanged to the fault strategy
-     */
-    public void setNotAvailableDuration(final long[] notAvailableDuration) {
-        mqFaultStrategy.setNotAvailableDuration(notAvailableDuration);
-    }
-
-    /**
-     * Delegates to {@code mqFaultStrategy.getLatencyMax()}.
-     *
-     * @return the array returned by the fault strategy
-     */
-    public long[] getLatencyMax() {
-        return mqFaultStrategy.getLatencyMax();
-    }
-
-    /**
-     * Delegates to {@code mqFaultStrategy.setLatencyMax(...)}.
-     *
-     * @param latencyMax array handed unchanged to the fault strategy
-     */
-    public void setLatencyMax(final long[] latencyMax) {
-        mqFaultStrategy.setLatencyMax(latencyMax);
-    }
-
-    /**
-     * Delegates to {@code mqFaultStrategy.isSendLatencyFaultEnable()}.
-     *
-     * @return the flag reported by the fault strategy
-     */
-    public boolean isSendLatencyFaultEnable() {
-        return mqFaultStrategy.isSendLatencyFaultEnable();
-    }
-
-    /**
-     * Delegates to {@code mqFaultStrategy.setSendLatencyFaultEnable(...)}.
-     *
-     * @param sendLatencyFaultEnable flag handed unchanged to the fault strategy
-     */
-    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
-        mqFaultStrategy.setSendLatencyFaultEnable(sendLatencyFaultEnable);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java
deleted file mode 100644
index e2837e2..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.producer;
-
-import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
-
-import java.util.Set;
-
-
-/**
- * Producer-side operations declared for use inside the client implementation.
- * <p>
- * NOTE(review): the per-method notes below are inferred from the signatures only; confirm them
- * against the implementing classes.
- *
- * @author shijia.wxr
- */
-public interface MQProducerInner {
-
-    /** @return a set of topic names (presumably the topics this producer publishes to) */
-    Set<String> getPublishTopicList();
-
-    /**
-     * @param topic topic name to check
-     * @return presumably whether the publish info for {@code topic} should be refreshed
-     */
-    boolean isPublishTopicNeedUpdate(String topic);
-
-    /** @return the {@link TransactionCheckListener} associated with this producer */
-    TransactionCheckListener checkListener();
-
-    /**
-     * @param addr address string (presumably of the broker asking for the check)
-     * @param msg message whose transaction state is to be checked
-     * @param checkRequestHeader header of the check request
-     */
-    void checkTransactionState(String addr, MessageExt msg, CheckTransactionStateRequestHeader checkRequestHeader);
-
-    /**
-     * @param topic topic name the info belongs to
-     * @param info publish info to record for {@code topic}
-     */
-    void updateTopicPublishInfo(String topic, TopicPublishInfo info);
-
-    /** @return the producer's unit-mode flag */
-    boolean isUnitMode();
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java
deleted file mode 100644
index 2f7de22..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.producer;
-
-import com.alibaba.rocketmq.client.common.ThreadLocalIndex;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.route.QueueData;
-import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * Publish-side view of a topic: the message queues a message can be sent to, the route data
- * they came with, and an index ({@code sendWhichQueue}) that is advanced on every queue
- * selection.
- *
- * @author shijia.wxr
- */
-public class TopicPublishInfo {
-    private boolean orderTopic = false;
-    private boolean haveTopicRouterInfo = false;
-    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
-    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0);
-    private TopicRouteData topicRouteData;
-
-    public boolean isOrderTopic() {
-        return this.orderTopic;
-    }
-
-    public void setOrderTopic(boolean orderTopic) {
-        this.orderTopic = orderTopic;
-    }
-
-    /**
-     * @return {@code true} when the queue list is neither {@code null} nor empty
-     */
-    public boolean ok() {
-        if (this.messageQueueList == null) {
-            return false;
-        }
-        return !this.messageQueueList.isEmpty();
-    }
-
-    public List<MessageQueue> getMessageQueueList() {
-        return this.messageQueueList;
-    }
-
-    public void setMessageQueueList(List<MessageQueue> messageQueueList) {
-        this.messageQueueList = messageQueueList;
-    }
-
-    public ThreadLocalIndex getSendWhichQueue() {
-        return this.sendWhichQueue;
-    }
-
-    public void setSendWhichQueue(ThreadLocalIndex sendWhichQueue) {
-        this.sendWhichQueue = sendWhichQueue;
-    }
-
-    public boolean isHaveTopicRouterInfo() {
-        return this.haveTopicRouterInfo;
-    }
-
-    public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) {
-        this.haveTopicRouterInfo = haveTopicRouterInfo;
-    }
-
-    /**
-     * Picks a queue whose broker name differs from {@code lastBrokerName}, scanning at most one
-     * full round of the queue list starting at the next index. Falls back to
-     * {@link #selectOneMessageQueue()} when {@code lastBrokerName} is {@code null} or when every
-     * queue belongs to that broker.
-     *
-     * @param lastBrokerName broker name to avoid, or {@code null} for no preference
-     * @return the selected queue
-     */
-    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
-        if (lastBrokerName == null) {
-            return selectOneMessageQueue();
-        }
-
-        final int start = this.sendWhichQueue.getAndIncrement();
-        for (int offset = 0; offset < this.messageQueueList.size(); offset++) {
-            final MessageQueue candidate = this.messageQueueList.get(positionFor(start + offset));
-            if (!candidate.getBrokerName().equals(lastBrokerName)) {
-                return candidate;
-            }
-        }
-        return selectOneMessageQueue();
-    }
-
-    /**
-     * Picks the queue at the next index, wrapped into the bounds of the queue list.
-     *
-     * @return the selected queue
-     */
-    public MessageQueue selectOneMessageQueue() {
-        final int next = this.sendWhichQueue.getAndIncrement();
-        return this.messageQueueList.get(positionFor(next));
-    }
-
-    /**
-     * Maps a raw index onto a position in the queue list.
-     */
-    private int positionFor(final int index) {
-        final int pos = Math.abs(index) % this.messageQueueList.size();
-        // Math.abs(Integer.MIN_VALUE) is still negative, so the remainder can be negative too.
-        return pos < 0 ? 0 : pos;
-    }
-
-    /**
-     * Looks up the first queue data entry of the route data whose broker name equals
-     * {@code brokerName}.
-     * <p>
-     * NOTE(review): despite the method name, the value returned is that entry's
-     * {@code getWriteQueueNums()}.
-     *
-     * @param brokerName broker name to look for
-     * @return the matching entry's write queue number, or {@code -1} when no entry matches
-     */
-    public int getQueueIdByBroker(final String brokerName) {
-        for (QueueData queueData : this.topicRouteData.getQueueDatas()) {
-            if (queueData.getBrokerName().equals(brokerName)) {
-                return queueData.getWriteQueueNums();
-            }
-        }
-        return -1;
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder text = new StringBuilder();
-        text.append("TopicPublishInfo [orderTopic=").append(orderTopic);
-        text.append(", messageQueueList=").append(messageQueueList);
-        text.append(", sendWhichQueue=").append(sendWhichQueue);
-        text.append(", haveTopicRouterInfo=").append(haveTopicRouterInfo);
-        text.append("]");
-        return text.toString();
-    }
-
-    public TopicRouteData getTopicRouteData() {
-        return this.topicRouteData;
-    }
-
-    public void setTopicRouteData(final TopicRouteData topicRouteData) {
-        this.topicRouteData = topicRouteData;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java b/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java
deleted file mode 100644
index e6152e4..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.client.latency;
-
-/**
- * Operations for tracking latency-related fault information per item of type {@code T}.
- * <p>
- * NOTE(review): the per-method notes below are inferred from the signatures only; confirm them
- * against the implementing classes.
- *
- * @param <T> type used to identify a tracked item
- * @author shijia.wxr
- */
-public interface LatencyFaultTolerance<T> {
-
-    /**
-     * @param name item the values apply to
-     * @param currentLatency latency value to record for the item
-     * @param notAvailableDuration duration value to record for the item
-     */
-    void updateFaultItem(T name, long currentLatency, long notAvailableDuration);
-
-    /**
-     * @param name item to query
-     * @return the availability flag for {@code name}
-     */
-    boolean isAvailable(T name);
-
-    /**
-     * @param name item to remove
-     */
-    void remove(T name);
-
-    /**
-     * @return one item chosen by the implementation; selection rule not visible here
-     */
-    T pickOneAtLeast();
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java
deleted file mode 100644
index 8a86449..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.client.latency;
-
-import com.alibaba.rocketmq.client.common.ThreadLocalIndex;
-
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author shijia.wxr
- */
-public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
-    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
-
-    private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(0);
-
-    @Override
-    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
-        FaultItem old = this.faultItemTable.get(name);
-        if (null == old) {
-            final FaultItem faultItem = new FaultItem(name);
-            faultItem.setCurrentLatency(currentLatency);
-            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
-
-            old = this.faultItemTable.putIfAbsent(name, faultItem);
-            if (old != null) {
-                old.setCurrentLatency(currentLatency);
-                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
-            }
-        } else {
-            old.setCurrentLatency(currentLatency);
-            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
-        }
-    }
-
-    @Override
-    public boolean isAvailable(final String name) {
-        final FaultItem faultItem = this.faultItemTable.get(name);
-        if (faultItem != null) {
-            return faultItem.isAvailable();
-        }
-        return true;
-    }
-
-    @Override
-    public void remove(final String name) {
-        this.faultItemTable.remove(name);
-    }
-
-    @Override
-    public String pickOneAtLeast() {
-        final Enumeration<FaultItem> elements = this.faultItemTable.elements();
-        List<FaultItem> tmpList = new LinkedList<FaultItem>();
-        while (elements.hasMoreElements()) {
-            final FaultItem faultItem = elements.nextElement();
-            tmpList.add(faultItem);
-        }
-
-        if (!tmpList.isEmpty()) {
-            Collections.shuffle(tmpList);
-
-            Collections.sort(tmpList);
-
-            final int half = tmpList.size() / 2;
-            if (half <= 0) {
-                return tmpList.get(0).getName();
-            } else {
-                final int i = this.whichItemWorst.getAndIncrement() % half;
-                return tmpList.get(i).getName();
-            }
-        }
-
-        return null;
-    }
-
-    class FaultItem implements Comparable<FaultItem> {
-        private final String name;
-        private volatile long currentLatency;
-        private volatile long startTimestamp;
-
-        public FaultItem(final String name) {
-            this.name = name;
-        }
-
-        @Override
-        public int compareTo(final FaultItem other) {
-            if (this.isAvailable() != other.isAvailable()) {
-                if (this.isAvailable()) return -1;
-
-                if (other.isAvailable()) return 1;
-            }
-
-            if (this.currentLatency < other.currentLatency)
-                return -1;
-            else if (this.currentLatency > other.currentLatency) {
-                return 1;
-            }
-
-            if (this.startTimestamp < other.startTimestamp)
-                return -1;
-            else if (this.startTimestamp > other.startTimestamp) {
-                return 1;
-            }
-
-            return 0;
-        }
-
-        public boolean isAvailable() {
-            return (System.currentTimeMillis() - startTimestamp) >= 0;
-        }
-
-        @Override
-        public int hashCode() {
-            int result = getName() != null ? getName().hashCode() : 0;
-            result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));
-            result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));
-            return result;
-        }
-
-        @Override
-        public boolean equals(final Object o) {
-            if (this == o) return true;
-            if (!(o instanceof FaultItem)) return false;
-
-            final FaultItem faultItem = (FaultItem) o;
-
-            if (getCurrentLatency() != faultItem.getCurrentLatency()) return false;
-            if (getStartTimestamp() != faultItem.getStartTimestamp()) return false;
-            return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;
-
-        }
-
-        @Override
-        public String toString() {
-            return "FaultItem{" +
-                    "name='" + name + '\'' +
-                    ", currentLatency=" + currentLatency +
-                    ", startTimestamp=" + startTimestamp +
-                    '}';
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public long getCurrentLatency() {
-            return currentLatency;
-        }
-
-        public void setCurrentLatency(final long currentLatency) {
-            this.currentLatency = currentLatency;
-        }
-
-        public long getStartTimestamp() {
-            return startTimestamp;
-        }
-
-        public void setStartTimestamp(final long startTimestamp) {
-            this.startTimestamp = startTimestamp;
-        }
-
-
-    }
-
-    @Override
-    public String toString() {
-        return "LatencyFaultToleranceImpl{" +
-                "faultItemTable=" + faultItemTable +
-                ", whichItemWorst=" + whichItemWorst +
-                '}';
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java
deleted file mode 100644
index b323f04..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.client.latency;
-
-import com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-/**
- * @author shijia.wxr
- */
-public class MQFaultStrategy {
-    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
-
-    private boolean sendLatencyFaultEnable = false;
-
-    private long[] latencyMax =             {50L,   100L,   550L,       1000L,  2000L,      3000L,      15000L};
-    private long[] notAvailableDuration =   {0L,    0L,     30000L,     60000L, 120000L,    180000L,    600000L};
-
-    public long[] getNotAvailableDuration() {
-        return notAvailableDuration;
-    }
-
-    public void setNotAvailableDuration(final long[] notAvailableDuration) {
-        this.notAvailableDuration = notAvailableDuration;
-    }
-
-    public long[] getLatencyMax() {
-        return latencyMax;
-    }
-
-    public void setLatencyMax(final long[] latencyMax) {
-        this.latencyMax = latencyMax;
-    }
-
-    public boolean isSendLatencyFaultEnable() {
-        return sendLatencyFaultEnable;
-    }
-
-    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
-        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
-    }
-
-    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
-        if (this.sendLatencyFaultEnable) {
-            try {
-                int index = tpInfo.getSendWhichQueue().getAndIncrement();
-                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
-                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
-                    if (pos < 0)
-                        pos = 0;
-                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
-                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
-                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
-                            return mq;
-                    }
-                }
-
-                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
-                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
-                if (writeQueueNums > 0) {
-                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
-                    if (notBestBroker != null) {
-                        mq.setBrokerName(notBestBroker);
-                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
-                    }
-                    return mq;
-                } else {
-                    latencyFaultTolerance.remove(notBestBroker);
-                }
-            } catch (Exception e) {
-            }
-
-            return tpInfo.selectOneMessageQueue();
-        }
-
-        return tpInfo.selectOneMessageQueue(lastBrokerName);
-    }
-
-    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
-        if (this.sendLatencyFaultEnable) {
-            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
-            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
-        }
-    }
-
-    private long computeNotAvailableDuration(final long currentLatency) {
-        for (int i = latencyMax.length - 1; i >= 0; i--) {
-            if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i];
-        }
-
-        return 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java b/client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java
deleted file mode 100644
index 02af207..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.log;
-
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import org.slf4j.ILoggerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Method;
-import java.net.URL;
-
-
-/**
- * @author shijia.wxr
- */
-public class ClientLogger {
-    private static Logger log;
-    public static final String CLIENT_LOG_ROOT = "rocketmq.client.logRoot";
-    public static final String CLIENT_LOG_MAXINDEX = "rocketmq.client.logFileMaxIndex";
-    public static final String CLIENT_LOG_LEVEL = "rocketmq.client.logLevel";
-
-    static {
-        log = createLogger(LoggerName.CLIENT_LOGGER_NAME);
-    }
-
-
-    private static Logger createLogger(final String loggerName) {
-        String logConfigFilePath =
-                System.getProperty("rocketmq.client.log.configFile",
-                        System.getenv("ROCKETMQ_CLIENT_LOG_CONFIGFILE"));
-        Boolean isloadconfig =
-                Boolean.parseBoolean(System.getProperty("rocketmq.client.log.loadconfig", "true"));
-
-        final String log4JResourceFile =
-                System.getProperty("rocketmq.client.log4j.resource.fileName", "log4j_rocketmq_client.xml");
-
-        final String logbackResourceFile =
-                System.getProperty("rocketmq.client.logback.resource.fileName", "logback_rocketmq_client.xml");
-
-        String clientLogRoot = System.getProperty(CLIENT_LOG_ROOT, "${user.home}/logs/rocketmqlogs");
-        System.setProperty("client.logRoot", clientLogRoot);
-        String clientLogLevel = System.getProperty(CLIENT_LOG_LEVEL, "INFO");
-        System.setProperty("client.logLevel", clientLogLevel);
-        String clientLogMaxIndex = System.getProperty(CLIENT_LOG_MAXINDEX, "10");
-        System.setProperty("client.logFileMaxIndex", clientLogMaxIndex);
-
-        if (isloadconfig) {
-            try {
-                ILoggerFactory iLoggerFactory = LoggerFactory.getILoggerFactory();
-                Class classType = iLoggerFactory.getClass();
-                if (classType.getName().equals("org.slf4j.impl.Log4jLoggerFactory")) {
-                    Class<?> domconfigurator;
-                    Object domconfiguratorobj;
-                    domconfigurator = Class.forName("org.apache.log4j.xml.DOMConfigurator");
-                    domconfiguratorobj = domconfigurator.newInstance();
-                    if (null == logConfigFilePath) {
-                        Method configure = domconfiguratorobj.getClass().getMethod("configure", URL.class);
-                        URL url = ClientLogger.class.getClassLoader().getResource(log4JResourceFile);
-                        configure.invoke(domconfiguratorobj, url);
-                    } else {
-                        Method configure = domconfiguratorobj.getClass().getMethod("configure", String.class);
-                        configure.invoke(domconfiguratorobj, logConfigFilePath);
-                    }
-
-                } else if (classType.getName().equals("ch.qos.logback.classic.LoggerContext")) {
-                    Class<?> joranConfigurator;
-                    Class<?> context = Class.forName("ch.qos.logback.core.Context");
-                    Object joranConfiguratoroObj;
-                    joranConfigurator = Class.forName("ch.qos.logback.classic.joran.JoranConfigurator");
-                    joranConfiguratoroObj = joranConfigurator.newInstance();
-                    Method setContext = joranConfiguratoroObj.getClass().getMethod("setContext", context);
-                    setContext.invoke(joranConfiguratoroObj, iLoggerFactory);
-                    if (null == logConfigFilePath) {
-                        URL url = ClientLogger.class.getClassLoader().getResource(logbackResourceFile);
-                        Method doConfigure =
-                                joranConfiguratoroObj.getClass().getMethod("doConfigure", URL.class);
-                        doConfigure.invoke(joranConfiguratoroObj, url);
-                    } else {
-                        Method doConfigure =
-                                joranConfiguratoroObj.getClass().getMethod("doConfigure", String.class);
-                        doConfigure.invoke(joranConfiguratoroObj, logConfigFilePath);
-                    }
-
-                }
-            } catch (Exception e) {
-                System.err.println(e);
-            }
-        }
-        return LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME);
-    }
-
-
-    public static Logger getLog() {
-        return log;
-    }
-
-
-    public static void setLog(Logger log) {
-        ClientLogger.log = log;
-    }
-}