You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:44 UTC
[27/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
new file mode 100644
index 0000000..b82cde9
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -0,0 +1,1080 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.impl.producer;
+
+import com.alibaba.rocketmq.client.QueryResult;
+import com.alibaba.rocketmq.client.Validators;
+import com.alibaba.rocketmq.client.common.ClientErrorCode;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.hook.CheckForbiddenContext;
+import com.alibaba.rocketmq.client.hook.CheckForbiddenHook;
+import com.alibaba.rocketmq.client.hook.SendMessageContext;
+import com.alibaba.rocketmq.client.hook.SendMessageHook;
+import com.alibaba.rocketmq.client.impl.CommunicationMode;
+import com.alibaba.rocketmq.client.impl.MQClientManager;
+import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
+import com.alibaba.rocketmq.client.latency.MQFaultStrategy;
+import com.alibaba.rocketmq.client.log.ClientLogger;
+import com.alibaba.rocketmq.client.producer.*;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.ServiceState;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.help.FAQUrl;
+import com.alibaba.rocketmq.common.message.*;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.*;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultMQProducerImpl implements MQProducerInner {
+ private final Logger log = ClientLogger.getLog(); // logger used by every method of this class
+ private final Random random = new Random(); // supplies the per-send InvokeID that appears in retry log lines
+ private final DefaultMQProducer defaultMQProducer; // producer facade; source of group name, timeouts and retry settings
+ private final ConcurrentHashMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
+ new ConcurrentHashMap<String, TopicPublishInfo>(); // publish route info cached per topic
+ private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>(); // hooks run around each kernel send; NOTE(review): plain ArrayList, no synchronization visible here
+ private final RPCHook rpcHook; // handed to MQClientManager when the client instance is obtained in start()
+ protected BlockingQueue<Runnable> checkRequestQueue; // work queue backing checkExecutor; null until initTransactionEnv()
+ protected ExecutorService checkExecutor; // executes transaction-state check requests; null until initTransactionEnv()
+ private ServiceState serviceState = ServiceState.CREATE_JUST; // lifecycle state, advanced by start() and shutdown()
+ private MQClientInstance mQClientFactory; // assigned in start(); null before that
+ private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>(); // hooks executed before each kernel send
+ private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5")); // level passed to UtilAll.compress; "5" when the system property is unset
+
+ private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); // queue selection and per-broker latency/fault bookkeeping
+
+
+ /**
+  * Creates the implementation for the given producer without any RPC hook.
+  *
+  * @param defaultMQProducer producer facade whose settings this implementation reads
+  */
+ public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
+     this(defaultMQProducer, (RPCHook) null);
+ }
+
+
+ /**
+  * Creates the implementation for the given producer.
+  *
+  * @param defaultMQProducer producer facade whose settings this implementation reads
+  * @param rpcHook           hook forwarded to the client instance created in {@code start}; may be null
+  */
+ public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
+     this.rpcHook = rpcHook;
+     this.defaultMQProducer = defaultMQProducer;
+ }
+
+ /**
+  * Appends a check-forbidden hook and logs its name together with the new hook count.
+  *
+  * @param checkForbiddenHook hook to register
+  */
+ public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) {
+     final ArrayList<CheckForbiddenHook> hooks = this.checkForbiddenHookList;
+     hooks.add(checkForbiddenHook);
+     log.info("register a new checkForbiddenHook. hookName={}, allHookSize={}", checkForbiddenHook.hookName(), hooks.size());
+ }
+
+ /**
+  * Creates the bounded request queue and the thread pool used for transaction-state checks.
+  * The owning producer is cast to {@code TransactionMQProducer}; queue capacity and pool sizes
+  * are read from it. Idle threads above the core size are kept for 60 seconds.
+  */
+ public void initTransactionEnv() {
+     final TransactionMQProducer txProducer = (TransactionMQProducer) this.defaultMQProducer;
+
+     final int queueCapacity = txProducer.getCheckRequestHoldMax();
+     this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(queueCapacity);
+
+     final int coreThreads = txProducer.getCheckThreadPoolMinSize();
+     final int maxThreads = txProducer.getCheckThreadPoolMaxSize();
+     final long keepAliveMillis = 1000 * 60;
+     this.checkExecutor =
+         new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveMillis, TimeUnit.MILLISECONDS, this.checkRequestQueue);
+ }
+
+ /**
+  * Shuts down the transaction-check executor and drops any queued check requests.
+  * Both fields stay null until {@code initTransactionEnv()} has run, so each is guarded;
+  * calling this method without a prior init is a no-op instead of a NullPointerException.
+  */
+ public void destroyTransactionEnv() {
+     if (this.checkExecutor != null) {
+         this.checkExecutor.shutdown();
+     }
+     if (this.checkRequestQueue != null) {
+         this.checkRequestQueue.clear();
+     }
+ }
+
+ /**
+  * Appends a send-message hook and logs its name.
+  *
+  * @param hook hook to register
+  */
+ public void registerSendMessageHook(final SendMessageHook hook) {
+     sendMessageHookList.add(hook);
+     final String hookName = hook.hookName();
+     log.info("register sendMessage Hook, {}", hookName);
+ }
+
+ /**
+  * Starts the producer and also starts the underlying client instance.
+  *
+  * @throws MQClientException propagated from {@code start(boolean)}
+  */
+ public void start() throws MQClientException {
+     final boolean startFactory = true;
+     start(startFactory);
+ }
+
+ public void start(final boolean startFactory) throws MQClientException { // starts the producer; startFactory decides whether the client instance itself is started too
+ switch (this.serviceState) {
+ case CREATE_JUST:
+ this.serviceState = ServiceState.START_FAILED; // pessimistic marker: remains START_FAILED if a step below throws (the duplicate-group path resets it)
+
+ this.checkConfig();
+
+ if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
+ this.defaultMQProducer.changeInstanceNameToPID(); // skipped only for the client-inner producer group
+ }
+
+ this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
+
+ boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
+ if (!registerOK) {
+ this.serviceState = ServiceState.CREATE_JUST; // registration refused: roll the state back so start() may be retried
+ throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
+ null);
+ }
+
+ this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); // seed an empty entry for the create-topic key
+
+ if (startFactory) {
+ mQClientFactory.start();
+ }
+
+ log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
+ this.defaultMQProducer.isSendMessageWithVIPChannel());
+ this.serviceState = ServiceState.RUNNING;
+ break;
+ case RUNNING:
+ case START_FAILED:
+ case SHUTDOWN_ALREADY:
+ throw new MQClientException("The producer service state not OK, maybe started once, "// any state other than CREATE_JUST is rejected
+ + this.serviceState//
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+ null);
+ default:
+ break;
+ }
+
+ this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); // reached after a successful start (or the default branch)
+ }
+
+ private void checkConfig() throws MQClientException {
+ Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
+
+ if (null == this.defaultMQProducer.getProducerGroup()) {
+ throw new MQClientException("producerGroup is null", null);
+ }
+
+ if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
+ throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
+ null);
+ }
+ }
+
+ /**
+  * Shuts the producer down and also shuts down the underlying client instance.
+  */
+ public void shutdown() {
+     final boolean shutdownFactory = true;
+     shutdown(shutdownFactory);
+ }
+
+ /**
+  * Shuts the producer down. Only the RUNNING state triggers any work: the producer is
+  * unregistered from the client instance, the instance is optionally shut down, and the
+  * state becomes SHUTDOWN_ALREADY. Every other state is a no-op.
+  *
+  * @param shutdownFactory whether to shut down the client instance as well
+  */
+ public void shutdown(final boolean shutdownFactory) {
+     switch (this.serviceState) {
+         case RUNNING:
+             final String producerGroup = this.defaultMQProducer.getProducerGroup();
+             this.mQClientFactory.unregisterProducer(producerGroup);
+             if (shutdownFactory) {
+                 this.mQClientFactory.shutdown();
+             }
+
+             log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
+             this.serviceState = ServiceState.SHUTDOWN_ALREADY;
+             break;
+         case CREATE_JUST:
+         case SHUTDOWN_ALREADY:
+         default:
+             // nothing to release in these states
+             break;
+     }
+ }
+
+ /**
+  * Returns a new set holding the topics currently present in the publish-info table.
+  *
+  * @return a fresh {@code HashSet} of topic names
+  */
+ @Override
+ public Set<String> getPublishTopicList() {
+     return new HashSet<String>(this.topicPublishInfoTable.keySet());
+ }
+
+ /**
+  * Tells whether the cached publish info for a topic is missing or not ok.
+  *
+  * @param topic topic to look up
+  * @return true when no entry exists or the entry's {@code ok()} is false
+  */
+ @Override
+ public boolean isPublishTopicNeedUpdate(String topic) {
+     final TopicPublishInfo cached = this.topicPublishInfoTable.get(topic);
+     if (cached == null) {
+         return true;
+     }
+     return !cached.ok();
+ }
+
+ /**
+  * Returns the transaction check listener of the owning producer.
+  *
+  * @return the listener when the producer is a {@code TransactionMQProducer}, otherwise null
+  */
+ @Override
+ public TransactionCheckListener checkListener() {
+     if (!(this.defaultMQProducer instanceof TransactionMQProducer)) {
+         return null;
+     }
+
+     return ((TransactionMQProducer) this.defaultMQProducer).getTransactionCheckListener();
+ }
+
+ /**
+  * Queues a task that asks the local transaction check listener for the state of
+  * {@code msg} and reports the answer back to the broker with a one-way end-transaction request.
+  * <p>
+  * A listener that throws is reported as UNKNOW with the exception description as remark.
+  * A listener that returns null is also treated as UNKNOW; without that guard the
+  * {@code switch} on the state would throw a NullPointerException inside the executor task
+  * and no reply would be sent.
+  *
+  * @param addr   broker address the reply is sent to
+  * @param msg    message whose transaction state is being checked
+  * @param header check request header; offsets and transaction id are copied into the reply
+  */
+ @Override
+ public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) {
+     Runnable request = new Runnable() {
+         private final String brokerAddr = addr;
+         private final MessageExt message = msg;
+         private final CheckTransactionStateRequestHeader checkRequestHeader = header;
+         private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
+
+
+         @Override
+         public void run() {
+             TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
+             if (transactionCheckListener != null) {
+                 LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
+                 Throwable exception = null;
+                 try {
+                     localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
+                 } catch (Throwable e) {
+                     log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
+                     exception = e;
+                 }
+
+                 // A null answer carries no decision; report it as UNKNOW rather than crash in the switch below.
+                 if (localTransactionState == null) {
+                     log.warn("checkLocalTransactionState returned null, treat it as UNKNOW, group[{}]", group);
+                     localTransactionState = LocalTransactionState.UNKNOW;
+                 }
+
+                 this.processTransactionState(//
+                     localTransactionState, //
+                     group, //
+                     exception);
+             } else {
+                 log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
+             }
+         }
+
+
+         // Builds the end-transaction header from the check request and sends it one-way to the broker.
+         private void processTransactionState(//
+             final LocalTransactionState localTransactionState, //
+             final String producerGroup, //
+             final Throwable exception) {
+             final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
+             thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
+             thisHeader.setProducerGroup(producerGroup);
+             thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
+             thisHeader.setFromTransactionCheck(true);
+
+             // Prefer the client-side unique id; fall back to the message id when it is absent.
+             String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+             if (uniqueKey == null) {
+                 uniqueKey = message.getMsgId();
+             }
+             thisHeader.setMsgId(uniqueKey);
+             thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
+             switch (localTransactionState) {
+                 case COMMIT_MESSAGE:
+                     thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
+                     break;
+                 case ROLLBACK_MESSAGE:
+                     thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
+                     log.warn("when broker check, client rollback this transaction, {}", thisHeader);
+                     break;
+                 case UNKNOW:
+                     thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
+                     log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
+                     break;
+                 default:
+                     break;
+             }
+
+             String remark = null;
+             if (exception != null) {
+                 remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
+             }
+
+             try {
+                 DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
+                     3000);
+             } catch (Exception e) {
+                 log.error("endTransactionOneway exception", e);
+             }
+         }
+     };
+
+     this.checkExecutor.submit(request);
+ }
+
+ /**
+  * Stores new publish info for a topic and logs the entry it replaced, if any.
+  * Does nothing when either argument is null.
+  *
+  * @param topic topic name
+  * @param info  publish info to cache
+  */
+ @Override
+ public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) {
+     if (info == null || topic == null) {
+         return;
+     }
+
+     final TopicPublishInfo replaced = this.topicPublishInfoTable.put(topic, info);
+     if (replaced != null) {
+         log.info("updateTopicPublishInfo prev is not null, " + replaced.toString());
+     }
+ }
+
+ /**
+  * Reports the unit-mode flag of the owning producer.
+  *
+  * @return value of {@code defaultMQProducer.isUnitMode()}
+  */
+ @Override
+ public boolean isUnitMode() {
+     final boolean unitMode = defaultMQProducer.isUnitMode();
+     return unitMode;
+ }
+
+ /**
+  * Creates a topic with a system flag of 0.
+  *
+  * @param key      key forwarded to the four-argument overload
+  * @param newTopic name of the topic to create
+  * @param queueNum number of queues
+  * @throws MQClientException propagated from the four-argument overload
+  */
+ public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+     final int noSysFlag = 0;
+     this.createTopic(key, newTopic, queueNum, noSysFlag);
+ }
+
+ /**
+  * Creates a topic through the client instance's admin implementation, after checking that
+  * the producer is running and that the topic name passes {@code Validators.checkTopic}.
+  *
+  * @param key          key forwarded to the admin implementation
+  * @param newTopic     name of the topic to create
+  * @param queueNum     number of queues
+  * @param topicSysFlag system flag for the topic
+  * @throws MQClientException if the producer is not running, the name is invalid, or creation fails
+  */
+ public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+     makeSureStateOK();
+     Validators.checkTopic(newTopic);
+
+     mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
+ }
+
+ /**
+  * Guards public operations: passes silently only while the producer is RUNNING.
+  *
+  * @throws MQClientException when the service state is anything other than RUNNING
+  */
+ private void makeSureStateOK() throws MQClientException {
+     if (ServiceState.RUNNING == this.serviceState) {
+         return;
+     }
+
+     throw new MQClientException("The producer service state not OK, "//
+         + this.serviceState//
+         + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+         null);
+ }
+
+ /**
+  * Fetches the publish queues of a topic via the admin implementation.
+  *
+  * @param topic topic name
+  * @return queues reported by the admin implementation
+  * @throws MQClientException if the producer is not running or the lookup fails
+  */
+ public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
+     makeSureStateOK();
+     final List<MessageQueue> queues = mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic);
+     return queues;
+ }
+
+ /**
+  * Looks up a queue offset by timestamp via the admin implementation.
+  *
+  * @param mq        queue to search
+  * @param timestamp timestamp passed through to the admin implementation
+  * @return offset reported by the admin implementation
+  * @throws MQClientException if the producer is not running or the lookup fails
+  */
+ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+     makeSureStateOK();
+     final long offset = mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
+     return offset;
+ }
+
+ /**
+  * Returns the max offset of a queue via the admin implementation.
+  *
+  * @param mq queue to inspect
+  * @return offset reported by the admin implementation
+  * @throws MQClientException if the producer is not running or the lookup fails
+  */
+ public long maxOffset(MessageQueue mq) throws MQClientException {
+     makeSureStateOK();
+     final long offset = mQClientFactory.getMQAdminImpl().maxOffset(mq);
+     return offset;
+ }
+
+ /**
+  * Returns the min offset of a queue via the admin implementation.
+  *
+  * @param mq queue to inspect
+  * @return offset reported by the admin implementation
+  * @throws MQClientException if the producer is not running or the lookup fails
+  */
+ public long minOffset(MessageQueue mq) throws MQClientException {
+     makeSureStateOK();
+     final long offset = mQClientFactory.getMQAdminImpl().minOffset(mq);
+     return offset;
+ }
+
+ /**
+  * Returns the earliest message store time of a queue via the admin implementation.
+  *
+  * @param mq queue to inspect
+  * @return value reported by the admin implementation
+  * @throws MQClientException if the producer is not running or the lookup fails
+  */
+ public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+     makeSureStateOK();
+     final long storeTime = mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
+     return storeTime;
+ }
+
+ /**
+  * Looks a message up by id via the admin implementation.
+  *
+  * @param msgId id of the message
+  * @return the message returned by the admin implementation
+  * @throws MQClientException    if the producer is not running
+  * @throws RemotingException    propagated from the admin implementation
+  * @throws MQBrokerException    propagated from the admin implementation
+  * @throws InterruptedException propagated from the admin implementation
+  */
+ public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+     makeSureStateOK();
+     final MessageExt found = mQClientFactory.getMQAdminImpl().viewMessage(msgId);
+     return found;
+ }
+
+ /**
+  * Queries messages by topic and key via the admin implementation.
+  *
+  * @param topic  topic name
+  * @param key    message key
+  * @param maxNum maximum number of results
+  * @param begin  begin bound passed through to the admin implementation
+  * @param end    end bound passed through to the admin implementation
+  * @return query result from the admin implementation
+  * @throws MQClientException    if the producer is not running or the query fails
+  * @throws InterruptedException propagated from the admin implementation
+  */
+ public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
+     throws MQClientException, InterruptedException {
+     makeSureStateOK();
+     final QueryResult result = mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
+     return result;
+ }
+
+ /**
+  * Queries a single message by its unique key via the admin implementation.
+  *
+  * @param topic   topic name
+  * @param uniqKey unique key of the message
+  * @return the message returned by the admin implementation
+  * @throws MQClientException    if the producer is not running or the query fails
+  * @throws InterruptedException propagated from the admin implementation
+  */
+ public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
+     throws MQClientException, InterruptedException {
+     makeSureStateOK();
+     final MessageExt found = mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
+     return found;
+ }
+
+ /**
+  * DEFAULT ASYNC: sends asynchronously using the producer's configured send timeout.
+  *
+  * @param msg          message to send
+  * @param sendCallback callback handed to the asynchronous send
+  */
+ public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
+     final long timeout = this.defaultMQProducer.getSendMsgTimeout();
+     send(msg, sendCallback, timeout);
+ }
+
+ /**
+  * Sends a message asynchronously through the default (fault-strategy) path.
+  * A broker exception is wrapped in an {@code MQClientException}; the wrapper message now reads
+  * "unknown exception" (it was misspelled "unknownn"), matching the sibling send methods.
+  *
+  * @param msg          message to send
+  * @param sendCallback callback handed to the asynchronous send
+  * @param timeout      send timeout passed to the kernel send
+  * @throws MQClientException    on client-side failure or a wrapped broker exception
+  * @throws RemotingException    propagated from the send path
+  * @throws InterruptedException propagated from the send path
+  */
+ public void send(Message msg, SendCallback sendCallback, long timeout)
+     throws MQClientException, RemotingException, InterruptedException {
+     try {
+         this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
+     } catch (MQBrokerException e) {
+         throw new MQClientException("unknown exception", e);
+     }
+ }
+
+ /**
+  * Picks a queue for the next send by delegating to the fault strategy.
+  *
+  * @param tpInfo         publish info of the topic
+  * @param lastBrokerName broker used by the previous attempt, or null on the first attempt
+  * @return the queue chosen by the fault strategy
+  */
+ public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
+     final MessageQueue chosen = mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
+     return chosen;
+ }
+
+ /**
+  * Records the outcome of a send attempt in the fault strategy.
+  *
+  * @param brokerName     broker the attempt went to
+  * @param currentLatency elapsed time of the attempt
+  * @param isolation      flag forwarded to the fault strategy
+  */
+ public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
+     mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
+ }
+
+ /**
+  * Sends a message to a queue chosen by the fault strategy.
+  * <p>
+  * SYNC mode makes up to {@code 1 + retryTimesWhenSendFailed} attempts; ASYNC and ONEWAY make one.
+  * Each attempt's elapsed time is reported through {@code updateFaultItem}. Remoting and client
+  * exceptions, and broker exceptions with one of the listed response codes, move on to the next
+  * attempt; other broker exceptions and interruption end the send immediately.
+  * <p>
+  * The interruption handler used to log the same exception and message twice; it now logs once.
+  *
+  * @param msg               message to send
+  * @param communicationMode SYNC, ASYNC or ONEWAY
+  * @param sendCallback      callback for ASYNC mode
+  * @param timeout           timeout handed to every kernel send attempt
+  * @return the send result in SYNC mode; null in ASYNC and ONEWAY mode
+  * @throws MQClientException    if the producer is not running, the message is invalid, no route or
+  *                              name server is available, or all attempts failed
+  * @throws RemotingException    declared for the send path
+  * @throws MQBrokerException    for a broker response code that is not retried
+  * @throws InterruptedException if the sending thread is interrupted
+  */
+ private SendResult sendDefaultImpl(//
+     Message msg, //
+     final CommunicationMode communicationMode, //
+     final SendCallback sendCallback, //
+     final long timeout//
+ ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+     this.makeSureStateOK();
+     Validators.checkMessage(msg, this.defaultMQProducer);
+
+     // Only used to correlate log lines belonging to the same send call.
+     final long invokeID = random.nextLong();
+     long beginTimestampFirst = System.currentTimeMillis();
+     long beginTimestampPrev = beginTimestampFirst;
+     long endTimestamp = beginTimestampFirst;
+     TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
+     if (topicPublishInfo != null && topicPublishInfo.ok()) {
+         MessageQueue mq = null;
+         Exception exception = null;
+         SendResult sendResult = null;
+         int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
+         int times = 0;
+         String[] brokersSent = new String[timesTotal];
+         for (; times < timesTotal; times++) {
+             // The previous broker name is passed to the strategy when choosing the next queue.
+             String lastBrokerName = null == mq ? null : mq.getBrokerName();
+             MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
+             if (tmpmq != null) {
+                 mq = tmpmq;
+                 brokersSent[times] = mq.getBrokerName();
+                 try {
+                     beginTimestampPrev = System.currentTimeMillis();
+                     sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
+                     endTimestamp = System.currentTimeMillis();
+                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
+                     switch (communicationMode) {
+                         case ASYNC:
+                             return null;
+                         case ONEWAY:
+                             return null;
+                         case SYNC:
+                             if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
+                                 if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
+                                     // continues the enclosing for loop, i.e. tries another queue
+                                     continue;
+                                 }
+                             }
+
+                             return sendResult;
+                         default:
+                             break;
+                     }
+                 } catch (RemotingException e) {
+                     endTimestamp = System.currentTimeMillis();
+                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
+                     log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
+                     log.warn(msg.toString());
+                     exception = e;
+                     continue;
+                 } catch (MQClientException e) {
+                     endTimestamp = System.currentTimeMillis();
+                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
+                     log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
+                     log.warn(msg.toString());
+                     exception = e;
+                     continue;
+                 } catch (MQBrokerException e) {
+                     endTimestamp = System.currentTimeMillis();
+                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
+                     log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
+                     log.warn(msg.toString());
+                     exception = e;
+                     switch (e.getResponseCode()) {
+                         // These response codes are retried on another queue.
+                         case ResponseCode.TOPIC_NOT_EXIST:
+                         case ResponseCode.SERVICE_NOT_AVAILABLE:
+                         case ResponseCode.SYSTEM_ERROR:
+                         case ResponseCode.NO_PERMISSION:
+                         case ResponseCode.NO_BUYER_ID:
+                         case ResponseCode.NOT_IN_CURRENT_UNIT:
+                             continue;
+                         default:
+                             if (sendResult != null) {
+                                 return sendResult;
+                             }
+
+                             throw e;
+                     }
+                 } catch (InterruptedException e) {
+                     endTimestamp = System.currentTimeMillis();
+                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
+                     log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
+                     log.warn(msg.toString());
+                     throw e;
+                 }
+             } else {
+                 break;
+             }
+         }
+
+         if (sendResult != null) {
+             return sendResult;
+         }
+
+         String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
+             times,
+             System.currentTimeMillis() - beginTimestampFirst,
+             msg.getTopic(),
+             Arrays.toString(brokersSent));
+
+         info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
+
+         // Map the last failure onto a response code on the exception handed to the caller.
+         MQClientException mqClientException = new MQClientException(info, exception);
+         if (exception instanceof MQBrokerException) {
+             mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
+         } else if (exception instanceof RemotingConnectException) {
+             mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
+         } else if (exception instanceof RemotingTimeoutException) {
+             mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
+         } else if (exception instanceof MQClientException) {
+             mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
+         }
+
+         throw mqClientException;
+     }
+
+     // No usable route: distinguish "no name server configured" from "topic has no route".
+     List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
+     if (null == nsList || nsList.isEmpty()) {
+         throw new MQClientException(
+             "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
+     }
+
+     throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
+         null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
+ }
+
+ private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
+ TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
+ if (null == topicPublishInfo || !topicPublishInfo.ok()) {
+ this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
+ this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+ topicPublishInfo = this.topicPublishInfoTable.get(topic);
+ }
+
+ if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
+ return topicPublishInfo;
+ } else {
+ this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
+ topicPublishInfo = this.topicPublishInfoTable.get(topic);
+ return topicPublishInfo;
+ }
+ }
+
+ /**
+  * Performs one send attempt of {@code msg} to the broker owning {@code mq}.
+  * <p>
+  * Resolves the broker address (refreshing the route once if it is unknown), assigns the unique
+  * id, compresses the body when it is large enough, runs the check-forbidden and send-message
+  * hooks, builds the request header and hands the message to the client API. The original body
+  * is always restored on exit because compression replaces it on the message object.
+  * <p>
+  * The after-hook paths test {@code context != null} instead of asking
+  * {@code hasSendMessageHook()} again: the context exists only when hooks were present at the
+  * before-hook stage, so a hook registered in between can no longer lead to a null dereference.
+  *
+  * @param msg               message to send
+  * @param mq                target queue
+  * @param communicationMode SYNC, ASYNC or ONEWAY
+  * @param sendCallback      callback for ASYNC mode
+  * @param topicPublishInfo  publish info forwarded to the asynchronous send; may be null
+  * @param timeout           timeout handed to the client API
+  * @return result returned by the client API
+  * @throws MQClientException    if the broker address cannot be resolved or a check-forbidden hook rejects the send
+  * @throws RemotingException    propagated from the client API
+  * @throws MQBrokerException    propagated from the client API
+  * @throws InterruptedException propagated from the client API
+  */
+ private SendResult sendKernelImpl(final Message msg, //
+     final MessageQueue mq, //
+     final CommunicationMode communicationMode, //
+     final SendCallback sendCallback, //
+     final TopicPublishInfo topicPublishInfo, //
+     final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+     String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
+     if (null == brokerAddr) {
+         // Unknown broker: refresh the topic route once and look the address up again.
+         tryToFindTopicPublishInfo(mq.getTopic());
+         brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
+     }
+
+     SendMessageContext context = null;
+     if (brokerAddr != null) {
+         brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
+
+         // Remembered so the finally block can undo the body swap done by compression.
+         byte[] prevBody = msg.getBody();
+         try {
+
+             MessageClientIDSetter.setUniqID(msg);
+
+             int sysFlag = 0;
+             if (this.tryToCompressMessage(msg)) {
+                 sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
+             }
+
+             final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
+             if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
+                 sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
+             }
+
+             if (hasCheckForbiddenHook()) {
+                 CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
+                 checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
+                 checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
+                 checkForbiddenContext.setCommunicationMode(communicationMode);
+                 checkForbiddenContext.setBrokerAddr(brokerAddr);
+                 checkForbiddenContext.setMessage(msg);
+                 checkForbiddenContext.setMq(mq);
+                 checkForbiddenContext.setUnitMode(this.isUnitMode());
+                 this.executeCheckForbiddenHook(checkForbiddenContext);
+             }
+
+             if (this.hasSendMessageHook()) {
+                 context = new SendMessageContext();
+                 context.setProducer(this);
+                 context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
+                 context.setCommunicationMode(communicationMode);
+                 context.setBornHost(this.defaultMQProducer.getClientIP());
+                 context.setBrokerAddr(brokerAddr);
+                 context.setMessage(msg);
+                 context.setMq(mq);
+                 String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
+                 if (isTrans != null && isTrans.equals("true")) {
+                     context.setMsgType(MessageType.Trans_Msg_Half);
+                 }
+
+                 // The delay check comes second, so it overrides the transactional type when both apply.
+                 if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
+                     context.setMsgType(MessageType.Delay_Msg);
+                 }
+                 this.executeSendMessageHookBefore(context);
+             }
+
+             SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
+             requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
+             requestHeader.setTopic(msg.getTopic());
+             requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
+             requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
+             requestHeader.setQueueId(mq.getQueueId());
+             requestHeader.setSysFlag(sysFlag);
+             requestHeader.setBornTimestamp(System.currentTimeMillis());
+             requestHeader.setFlag(msg.getFlag());
+             requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
+             requestHeader.setReconsumeTimes(0);
+             requestHeader.setUnitMode(this.isUnitMode());
+             if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                 // For retry topics the reconsume counters move from message properties into the header.
+                 String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
+                 if (reconsumeTimes != null) {
+                     requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
+                     MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
+                 }
+
+                 String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
+                 if (maxReconsumeTimes != null) {
+                     requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
+                     MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
+                 }
+             }
+
+             SendResult sendResult = null;
+             switch (communicationMode) {
+                 case ASYNC:
+                     sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
+                         brokerAddr, // 1
+                         mq.getBrokerName(), // 2
+                         msg, // 3
+                         requestHeader, // 4
+                         timeout, // 5
+                         communicationMode, // 6
+                         sendCallback, // 7
+                         topicPublishInfo, // 8
+                         this.mQClientFactory, // 9
+                         this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
+                         context, //
+                         this);
+                     break;
+                 case ONEWAY:
+                 case SYNC:
+                     sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
+                         brokerAddr,
+                         mq.getBrokerName(),
+                         msg,
+                         requestHeader,
+                         timeout,
+                         communicationMode,
+                         context,
+                         this);
+                     break;
+                 default:
+                     assert false;
+                     break;
+             }
+
+             if (context != null) {
+                 context.setSendResult(sendResult);
+                 this.executeSendMessageHookAfter(context);
+             }
+
+             return sendResult;
+         } catch (RemotingException e) {
+             if (context != null) {
+                 context.setException(e);
+                 this.executeSendMessageHookAfter(context);
+             }
+             throw e;
+         } catch (MQBrokerException e) {
+             if (context != null) {
+                 context.setException(e);
+                 this.executeSendMessageHookAfter(context);
+             }
+             throw e;
+         } catch (InterruptedException e) {
+             if (context != null) {
+                 context.setException(e);
+                 this.executeSendMessageHookAfter(context);
+             }
+             throw e;
+         } finally {
+             msg.setBody(prevBody);
+         }
+     }
+
+     throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+ }
+
+ /**
+  * Returns the client instance assigned in {@code start}.
+  *
+  * @return the client instance; null before {@code start} has assigned it
+  */
+ public MQClientInstance getmQClientFactory() {
+     return this.mQClientFactory;
+ }
+
+ private boolean tryToCompressMessage(final Message msg) {
+ byte[] body = msg.getBody();
+ if (body != null) {
+ if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
+ try {
+ byte[] data = UtilAll.compress(body, zipCompressLevel);
+ if (data != null) {
+ msg.setBody(data);
+ return true;
+ }
+ } catch (IOException e) {
+ log.error("tryToCompressMessage exception", e);
+ log.warn(msg.toString());
+ }
+ }
+ }
+
+ return false;
+ }
+
+ /**
+  * Tells whether at least one check-forbidden hook is registered.
+  *
+  * @return true when the hook list is not empty
+  */
+ public boolean hasCheckForbiddenHook() {
+     final boolean noHooks = this.checkForbiddenHookList.isEmpty();
+     return !noHooks;
+ }
+
+ /**
+  * Runs every registered check-forbidden hook in registration order.
+  * Exceptions from a hook are not caught here.
+  *
+  * @param context context passed to each hook
+  * @throws MQClientException declared for the hooks' {@code checkForbidden} call
+  */
+ public void executeCheckForbiddenHook(final CheckForbiddenContext context) throws MQClientException {
+     if (!hasCheckForbiddenHook()) {
+         return;
+     }
+
+     for (final CheckForbiddenHook forbiddenHook : this.checkForbiddenHookList) {
+         forbiddenHook.checkForbidden(context);
+     }
+ }
+
+ /**
+  * Tells whether at least one send-message hook is registered.
+  *
+  * @return true when the hook list is not empty
+  */
+ public boolean hasSendMessageHook() {
+     final boolean noHooks = sendMessageHookList.isEmpty();
+     return !noHooks;
+ }
+
+ /**
+  * Invokes {@code sendMessageBefore} on every registered send-message hook.
+  * A failing hook is logged and does not stop the remaining hooks.
+  *
+  * @param context context passed to each hook
+  */
+ public void executeSendMessageHookBefore(final SendMessageContext context) {
+     if (this.sendMessageHookList.isEmpty()) {
+         return;
+     }
+
+     for (final SendMessageHook sendHook : this.sendMessageHookList) {
+         try {
+             sendHook.sendMessageBefore(context);
+         } catch (Throwable e) {
+             log.warn("failed to executeSendMessageHookBefore", e);
+         }
+     }
+ }
+
+ /**
+  * Invokes {@code sendMessageAfter} on every registered send-message hook.
+  * A failing hook is logged and does not stop the remaining hooks.
+  *
+  * @param context context passed to each hook
+  */
+ public void executeSendMessageHookAfter(final SendMessageContext context) {
+     if (this.sendMessageHookList.isEmpty()) {
+         return;
+     }
+
+     for (final SendMessageHook sendHook : this.sendMessageHookList) {
+         try {
+             sendHook.sendMessageAfter(context);
+         } catch (Throwable e) {
+             log.warn("failed to executeSendMessageHookAfter", e);
+         }
+     }
+ }
+
+ /**
+  * DEFAULT ONEWAY: sends through the default path in one-way mode with the producer's
+  * configured send timeout. A broker exception is wrapped in an {@code MQClientException}.
+  *
+  * @param msg message to send
+  */
+ public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
+     final long timeout = this.defaultMQProducer.getSendMsgTimeout();
+     try {
+         sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, timeout);
+     } catch (MQBrokerException brokerFailure) {
+         throw new MQClientException("unknown exception", brokerFailure);
+     }
+ }
+
+ /**
+  * KERNEL SYNC: sends synchronously to the given queue with the producer's configured send timeout.
+  *
+  * @param msg message to send
+  * @param mq  target queue
+  * @return result of the three-argument overload
+  */
+ public SendResult send(Message msg, MessageQueue mq)
+     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+     final long timeout = this.defaultMQProducer.getSendMsgTimeout();
+     return send(msg, mq, timeout);
+ }
+
+ /**
+  * Sends synchronously to the given queue, bypassing queue selection.
+  *
+  * @param msg     message to send
+  * @param mq      target queue; its topic must equal the message's topic
+  * @param timeout timeout handed to the kernel send
+  * @return result of the kernel send
+  * @throws MQClientException if the producer is not running, the message is invalid,
+  *                           or the topics differ
+  */
+ public SendResult send(Message msg, MessageQueue mq, long timeout)
+     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+     makeSureStateOK();
+     Validators.checkMessage(msg, this.defaultMQProducer);
+
+     final boolean sameTopic = msg.getTopic().equals(mq.getTopic());
+     if (!sameTopic) {
+         throw new MQClientException("message's topic not equal mq's topic", null);
+     }
+
+     return sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, null, timeout);
+ }
+
+ /**
+  * KERNEL ASYNC: sends asynchronously to the given queue with the producer's configured send timeout.
+  *
+  * @param msg          message to send
+  * @param mq           target queue
+  * @param sendCallback callback handed to the asynchronous send
+  */
+ public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
+     throws MQClientException, RemotingException, InterruptedException {
+     final long timeout = this.defaultMQProducer.getSendMsgTimeout();
+     send(msg, mq, sendCallback, timeout);
+ }
+
+ /**
+  * Sends asynchronously to the given queue, bypassing queue selection.
+  * A broker exception is wrapped in an {@code MQClientException}.
+  *
+  * @param msg          message to send
+  * @param mq           target queue; its topic must equal the message's topic
+  * @param sendCallback callback handed to the asynchronous send
+  * @param timeout      timeout handed to the kernel send
+  * @throws MQClientException if the producer is not running, the message is invalid,
+  *                           the topics differ, or a broker exception was wrapped
+  */
+ public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
+     throws MQClientException, RemotingException, InterruptedException {
+     makeSureStateOK();
+     Validators.checkMessage(msg, this.defaultMQProducer);
+
+     final boolean sameTopic = msg.getTopic().equals(mq.getTopic());
+     if (!sameTopic) {
+         throw new MQClientException("message's topic not equal mq's topic", null);
+     }
+
+     try {
+         sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, timeout);
+     } catch (MQBrokerException brokerFailure) {
+         throw new MQClientException("unknown exception", brokerFailure);
+     }
+ }
+
+ /**
+  * KERNEL ONEWAY: sends in one-way mode to the given queue with the producer's configured
+  * send timeout. A broker exception is wrapped in an {@code MQClientException}.
+  * <p>
+  * NOTE(review): unlike the SYNC and ASYNC kernel sends, this method does not compare the
+  * message's topic with the queue's topic - confirm whether that is intentional.
+  *
+  * @param msg message to send
+  * @param mq  target queue
+  */
+ public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
+     makeSureStateOK();
+     Validators.checkMessage(msg, this.defaultMQProducer);
+
+     try {
+         final long timeout = this.defaultMQProducer.getSendMsgTimeout();
+         sendKernelImpl(msg, mq, CommunicationMode.ONEWAY, null, null, timeout);
+     } catch (MQBrokerException brokerFailure) {
+         throw new MQClientException("unknown exception", brokerFailure);
+     }
+ }
+
+ /**
+  * SELECT SYNC: sends synchronously to a queue picked by the selector, with the producer's
+  * configured send timeout.
+  *
+  * @param msg      message to send
+  * @param selector chooses the target queue
+  * @param arg      opaque argument passed to the selector
+  * @return result of the four-argument overload
+  */
+ public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
+     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+     final long timeout = this.defaultMQProducer.getSendMsgTimeout();
+     return send(msg, selector, arg, timeout);
+ }
+
+ /**
+  * Synchronous selector-based send with an explicit timeout; runs {@code sendSelectImpl}
+  * in {@code CommunicationMode.SYNC} without a callback.
+  *
+  * @return whatever {@code sendSelectImpl} returns
+  */
+ public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
+     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+     final SendResult result = this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
+     return result;
+ }
+
+ private SendResult sendSelectImpl(//
+ Message msg, //
+ MessageQueueSelector selector, //
+ Object arg, //
+ final CommunicationMode communicationMode, //
+ final SendCallback sendCallback, final long timeout//
+ ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ this.makeSureStateOK();
+ Validators.checkMessage(msg, this.defaultMQProducer);
+
+ TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
+ if (topicPublishInfo != null && topicPublishInfo.ok()) {
+ MessageQueue mq = null;
+ try {
+ mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
+ } catch (Throwable e) {
+ throw new MQClientException("select message queue throwed exception.", e);
+ }
+
+ if (mq != null) {
+ return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
+ } else {
+ throw new MQClientException("select message queue return null.", null);
+ }
+ }
+
+ throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
+ }
+
+ /**
+  * SELECT ASYNC -------------------------------------------------------
+  * <p>
+  * Asynchronous selector-based send using the timeout reported by
+  * {@code defaultMQProducer.getSendMsgTimeout()}. Delegates to the timeout-taking overload.
+  */
+ public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
+     throws MQClientException, RemotingException, InterruptedException {
+     final long defaultTimeout = this.defaultMQProducer.getSendMsgTimeout();
+     this.send(msg, selector, arg, sendCallback, defaultTimeout);
+ }
+
+ /**
+  * Asynchronous selector-based send with an explicit timeout; runs {@code sendSelectImpl}
+  * in {@code CommunicationMode.ASYNC} and discards its return value.
+  *
+  * @param arg          opaque argument passed through to the selector
+  * @param sendCallback callback handed to the underlying send
+  * @param timeout      timeout forwarded unchanged to the underlying send
+  * @throws MQClientException raised by {@code sendSelectImpl}, or wrapping any
+  *                           {@code MQBrokerException} it raises
+  */
+ public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
+     throws MQClientException, RemotingException, InterruptedException {
+     try {
+         this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout);
+     } catch (MQBrokerException e) {
+         // Message spelling corrected ("unknownn" -> "unknown") to match the sibling send methods.
+         throw new MQClientException("unknown exception", e);
+     }
+ }
+
+ /**
+  * SELECT ONEWAY -------------------------------------------------------
+  * <p>
+  * Selector-based send in {@code CommunicationMode.ONEWAY}, using the timeout reported by
+  * {@code defaultMQProducer.getSendMsgTimeout()}; the result of the send is discarded.
+  *
+  * @throws MQClientException raised by {@code sendSelectImpl}, or wrapping any
+  *                           {@code MQBrokerException} it raises
+  */
+ public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
+     throws MQClientException, RemotingException, InterruptedException {
+     try {
+         final long defaultTimeout = this.defaultMQProducer.getSendMsgTimeout();
+         this.sendSelectImpl(msg, selector, arg, CommunicationMode.ONEWAY, null, defaultTimeout);
+     } catch (MQBrokerException brokerFailure) {
+         throw new MQClientException("unknown exception", brokerFailure);
+     }
+ }
+
+ /**
+  * Sends {@code msg} as a prepared transactional message, runs the caller's local transaction
+  * branch, and then reports the outcome to the broker via {@code endTransaction}.
+  * <p>
+  * Flow, as implemented below:
+  * <ol>
+  * <li>tag the message with the "transaction prepared" and producer-group properties and send it;</li>
+  * <li>on {@code SEND_OK} run {@code tranExecuter}; a {@code null} result or a thrown
+  * {@code Throwable} leaves the state at {@code UNKNOW};</li>
+  * <li>on the flush/slave failure statuses the state becomes {@code ROLLBACK_MESSAGE};</li>
+  * <li>call {@code endTransaction}; a failure there is only logged.</li>
+  * </ol>
+  *
+  * @param arg opaque argument passed through to {@code tranExecuter}
+  * @return a result copying the send result's fields plus the local transaction state
+  * @throws MQClientException if {@code tranExecuter} is null, or wrapping any exception
+  *                           thrown by the prepared send
+  */
+ public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
+     throws MQClientException {
+     if (tranExecuter == null) {
+         throw new MQClientException("tranExecutor is null", null);
+     }
+     Validators.checkMessage(msg, this.defaultMQProducer);
+
+     MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
+     MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
+
+     final SendResult prepared;
+     try {
+         prepared = this.send(msg);
+     } catch (Exception sendFailure) {
+         throw new MQClientException("send message Exception", sendFailure);
+     }
+
+     LocalTransactionState localState = LocalTransactionState.UNKNOW;
+     Throwable localFailure = null;
+     switch (prepared.getSendStatus()) {
+         case SEND_OK:
+             try {
+                 if (prepared.getTransactionId() != null) {
+                     msg.putUserProperty("__transactionId__", prepared.getTransactionId());
+                 }
+                 localState = tranExecuter.executeLocalTransactionBranch(msg, arg);
+                 if (localState == null) {
+                     localState = LocalTransactionState.UNKNOW;
+                 }
+
+                 if (localState != LocalTransactionState.COMMIT_MESSAGE) {
+                     log.info("executeLocalTransactionBranch return {}", localState);
+                     log.info(msg.toString());
+                 }
+             } catch (Throwable branchFailure) {
+                 // User code failed: keep the current state and pass the cause to endTransaction.
+                 log.info("executeLocalTransactionBranch exception", branchFailure);
+                 log.info(msg.toString());
+                 localFailure = branchFailure;
+             }
+             break;
+         case FLUSH_DISK_TIMEOUT:
+         case FLUSH_SLAVE_TIMEOUT:
+         case SLAVE_NOT_AVAILABLE:
+             localState = LocalTransactionState.ROLLBACK_MESSAGE;
+             break;
+         default:
+             break;
+     }
+
+     try {
+         this.endTransaction(prepared, localState, localFailure);
+     } catch (Exception endFailure) {
+         log.warn("local transaction execute " + localState + ", but end broker transaction failed", endFailure);
+     }
+
+     final TransactionSendResult outcome = new TransactionSendResult();
+     outcome.setSendStatus(prepared.getSendStatus());
+     outcome.setMessageQueue(prepared.getMessageQueue());
+     outcome.setMsgId(prepared.getMsgId());
+     outcome.setQueueOffset(prepared.getQueueOffset());
+     outcome.setTransactionId(prepared.getTransactionId());
+     outcome.setLocalTransactionState(localState);
+     return outcome;
+ }
+
+ /**
+  * DEFAULT SYNC -------------------------------------------------------
+  * <p>
+  * Synchronous send using the timeout reported by {@code defaultMQProducer.getSendMsgTimeout()}.
+  *
+  * @return the result of the timeout-taking overload
+  */
+ public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+     final long defaultTimeout = this.defaultMQProducer.getSendMsgTimeout();
+     return this.send(msg, defaultTimeout);
+ }
+
+ public void endTransaction(//
+ final SendResult sendResult, //
+ final LocalTransactionState localTransactionState, //
+ final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
+ final MessageId id;
+ if (sendResult.getOffsetMsgId() != null) {
+ id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
+ } else {
+ id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
+ }
+ String transactionId = sendResult.getTransactionId();
+ final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
+ EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
+ requestHeader.setTransactionId(transactionId);
+ requestHeader.setCommitLogOffset(id.getOffset());
+ switch (localTransactionState) {
+ case COMMIT_MESSAGE:
+ requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
+ break;
+ case ROLLBACK_MESSAGE:
+ requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
+ break;
+ case UNKNOW:
+ requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
+ break;
+ default:
+ break;
+ }
+
+ requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
+ requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
+ requestHeader.setMsgId(sendResult.getMsgId());
+ String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
+ this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
+ this.defaultMQProducer.getSendMsgTimeout());
+ }
+
+ /**
+  * Synchronous send with an explicit timeout; runs {@code sendDefaultImpl} in
+  * {@code CommunicationMode.SYNC} without a callback.
+  *
+  * @return whatever {@code sendDefaultImpl} returns
+  */
+ public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+     final SendResult result = this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
+     return result;
+ }
+
+ /**
+  * @return the producer's topic-to-publish-info map (the field itself, not a copy)
+  */
+ public ConcurrentHashMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
+     return this.topicPublishInfoTable;
+ }
+
+ /**
+  * @return the currently configured {@code zipCompressLevel} value
+  */
+ public int getZipCompressLevel() {
+     return this.zipCompressLevel;
+ }
+
+
+ /**
+  * Replaces the stored {@code zipCompressLevel}; the value is stored without validation.
+  */
+ public void setZipCompressLevel(final int zipCompressLevel) {
+     this.zipCompressLevel = zipCompressLevel;
+ }
+
+
+ /**
+  * @return the producer's current {@code serviceState} field
+  */
+ public ServiceState getServiceState() {
+     return this.serviceState;
+ }
+
+
+ /**
+  * Overwrites the producer's {@code serviceState} field with the given value.
+  */
+ public void setServiceState(final ServiceState serviceState) {
+     this.serviceState = serviceState;
+ }
+
+ /**
+  * @return the not-available-duration array held by the fault strategy (delegated call)
+  */
+ public long[] getNotAvailableDuration() {
+     final long[] durations = this.mqFaultStrategy.getNotAvailableDuration();
+     return durations;
+ }
+
+ /**
+  * Forwards the given not-available-duration array to the fault strategy.
+  */
+ public void setNotAvailableDuration(final long[] notAvailableDuration) {
+     mqFaultStrategy.setNotAvailableDuration(notAvailableDuration);
+ }
+
+ /**
+  * @return the latency-threshold array held by the fault strategy (delegated call)
+  */
+ public long[] getLatencyMax() {
+     final long[] thresholds = this.mqFaultStrategy.getLatencyMax();
+     return thresholds;
+ }
+
+ /**
+  * Forwards the given latency-threshold array to the fault strategy.
+  */
+ public void setLatencyMax(final long[] latencyMax) {
+     mqFaultStrategy.setLatencyMax(latencyMax);
+ }
+
+ /**
+  * @return the fault strategy's {@code sendLatencyFaultEnable} flag (delegated call)
+  */
+ public boolean isSendLatencyFaultEnable() {
+     final boolean enabled = this.mqFaultStrategy.isSendLatencyFaultEnable();
+     return enabled;
+ }
+
+ /**
+  * Forwards the given flag to the fault strategy's {@code sendLatencyFaultEnable} setting.
+  */
+ public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
+     mqFaultStrategy.setSendLatencyFaultEnable(sendLatencyFaultEnable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java
new file mode 100644
index 0000000..e2837e2
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.impl.producer;
+
+import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
+
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public interface MQProducerInner {
+ Set<String> getPublishTopicList();
+
+
+ boolean isPublishTopicNeedUpdate(final String topic);
+
+
+ TransactionCheckListener checkListener();
+
+
+ void checkTransactionState(//
+ final String addr, //
+ final MessageExt msg, //
+ final CheckTransactionStateRequestHeader checkRequestHeader);
+
+
+ void updateTopicPublishInfo(final String topic, final TopicPublishInfo info);
+
+
+ boolean isUnitMode();
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java
new file mode 100644
index 0000000..2f7de22
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.impl.producer;
+
+import com.alibaba.rocketmq.client.common.ThreadLocalIndex;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.route.QueueData;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class TopicPublishInfo {
+ private boolean orderTopic = false;
+ private boolean haveTopicRouterInfo = false;
+ private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
+ private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0);
+ private TopicRouteData topicRouteData;
+
+
+ public boolean isOrderTopic() {
+ return orderTopic;
+ }
+
+ public void setOrderTopic(boolean orderTopic) {
+ this.orderTopic = orderTopic;
+ }
+
+ public boolean ok() {
+ return null != this.messageQueueList && !this.messageQueueList.isEmpty();
+ }
+
+ public List<MessageQueue> getMessageQueueList() {
+ return messageQueueList;
+ }
+
+
+ public void setMessageQueueList(List<MessageQueue> messageQueueList) {
+ this.messageQueueList = messageQueueList;
+ }
+
+
+ public ThreadLocalIndex getSendWhichQueue() {
+ return sendWhichQueue;
+ }
+
+
+ public void setSendWhichQueue(ThreadLocalIndex sendWhichQueue) {
+ this.sendWhichQueue = sendWhichQueue;
+ }
+
+
+ public boolean isHaveTopicRouterInfo() {
+ return haveTopicRouterInfo;
+ }
+
+
+ public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) {
+ this.haveTopicRouterInfo = haveTopicRouterInfo;
+ }
+
+
+ public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
+ if (lastBrokerName == null) {
+ return selectOneMessageQueue();
+ } else {
+ int index = this.sendWhichQueue.getAndIncrement();
+ for (int i = 0; i < this.messageQueueList.size(); i++) {
+ int pos = Math.abs(index++) % this.messageQueueList.size();
+ if (pos < 0)
+ pos = 0;
+ MessageQueue mq = this.messageQueueList.get(pos);
+ if (!mq.getBrokerName().equals(lastBrokerName)) {
+ return mq;
+ }
+ }
+ return selectOneMessageQueue();
+ }
+ }
+
+
+ public MessageQueue selectOneMessageQueue() {
+ int index = this.sendWhichQueue.getAndIncrement();
+ int pos = Math.abs(index) % this.messageQueueList.size();
+ if (pos < 0)
+ pos = 0;
+ return this.messageQueueList.get(pos);
+ }
+
+ public int getQueueIdByBroker(final String brokerName) {
+ for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) {
+ final QueueData queueData = this.topicRouteData.getQueueDatas().get(i);
+ if (queueData.getBrokerName().equals(brokerName)) {
+ return queueData.getWriteQueueNums();
+ }
+ }
+
+ return -1;
+ }
+
+
+ @Override
+ public String toString() {
+ return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList
+ + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]";
+ }
+
+ public TopicRouteData getTopicRouteData() {
+ return topicRouteData;
+ }
+
+ public void setTopicRouteData(final TopicRouteData topicRouteData) {
+ this.topicRouteData = topicRouteData;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java
new file mode 100644
index 0000000..e6152e4
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.client.latency;
+
+/**
+ * @author shijia.wxr
+ */
+public interface LatencyFaultTolerance<T> {
+ void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
+
+ boolean isAvailable(final T name);
+
+ void remove(final T name);
+
+ T pickOneAtLeast();
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java
new file mode 100644
index 0000000..8a86449
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.client.latency;
+
+import com.alibaba.rocketmq.client.common.ThreadLocalIndex;
+
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author shijia.wxr
+ */
+public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
+ private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
+
+ private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(0);
+
+ @Override
+ public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
+ FaultItem old = this.faultItemTable.get(name);
+ if (null == old) {
+ final FaultItem faultItem = new FaultItem(name);
+ faultItem.setCurrentLatency(currentLatency);
+ faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
+
+ old = this.faultItemTable.putIfAbsent(name, faultItem);
+ if (old != null) {
+ old.setCurrentLatency(currentLatency);
+ old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
+ }
+ } else {
+ old.setCurrentLatency(currentLatency);
+ old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
+ }
+ }
+
+ @Override
+ public boolean isAvailable(final String name) {
+ final FaultItem faultItem = this.faultItemTable.get(name);
+ if (faultItem != null) {
+ return faultItem.isAvailable();
+ }
+ return true;
+ }
+
+ @Override
+ public void remove(final String name) {
+ this.faultItemTable.remove(name);
+ }
+
+ @Override
+ public String pickOneAtLeast() {
+ final Enumeration<FaultItem> elements = this.faultItemTable.elements();
+ List<FaultItem> tmpList = new LinkedList<FaultItem>();
+ while (elements.hasMoreElements()) {
+ final FaultItem faultItem = elements.nextElement();
+ tmpList.add(faultItem);
+ }
+
+ if (!tmpList.isEmpty()) {
+ Collections.shuffle(tmpList);
+
+ Collections.sort(tmpList);
+
+ final int half = tmpList.size() / 2;
+ if (half <= 0) {
+ return tmpList.get(0).getName();
+ } else {
+ final int i = this.whichItemWorst.getAndIncrement() % half;
+ return tmpList.get(i).getName();
+ }
+ }
+
+ return null;
+ }
+
+ class FaultItem implements Comparable<FaultItem> {
+ private final String name;
+ private volatile long currentLatency;
+ private volatile long startTimestamp;
+
+ public FaultItem(final String name) {
+ this.name = name;
+ }
+
+ @Override
+ public int compareTo(final FaultItem other) {
+ if (this.isAvailable() != other.isAvailable()) {
+ if (this.isAvailable()) return -1;
+
+ if (other.isAvailable()) return 1;
+ }
+
+ if (this.currentLatency < other.currentLatency)
+ return -1;
+ else if (this.currentLatency > other.currentLatency) {
+ return 1;
+ }
+
+ if (this.startTimestamp < other.startTimestamp)
+ return -1;
+ else if (this.startTimestamp > other.startTimestamp) {
+ return 1;
+ }
+
+ return 0;
+ }
+
+ public boolean isAvailable() {
+ return (System.currentTimeMillis() - startTimestamp) >= 0;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getName() != null ? getName().hashCode() : 0;
+ result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));
+ result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (!(o instanceof FaultItem)) return false;
+
+ final FaultItem faultItem = (FaultItem) o;
+
+ if (getCurrentLatency() != faultItem.getCurrentLatency()) return false;
+ if (getStartTimestamp() != faultItem.getStartTimestamp()) return false;
+ return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;
+
+ }
+
+ @Override
+ public String toString() {
+ return "FaultItem{" +
+ "name='" + name + '\'' +
+ ", currentLatency=" + currentLatency +
+ ", startTimestamp=" + startTimestamp +
+ '}';
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public long getCurrentLatency() {
+ return currentLatency;
+ }
+
+ public void setCurrentLatency(final long currentLatency) {
+ this.currentLatency = currentLatency;
+ }
+
+ public long getStartTimestamp() {
+ return startTimestamp;
+ }
+
+ public void setStartTimestamp(final long startTimestamp) {
+ this.startTimestamp = startTimestamp;
+ }
+
+
+ }
+
+ @Override
+ public String toString() {
+ return "LatencyFaultToleranceImpl{" +
+ "faultItemTable=" + faultItemTable +
+ ", whichItemWorst=" + whichItemWorst +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java
new file mode 100644
index 0000000..b323f04
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.client.latency;
+
+import com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+/**
+ * @author shijia.wxr
+ */
+public class MQFaultStrategy {
+ private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
+
+ private boolean sendLatencyFaultEnable = false;
+
+ private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
+ private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
+
+ public long[] getNotAvailableDuration() {
+ return notAvailableDuration;
+ }
+
+ public void setNotAvailableDuration(final long[] notAvailableDuration) {
+ this.notAvailableDuration = notAvailableDuration;
+ }
+
+ public long[] getLatencyMax() {
+ return latencyMax;
+ }
+
+ public void setLatencyMax(final long[] latencyMax) {
+ this.latencyMax = latencyMax;
+ }
+
+ public boolean isSendLatencyFaultEnable() {
+ return sendLatencyFaultEnable;
+ }
+
+ public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
+ this.sendLatencyFaultEnable = sendLatencyFaultEnable;
+ }
+
+ public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
+ if (this.sendLatencyFaultEnable) {
+ try {
+ int index = tpInfo.getSendWhichQueue().getAndIncrement();
+ for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
+ int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
+ if (pos < 0)
+ pos = 0;
+ MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
+ if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
+ if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
+ return mq;
+ }
+ }
+
+ final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
+ int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
+ if (writeQueueNums > 0) {
+ final MessageQueue mq = tpInfo.selectOneMessageQueue();
+ if (notBestBroker != null) {
+ mq.setBrokerName(notBestBroker);
+ mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
+ }
+ return mq;
+ } else {
+ latencyFaultTolerance.remove(notBestBroker);
+ }
+ } catch (Exception e) {
+ }
+
+ return tpInfo.selectOneMessageQueue();
+ }
+
+ return tpInfo.selectOneMessageQueue(lastBrokerName);
+ }
+
+ public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
+ if (this.sendLatencyFaultEnable) {
+ long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
+ this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
+ }
+ }
+
+ private long computeNotAvailableDuration(final long currentLatency) {
+ for (int i = latencyMax.length - 1; i >= 0; i--) {
+ if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i];
+ }
+
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java
new file mode 100644
index 0000000..02af207
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.log;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import org.slf4j.ILoggerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+
+
+/**
+ * Holds the shared SLF4J {@link Logger} used by the client and creates it when this class is loaded.
+ * <p>
+ * Unless disabled through the {@code rocketmq.client.log.loadconfig} system property, logger
+ * creation also configures the bound logging backend (log4j or logback) reflectively, so that
+ * neither backend is required on the compile-time classpath. Configuration failures are printed
+ * to {@code System.err} and do not prevent the logger from being returned.
+ *
+ * @author shijia.wxr
+ */
+public class ClientLogger {
+    /** Shared client logger; assigned by the static initializer and replaceable via {@link #setLog(Logger)}. */
+    private static Logger log;
+    /** System property read for the client log root (default {@code ${user.home}/logs/rocketmqlogs}). */
+    public static final String CLIENT_LOG_ROOT = "rocketmq.client.logRoot";
+    /** System property read for the maximum log file index (default {@code 10}). */
+    public static final String CLIENT_LOG_MAXINDEX = "rocketmq.client.logFileMaxIndex";
+    /** System property read for the client log level (default {@code INFO}). */
+    public static final String CLIENT_LOG_LEVEL = "rocketmq.client.logLevel";
+
+    static {
+        log = createLogger(LoggerName.CLIENT_LOGGER_NAME);
+    }
+
+
+    /**
+     * Optionally configures the bound logging backend, then returns the logger with the given name.
+     * <p>
+     * The resolved log root, level and max file index are republished as the system properties
+     * {@code client.logRoot}, {@code client.logLevel} and {@code client.logFileMaxIndex}
+     * (presumably for substitution inside the backend configuration files - confirm against
+     * the bundled XML resources).
+     *
+     * @param loggerName name of the logger to obtain from SLF4J
+     * @return the logger registered under {@code loggerName}
+     */
+    private static Logger createLogger(final String loggerName) {
+        // An explicit config file may come from a system property, falling back to an environment variable.
+        final String logConfigFilePath =
+                System.getProperty("rocketmq.client.log.configFile",
+                        System.getenv("ROCKETMQ_CLIENT_LOG_CONFIGFILE"));
+        final boolean isloadconfig =
+                Boolean.parseBoolean(System.getProperty("rocketmq.client.log.loadconfig", "true"));
+
+        final String log4JResourceFile =
+                System.getProperty("rocketmq.client.log4j.resource.fileName", "log4j_rocketmq_client.xml");
+
+        final String logbackResourceFile =
+                System.getProperty("rocketmq.client.logback.resource.fileName", "logback_rocketmq_client.xml");
+
+        final String clientLogRoot = System.getProperty(CLIENT_LOG_ROOT, "${user.home}/logs/rocketmqlogs");
+        System.setProperty("client.logRoot", clientLogRoot);
+        final String clientLogLevel = System.getProperty(CLIENT_LOG_LEVEL, "INFO");
+        System.setProperty("client.logLevel", clientLogLevel);
+        final String clientLogMaxIndex = System.getProperty(CLIENT_LOG_MAXINDEX, "10");
+        System.setProperty("client.logFileMaxIndex", clientLogMaxIndex);
+
+        if (isloadconfig) {
+            try {
+                final ILoggerFactory iLoggerFactory = LoggerFactory.getILoggerFactory();
+                final Class<?> classType = iLoggerFactory.getClass();
+                // The backend is detected by the class name of the bound SLF4J factory.
+                if (classType.getName().equals("org.slf4j.impl.Log4jLoggerFactory")) {
+                    configureLog4j(logConfigFilePath, log4JResourceFile);
+                } else if (classType.getName().equals("ch.qos.logback.classic.LoggerContext")) {
+                    configureLogback(iLoggerFactory, logConfigFilePath, logbackResourceFile);
+                }
+            } catch (Exception e) {
+                // Best effort: a failed backend configuration must not break client start-up.
+                System.err.println(e);
+            }
+        }
+        // Honour the requested name instead of a hard-coded constant.
+        return LoggerFactory.getLogger(loggerName);
+    }
+
+
+    /**
+     * Configures log4j through a reflectively created {@code DOMConfigurator}.
+     *
+     * @param logConfigFilePath explicit configuration file path, or {@code null} to load
+     *                          {@code resourceFile} from this class's class loader
+     * @param resourceFile      classpath resource name used when no explicit path is given
+     * @throws Exception if the configurator cannot be loaded, instantiated or invoked
+     */
+    private static void configureLog4j(final String logConfigFilePath, final String resourceFile)
+            throws Exception {
+        final Class<?> domconfigurator = Class.forName("org.apache.log4j.xml.DOMConfigurator");
+        final Object domconfiguratorobj = domconfigurator.newInstance();
+        if (null == logConfigFilePath) {
+            final Method configure = domconfiguratorobj.getClass().getMethod("configure", URL.class);
+            final URL url = ClientLogger.class.getClassLoader().getResource(resourceFile);
+            configure.invoke(domconfiguratorobj, url);
+        } else {
+            final Method configure = domconfiguratorobj.getClass().getMethod("configure", String.class);
+            configure.invoke(domconfiguratorobj, logConfigFilePath);
+        }
+    }
+
+
+    /**
+     * Configures logback through a reflectively created {@code JoranConfigurator} whose context
+     * is set to the given logger factory.
+     *
+     * @param loggerFactory     the bound SLF4J factory, passed to {@code setContext}
+     * @param logConfigFilePath explicit configuration file path, or {@code null} to load
+     *                          {@code resourceFile} from this class's class loader
+     * @param resourceFile      classpath resource name used when no explicit path is given
+     * @throws Exception if the configurator cannot be loaded, instantiated or invoked
+     */
+    private static void configureLogback(final ILoggerFactory loggerFactory, final String logConfigFilePath,
+            final String resourceFile) throws Exception {
+        final Class<?> context = Class.forName("ch.qos.logback.core.Context");
+        final Class<?> joranConfigurator = Class.forName("ch.qos.logback.classic.joran.JoranConfigurator");
+        final Object joranConfiguratoroObj = joranConfigurator.newInstance();
+        final Method setContext = joranConfiguratoroObj.getClass().getMethod("setContext", context);
+        setContext.invoke(joranConfiguratoroObj, loggerFactory);
+        if (null == logConfigFilePath) {
+            final URL url = ClientLogger.class.getClassLoader().getResource(resourceFile);
+            final Method doConfigure =
+                    joranConfiguratoroObj.getClass().getMethod("doConfigure", URL.class);
+            doConfigure.invoke(joranConfiguratoroObj, url);
+        } else {
+            final Method doConfigure =
+                    joranConfiguratoroObj.getClass().getMethod("doConfigure", String.class);
+            doConfigure.invoke(joranConfiguratoroObj, logConfigFilePath);
+        }
+    }
+
+
+    /**
+     * @return the logger currently shared by the client
+     */
+    public static Logger getLog() {
+        return log;
+    }
+
+
+    /**
+     * Replaces the logger shared by the client.
+     *
+     * @param log the logger subsequently returned by {@link #getLog()}
+     */
+    public static void setLog(Logger log) {
+        ClientLogger.log = log;
+    }
+}