You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:27 UTC
[31/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
deleted file mode 100644
index b82cde9..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ /dev/null
@@ -1,1080 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.producer;
-
-import com.alibaba.rocketmq.client.QueryResult;
-import com.alibaba.rocketmq.client.Validators;
-import com.alibaba.rocketmq.client.common.ClientErrorCode;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.hook.CheckForbiddenContext;
-import com.alibaba.rocketmq.client.hook.CheckForbiddenHook;
-import com.alibaba.rocketmq.client.hook.SendMessageContext;
-import com.alibaba.rocketmq.client.hook.SendMessageHook;
-import com.alibaba.rocketmq.client.impl.CommunicationMode;
-import com.alibaba.rocketmq.client.impl.MQClientManager;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.latency.MQFaultStrategy;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.client.producer.*;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.ServiceState;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-import com.alibaba.rocketmq.common.message.*;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.EndTransactionRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
-import com.alibaba.rocketmq.remoting.RPCHook;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.*;
-
-
-/**
- * @author shijia.wxr
- */
-public class DefaultMQProducerImpl implements MQProducerInner {
- // Client logger obtained from ClientLogger.
- private final Logger log = ClientLogger.getLog();
- // Produces the random invoke id that tags the log lines of one default send call.
- private final Random random = new Random();
- // The producer facade whose settings (group, timeouts, retry counts, ...) are read throughout this class.
- private final DefaultMQProducer defaultMQProducer;
- // Publish info per topic, keyed by topic name.
- private final ConcurrentHashMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
- new ConcurrentHashMap<String, TopicPublishInfo>();
- // Hooks invoked before and after each kernel send.
- private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
- // Passed to MQClientManager when the client instance is obtained in start(); may be null.
- private final RPCHook rpcHook;
- // Work queue backing checkExecutor; created by initTransactionEnv().
- protected BlockingQueue<Runnable> checkRequestQueue;
- // Runs broker-initiated transaction state checks; created by initTransactionEnv().
- protected ExecutorService checkExecutor;
- // Lifecycle state; start() and shutdown() move it away from CREATE_JUST.
- private ServiceState serviceState = ServiceState.CREATE_JUST;
- // Assigned in start(); remains null until then.
- private MQClientInstance mQClientFactory;
- // Hooks consulted before a message is sent (see executeCheckForbiddenHook).
- private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
- // Compression level handed to UtilAll.compress; read from the MixAll.MESSAGE_COMPRESS_LEVEL system property, default "5".
- private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
-
- // Delegate for message-queue selection and per-broker fault bookkeeping.
- private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
-
-
- /**
- * Creates an implementation bound to the given producer, without an RPC hook.
- *
- * @param defaultMQProducer producer whose settings this implementation reads
- */
- public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
- this(defaultMQProducer, null);
- }
-
-
- /**
-  * Creates an implementation bound to the given producer and RPC hook.
-  *
-  * @param defaultMQProducer producer whose settings this implementation reads
-  * @param rpcHook hook handed to the client manager in {@code start}; may be null
-  */
- public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
-     this.rpcHook = rpcHook;
-     this.defaultMQProducer = defaultMQProducer;
- }
-
- /**
-  * Appends a check-forbidden hook and logs its name together with the resulting hook count.
-  *
-  * @param checkForbiddenHook hook to register
-  */
- public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) {
-     this.checkForbiddenHookList.add(checkForbiddenHook);
-     final String hookName = checkForbiddenHook.hookName();
-     final int hookCount = this.checkForbiddenHookList.size();
-     log.info("register a new checkForbiddenHook. hookName={}, allHookSize={}", hookName, hookCount);
- }
-
- /**
-  * Creates the bounded request queue and the thread pool used for transaction state checks.
-  * The wrapped producer is cast to {@code TransactionMQProducer} to read the sizing settings.
-  */
- public void initTransactionEnv() {
-     final TransactionMQProducer txProducer = (TransactionMQProducer) this.defaultMQProducer;
-
-     final int queueCapacity = txProducer.getCheckRequestHoldMax();
-     this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(queueCapacity);
-
-     final int minThreads = txProducer.getCheckThreadPoolMinSize();
-     final int maxThreads = txProducer.getCheckThreadPoolMaxSize();
-     // Idle threads above the minimum are kept for one minute.
-     this.checkExecutor = new ThreadPoolExecutor(minThreads, maxThreads, 1000 * 60, TimeUnit.MILLISECONDS,
-         this.checkRequestQueue);
- }
-
- /**
-  * Shuts down the transaction-check executor and drops any queued check requests.
-  * Both fields are only assigned by {@code initTransactionEnv}, so each is guarded to make
-  * this method a safe no-op when the transaction environment was never initialized.
-  */
- public void destroyTransactionEnv() {
-     if (this.checkExecutor != null) {
-         this.checkExecutor.shutdown();
-     }
-     if (this.checkRequestQueue != null) {
-         this.checkRequestQueue.clear();
-     }
- }
-
- /**
-  * Appends a send-message hook and logs its name.
-  *
-  * @param hook hook to register
-  */
- public void registerSendMessageHook(final SendMessageHook hook) {
-     this.sendMessageHookList.add(hook);
-     final String hookName = hook.hookName();
-     log.info("register sendMessage Hook, {}", hookName);
- }
-
- /**
-  * Starts the producer and also starts the underlying client instance.
-  *
-  * @throws MQClientException propagated from {@code start(boolean)}
-  */
- public void start() throws MQClientException {
-     final boolean startFactory = true;
-     this.start(startFactory);
- }
-
- /**
- * Starts the producer if it is still in the CREATE_JUST state: validates the configuration,
- * obtains the client instance, registers this producer under its group and optionally starts
- * the client instance. Afterwards a heartbeat is sent to all brokers.
- *
- * @param startFactory whether the client instance itself should be started
- * @throws MQClientException if the configuration is invalid, the group is already registered,
- * or the producer is in RUNNING, START_FAILED or SHUTDOWN_ALREADY state
- */
- public void start(final boolean startFactory) throws MQClientException {
- switch (this.serviceState) {
- case CREATE_JUST:
- // Mark the start as failed up front; only a fully completed start flips it to RUNNING below.
- this.serviceState = ServiceState.START_FAILED;
-
- this.checkConfig();
-
- // The client-inner producer group keeps its instance name; every other group gets the PID-based one.
- if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
- this.defaultMQProducer.changeInstanceNameToPID();
- }
-
- this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
-
- boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
- if (!registerOK) {
- // Registration was refused: roll the state back so that start() may be attempted again.
- this.serviceState = ServiceState.CREATE_JUST;
- throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
- + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
- null);
- }
-
- // Seed an (empty) publish info entry for the create-topic key.
- this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
-
- if (startFactory) {
- mQClientFactory.start();
- }
-
- log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
- this.defaultMQProducer.isSendMessageWithVIPChannel());
- this.serviceState = ServiceState.RUNNING;
- break;
- case RUNNING:
- case START_FAILED:
- case SHUTDOWN_ALREADY:
- throw new MQClientException("The producer service state not OK, maybe started once, "//
- + this.serviceState//
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
- null);
- default:
- break;
- }
-
- // Only reached after a successful start; every other listed state throws above.
- this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
- }
-
- private void checkConfig() throws MQClientException {
- Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
-
- if (null == this.defaultMQProducer.getProducerGroup()) {
- throw new MQClientException("producerGroup is null", null);
- }
-
- if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
- throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
- null);
- }
- }
-
- /**
-  * Shuts the producer down and also shuts down the underlying client instance.
-  */
- public void shutdown() {
-     final boolean shutdownFactory = true;
-     this.shutdown(shutdownFactory);
- }
-
- /**
-  * Unregisters a RUNNING producer from the client instance and marks it SHUTDOWN_ALREADY.
-  * In every other state the call does nothing.
-  *
-  * @param shutdownFactory whether the client instance itself should be shut down as well
-  */
- public void shutdown(final boolean shutdownFactory) {
-     // CREATE_JUST, SHUTDOWN_ALREADY and any remaining state are no-ops.
-     if (this.serviceState != ServiceState.RUNNING) {
-         return;
-     }
-
-     this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());
-     if (shutdownFactory) {
-         this.mQClientFactory.shutdown();
-     }
-
-     log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
-     this.serviceState = ServiceState.SHUTDOWN_ALREADY;
- }
-
- /**
-  * Returns a fresh set holding the topics currently present in the publish info table.
-  *
-  * @return a new, independent set of topic names
-  */
- @Override
- public Set<String> getPublishTopicList() {
-     // The copy constructor snapshots the key view, just like adding the keys one by one.
-     return new HashSet<String>(this.topicPublishInfoTable.keySet());
- }
-
- /**
-  * Tells whether the publish info of a topic is missing or not ok.
-  *
-  * @param topic topic to look up
-  * @return true if there is no entry for the topic or its {@code ok()} is false
-  */
- @Override
- public boolean isPublishTopicNeedUpdate(String topic) {
-     final TopicPublishInfo current = this.topicPublishInfoTable.get(topic);
-     if (current == null) {
-         return true;
-     }
-     return !current.ok();
- }
-
- /**
-  * Returns the transaction check listener of the wrapped producer.
-  *
-  * @return the listener, or null when the producer is not a {@code TransactionMQProducer}
-  */
- @Override
- public TransactionCheckListener checkListener() {
-     if (!(this.defaultMQProducer instanceof TransactionMQProducer)) {
-         return null;
-     }
-
-     final TransactionMQProducer txProducer = (TransactionMQProducer) defaultMQProducer;
-     return txProducer.getTransactionCheckListener();
- }
-
- /**
-  * Handles a transaction state check requested by a broker. The local listener is asked for
-  * the state of the message on the check executor, and the answer is reported back to the
-  * broker with a one-way end-transaction request.
-  *
-  * @param addr address of the broker that asked for the check
-  * @param msg the message whose local transaction state is checked
-  * @param header the broker's check request header
-  */
- @Override
- public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) {
-     Runnable request = new Runnable() {
-         private final String brokerAddr = addr;
-         private final MessageExt message = msg;
-         private final CheckTransactionStateRequestHeader checkRequestHeader = header;
-         private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
-
-
-         @Override
-         public void run() {
-             TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
-             if (transactionCheckListener != null) {
-                 LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
-                 Throwable exception = null;
-                 try {
-                     localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
-                 } catch (Throwable e) {
-                     log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
-                     exception = e;
-                 }
-
-                 // A null answer would make the switch below throw a NullPointerException that the
-                 // unobserved Future returned by submit() swallows, so the broker would never be answered.
-                 if (localTransactionState == null) {
-                     log.warn("checkLocalTransactionState returned null, treat it as UNKNOW, group[{}]", group);
-                     localTransactionState = LocalTransactionState.UNKNOW;
-                 }
-
-                 this.processTransactionState(//
-                     localTransactionState, //
-                     group, //
-                     exception);
-             } else {
-                 log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
-             }
-         }
-
-
-         // Builds the end-transaction header from the check request and sends it one-way to the broker.
-         private void processTransactionState(//
-             final LocalTransactionState localTransactionState, //
-             final String producerGroup, //
-             final Throwable exception) {
-             final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
-             thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
-             thisHeader.setProducerGroup(producerGroup);
-             thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
-             thisHeader.setFromTransactionCheck(true);
-
-             // Prefer the client-generated unique id; fall back to the message id when it is absent.
-             String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
-             if (uniqueKey == null) {
-                 uniqueKey = message.getMsgId();
-             }
-             thisHeader.setMsgId(uniqueKey);
-             thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
-             switch (localTransactionState) {
-                 case COMMIT_MESSAGE:
-                     thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
-                     break;
-                 case ROLLBACK_MESSAGE:
-                     thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
-                     log.warn("when broker check, client rollback this transaction, {}", thisHeader);
-                     break;
-                 case UNKNOW:
-                     thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
-                     log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
-                     break;
-                 default:
-                     break;
-             }
-
-             String remark = null;
-             if (exception != null) {
-                 remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
-             }
-
-             try {
-                 DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
-                     3000);
-             } catch (Exception e) {
-                 log.error("endTransactionOneway exception", e);
-             }
-         }
-     };
-
-     this.checkExecutor.submit(request);
- }
-
- /**
-  * Stores the publish info for a topic and logs the entry it replaced, if any.
-  * Nothing happens when either argument is null.
-  *
-  * @param topic topic name
-  * @param info publish info to store
-  */
- @Override
- public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) {
-     if (info == null || topic == null) {
-         return;
-     }
-
-     final TopicPublishInfo replaced = this.topicPublishInfoTable.put(topic, info);
-     if (replaced != null) {
-         log.info("updateTopicPublishInfo prev is not null, " + replaced.toString());
-     }
- }
-
- /**
-  * Reports the unit-mode setting of the wrapped producer.
-  *
-  * @return the value of {@code defaultMQProducer.isUnitMode()}
-  */
- @Override
- public boolean isUnitMode() {
-     final boolean unitMode = this.defaultMQProducer.isUnitMode();
-     return unitMode;
- }
-
- /**
-  * Creates a topic with a topic system flag of 0.
-  *
-  * @param key key forwarded to the admin implementation
-  * @param newTopic name of the topic to create
-  * @param queueNum number of queues
-  * @throws MQClientException propagated from the four-argument overload
-  */
- public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
-     final int noSysFlag = 0;
-     this.createTopic(key, newTopic, queueNum, noSysFlag);
- }
-
- /**
-  * Creates a topic through the admin implementation after checking the producer state and
-  * validating the topic name.
-  *
-  * @param key key forwarded to the admin implementation
-  * @param newTopic name of the topic to create
-  * @param queueNum number of queues
-  * @param topicSysFlag topic system flag
-  * @throws MQClientException if the producer is not RUNNING, the topic name is rejected,
-  *         or the admin call fails
-  */
- public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
-     makeSureStateOK();
-     Validators.checkTopic(newTopic);
-
-     mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
- }
-
- private void makeSureStateOK() throws MQClientException {
- if (this.serviceState != ServiceState.RUNNING) {
- throw new MQClientException("The producer service state not OK, "//
- + this.serviceState//
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
- null);
- }
- }
-
- /**
-  * Fetches the publish message queues of a topic through the admin implementation.
-  *
-  * @param topic topic to look up
-  * @return the queues reported by the admin implementation
-  * @throws MQClientException if the producer is not RUNNING or the admin call fails
-  */
- public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
-     makeSureStateOK();
-     final List<MessageQueue> queues = mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic);
-     return queues;
- }
-
- /**
-  * Asks the admin implementation for the offset of a queue at the given timestamp.
-  *
-  * @param mq queue to search
-  * @param timestamp timestamp forwarded to the admin implementation
-  * @return the offset reported by the admin implementation
-  * @throws MQClientException if the producer is not RUNNING or the admin call fails
-  */
- public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
-     makeSureStateOK();
-     final long offset = mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
-     return offset;
- }
-
- /**
-  * Asks the admin implementation for the maximum offset of a queue.
-  *
-  * @param mq queue to query
-  * @return the offset reported by the admin implementation
-  * @throws MQClientException if the producer is not RUNNING or the admin call fails
-  */
- public long maxOffset(MessageQueue mq) throws MQClientException {
-     makeSureStateOK();
-     final long offset = mQClientFactory.getMQAdminImpl().maxOffset(mq);
-     return offset;
- }
-
- /**
-  * Asks the admin implementation for the minimum offset of a queue.
-  *
-  * @param mq queue to query
-  * @return the offset reported by the admin implementation
-  * @throws MQClientException if the producer is not RUNNING or the admin call fails
-  */
- public long minOffset(MessageQueue mq) throws MQClientException {
-     makeSureStateOK();
-     final long offset = mQClientFactory.getMQAdminImpl().minOffset(mq);
-     return offset;
- }
-
- /**
-  * Asks the admin implementation for the earliest message store time of a queue.
-  *
-  * @param mq queue to query
-  * @return the value reported by the admin implementation
-  * @throws MQClientException if the producer is not RUNNING or the admin call fails
-  */
- public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
-     makeSureStateOK();
-     final long storeTime = mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
-     return storeTime;
- }
-
- /**
-  * Looks a message up by its id through the admin implementation.
-  *
-  * @param msgId id of the message
-  * @return the message returned by the admin implementation
-  * @throws MQClientException if the producer is not RUNNING
-  */
- public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-     makeSureStateOK();
-     final MessageExt found = mQClientFactory.getMQAdminImpl().viewMessage(msgId);
-     return found;
- }
-
- /**
-  * Queries messages of a topic by key through the admin implementation.
-  *
-  * @param topic topic to query
-  * @param key message key
-  * @param maxNum maximum number of results, forwarded unchanged
-  * @param begin lower bound forwarded to the admin implementation
-  * @param end upper bound forwarded to the admin implementation
-  * @return the result returned by the admin implementation
-  * @throws MQClientException if the producer is not RUNNING or the query fails
-  */
- public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
-     throws MQClientException, InterruptedException {
-     makeSureStateOK();
-     final QueryResult result = mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
-     return result;
- }
-
- /**
-  * Looks a message up by topic and unique key through the admin implementation.
-  *
-  * @param topic topic to query
-  * @param uniqKey unique key of the message
-  * @return the message returned by the admin implementation
-  * @throws MQClientException if the producer is not RUNNING or the query fails
-  */
- public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
-     throws MQClientException, InterruptedException {
-     makeSureStateOK();
-     final MessageExt found = mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
-     return found;
- }
-
- /**
- * DEFAULT ASYNC -------------------------------------------------------
- */
- // Asynchronous default send using the producer's configured send timeout.
- public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
-     final long defaultTimeout = this.defaultMQProducer.getSendMsgTimeout();
-     this.send(msg, sendCallback, defaultTimeout);
- }
-
- /**
-  * Sends a message asynchronously through the default send path.
-  *
-  * @param msg message to send
-  * @param sendCallback callback handed to the asynchronous send
-  * @param timeout send timeout, forwarded unchanged
-  * @throws MQClientException on client-side failure; also wraps an unexpected {@code MQBrokerException}
-  */
- public void send(Message msg, SendCallback sendCallback, long timeout)
-     throws MQClientException, RemotingException, InterruptedException {
-     try {
-         this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
-     } catch (MQBrokerException e) {
-         // Message typo fixed ("unknownn") so it matches the other send variants.
-         throw new MQClientException("unknown exception", e);
-     }
- }
-
- /**
-  * Picks a message queue by delegating to the fault strategy.
-  *
-  * @param tpInfo publish info of the topic
-  * @param lastBrokerName name of the broker used by the previous attempt, or null
-  * @return the queue chosen by the fault strategy
-  */
- public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
-     final MessageQueue selected = this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
-     return selected;
- }
-
- /**
-  * Forwards a latency observation for a broker to the fault strategy.
-  *
-  * @param brokerName broker the observation belongs to
-  * @param currentLatency measured latency of the attempt
-  * @param isolation isolation flag, forwarded unchanged
-  */
- public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
-     mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
- }
-
- private SendResult sendDefaultImpl(//
- Message msg, //
- final CommunicationMode communicationMode, //
- final SendCallback sendCallback, //
- final long timeout//
- ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- this.makeSureStateOK();
- Validators.checkMessage(msg, this.defaultMQProducer);
-
- final long invokeID = random.nextLong();
- long beginTimestampFirst = System.currentTimeMillis();
- long beginTimestampPrev = beginTimestampFirst;
- long endTimestamp = beginTimestampFirst;
- TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
- if (topicPublishInfo != null && topicPublishInfo.ok()) {
- MessageQueue mq = null;
- Exception exception = null;
- SendResult sendResult = null;
- int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
- int times = 0;
- String[] brokersSent = new String[timesTotal];
- for (; times < timesTotal; times++) {
- String lastBrokerName = null == mq ? null : mq.getBrokerName();
- MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
- if (tmpmq != null) {
- mq = tmpmq;
- brokersSent[times] = mq.getBrokerName();
- try {
- beginTimestampPrev = System.currentTimeMillis();
- sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
- switch (communicationMode) {
- case ASYNC:
- return null;
- case ONEWAY:
- return null;
- case SYNC:
- if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
- if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
- continue;
- }
- }
-
- return sendResult;
- default:
- break;
- }
- } catch (RemotingException e) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
- log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
- log.warn(msg.toString());
- exception = e;
- continue;
- } catch (MQClientException e) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
- log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
- log.warn(msg.toString());
- exception = e;
- continue;
- } catch (MQBrokerException e) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
- log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
- log.warn(msg.toString());
- exception = e;
- switch (e.getResponseCode()) {
- case ResponseCode.TOPIC_NOT_EXIST:
- case ResponseCode.SERVICE_NOT_AVAILABLE:
- case ResponseCode.SYSTEM_ERROR:
- case ResponseCode.NO_PERMISSION:
- case ResponseCode.NO_BUYER_ID:
- case ResponseCode.NOT_IN_CURRENT_UNIT:
- continue;
- default:
- if (sendResult != null) {
- return sendResult;
- }
-
- throw e;
- }
- } catch (InterruptedException e) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
- log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
- log.warn(msg.toString());
-
- log.warn("sendKernelImpl exception", e);
- log.warn(msg.toString());
- throw e;
- }
- } else {
- break;
- }
- }
-
- if (sendResult != null) {
- return sendResult;
- }
-
- String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
- times,
- System.currentTimeMillis() - beginTimestampFirst,
- msg.getTopic(),
- Arrays.toString(brokersSent));
-
- info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
-
- MQClientException mqClientException = new MQClientException(info, exception);
- if (exception instanceof MQBrokerException) {
- mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
- } else if (exception instanceof RemotingConnectException) {
- mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
- } else if (exception instanceof RemotingTimeoutException) {
- mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
- } else if (exception instanceof MQClientException) {
- mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
- }
-
- throw mqClientException;
- }
-
- List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
- if (null == nsList || nsList.isEmpty()) {
- throw new MQClientException(
- "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
- }
-
- throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
- null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
- }
-
- private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
- TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
- if (null == topicPublishInfo || !topicPublishInfo.ok()) {
- this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
- topicPublishInfo = this.topicPublishInfoTable.get(topic);
- }
-
- if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
- return topicPublishInfo;
- } else {
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
- topicPublishInfo = this.topicPublishInfoTable.get(topic);
- return topicPublishInfo;
- }
- }
-
- /**
- * Sends one message to one specific queue: resolves the broker address, optionally compresses
- * the body, runs the registered hooks, builds the request header and dispatches the request
- * according to the communication mode. The message body is restored to its original bytes
- * before returning or throwing.
- *
- * @param msg message to send
- * @param mq target queue
- * @param communicationMode SYNC, ASYNC or ONEWAY
- * @param sendCallback callback used in ASYNC mode
- * @param topicPublishInfo publish info forwarded to the ASYNC send; callers in this file may pass null
- * @param timeout send timeout forwarded to the client API
- * @return the result returned by the client API call
- * @throws MQClientException if the broker address cannot be resolved
- */
- private SendResult sendKernelImpl(final Message msg, //
- final MessageQueue mq, //
- final CommunicationMode communicationMode, //
- final SendCallback sendCallback, //
- final TopicPublishInfo topicPublishInfo, //
- final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- if (null == brokerAddr) {
- // Unknown broker: refresh the topic's publish info once and look the address up again.
- tryToFindTopicPublishInfo(mq.getTopic());
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- }
-
- SendMessageContext context = null;
- if (brokerAddr != null) {
- brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
-
- // Remember the body so the finally block can undo the in-place compression below.
- byte[] prevBody = msg.getBody();
- try {
-
- MessageClientIDSetter.setUniqID(msg);
-
- int sysFlag = 0;
- if (this.tryToCompressMessage(msg)) {
- sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
- }
-
- final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
- if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
- sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
- }
-
- if (hasCheckForbiddenHook()) {
- CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
- checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
- checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
- checkForbiddenContext.setCommunicationMode(communicationMode);
- checkForbiddenContext.setBrokerAddr(brokerAddr);
- checkForbiddenContext.setMessage(msg);
- checkForbiddenContext.setMq(mq);
- checkForbiddenContext.setUnitMode(this.isUnitMode());
- this.executeCheckForbiddenHook(checkForbiddenContext);
- }
-
- // The hook context is only created when at least one send hook is registered.
- if (this.hasSendMessageHook()) {
- context = new SendMessageContext();
- context.setProducer(this);
- context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
- context.setCommunicationMode(communicationMode);
- context.setBornHost(this.defaultMQProducer.getClientIP());
- context.setBrokerAddr(brokerAddr);
- context.setMessage(msg);
- context.setMq(mq);
- String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
- if (isTrans != null && isTrans.equals("true")) {
- context.setMsgType(MessageType.Trans_Msg_Half);
- }
-
- if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
- context.setMsgType(MessageType.Delay_Msg);
- }
- this.executeSendMessageHookBefore(context);
- }
-
- SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
- requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
- requestHeader.setTopic(msg.getTopic());
- requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
- requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
- requestHeader.setQueueId(mq.getQueueId());
- requestHeader.setSysFlag(sysFlag);
- requestHeader.setBornTimestamp(System.currentTimeMillis());
- requestHeader.setFlag(msg.getFlag());
- requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
- requestHeader.setReconsumeTimes(0);
- requestHeader.setUnitMode(this.isUnitMode());
- // For retry topics the reconsume counters travel in the header and are cleared from the message.
- // NOTE(review): the properties string above is built before these properties are cleared - confirm intended.
- if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
- if (reconsumeTimes != null) {
- requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
- MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
- }
-
- String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
- if (maxReconsumeTimes != null) {
- requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
- MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
- }
- }
-
- SendResult sendResult = null;
- switch (communicationMode) {
- case ASYNC:
- sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
- brokerAddr, // 1
- mq.getBrokerName(), // 2
- msg, // 3
- requestHeader, // 4
- timeout, // 5
- communicationMode, // 6
- sendCallback, // 7
- topicPublishInfo, // 8
- this.mQClientFactory, // 9
- this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
- context, //
- this);
- break;
- case ONEWAY:
- case SYNC:
- sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
- brokerAddr,
- mq.getBrokerName(),
- msg,
- requestHeader,
- timeout,
- communicationMode,
- context,
- this);
- break;
- default:
- assert false;
- break;
- }
-
- if (this.hasSendMessageHook()) {
- context.setSendResult(sendResult);
- this.executeSendMessageHookAfter(context);
- }
-
- return sendResult;
- // Each handler below records the failure on the hook context, runs the after-hooks and rethrows.
- // NOTE(review): MQClientException is not caught here, so the after-hooks are skipped on that path.
- } catch (RemotingException e) {
- if (this.hasSendMessageHook()) {
- context.setException(e);
- this.executeSendMessageHookAfter(context);
- }
- throw e;
- } catch (MQBrokerException e) {
- if (this.hasSendMessageHook()) {
- context.setException(e);
- this.executeSendMessageHookAfter(context);
- }
- throw e;
- } catch (InterruptedException e) {
- if (this.hasSendMessageHook()) {
- context.setException(e);
- this.executeSendMessageHookAfter(context);
- }
- throw e;
- } finally {
- // Restore the original body in case it was replaced by its compressed form.
- msg.setBody(prevBody);
- }
- }
-
- throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
- }
-
- /**
-  * Returns the client instance assigned in {@code start}.
-  *
-  * @return the client instance, or null before the producer has been started
-  */
- public MQClientInstance getmQClientFactory() {
-     return this.mQClientFactory;
- }
-
- private boolean tryToCompressMessage(final Message msg) {
- byte[] body = msg.getBody();
- if (body != null) {
- if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
- try {
- byte[] data = UtilAll.compress(body, zipCompressLevel);
- if (data != null) {
- msg.setBody(data);
- return true;
- }
- } catch (IOException e) {
- log.error("tryToCompressMessage exception", e);
- log.warn(msg.toString());
- }
- }
- }
-
- return false;
- }
-
- /**
-  * Tells whether at least one check-forbidden hook is registered.
-  *
-  * @return true if the hook list is not empty
-  */
- public boolean hasCheckForbiddenHook() {
-     final boolean noHooks = this.checkForbiddenHookList.isEmpty();
-     return !noHooks;
- }
-
- /**
-  * Runs every registered check-forbidden hook against the given context, in registration order.
-  *
-  * @param context context handed to each hook
-  * @throws MQClientException propagated from a hook; later hooks are then not run
-  */
- public void executeCheckForbiddenHook(final CheckForbiddenContext context) throws MQClientException {
-     if (!hasCheckForbiddenHook()) {
-         return;
-     }
-     for (CheckForbiddenHook forbiddenHook : checkForbiddenHookList) {
-         forbiddenHook.checkForbidden(context);
-     }
- }
-
- /**
-  * Tells whether at least one send-message hook is registered.
-  *
-  * @return true if the hook list is not empty
-  */
- public boolean hasSendMessageHook() {
-     final boolean noHooks = this.sendMessageHookList.isEmpty();
-     return !noHooks;
- }
-
- /**
-  * Invokes {@code sendMessageBefore} on every registered send hook. A failing hook is logged
-  * and does not prevent the remaining hooks from running.
-  *
-  * @param context context handed to each hook
-  */
- public void executeSendMessageHookBefore(final SendMessageContext context) {
-     if (this.sendMessageHookList.isEmpty()) {
-         return;
-     }
-     for (SendMessageHook sendHook : this.sendMessageHookList) {
-         try {
-             sendHook.sendMessageBefore(context);
-         } catch (Throwable e) {
-             log.warn("failed to executeSendMessageHookBefore", e);
-         }
-     }
- }
-
- /**
-  * Invokes {@code sendMessageAfter} on every registered send hook. A failing hook is logged
-  * and does not prevent the remaining hooks from running.
-  *
-  * @param context context handed to each hook
-  */
- public void executeSendMessageHookAfter(final SendMessageContext context) {
-     if (this.sendMessageHookList.isEmpty()) {
-         return;
-     }
-     for (SendMessageHook sendHook : this.sendMessageHookList) {
-         try {
-             sendHook.sendMessageAfter(context);
-         } catch (Throwable e) {
-             log.warn("failed to executeSendMessageHookAfter", e);
-         }
-     }
- }
-
- /**
- * DEFAULT ONEWAY -------------------------------------------------------
- */
- // One-way default send using the producer's configured send timeout; an unexpected
- // MQBrokerException is wrapped into an MQClientException.
- public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
-     final long defaultTimeout = this.defaultMQProducer.getSendMsgTimeout();
-     try {
-         this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, defaultTimeout);
-     } catch (MQBrokerException brokerFailure) {
-         throw new MQClientException("unknown exception", brokerFailure);
-     }
- }
-
- /**
- * KERNEL SYNC -------------------------------------------------------
- */
- // Synchronous send to an explicit queue using the producer's configured send timeout.
- public SendResult send(Message msg, MessageQueue mq)
-     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-     final long defaultTimeout = this.defaultMQProducer.getSendMsgTimeout();
-     return this.send(msg, mq, defaultTimeout);
- }
-
- /**
-  * Sends a message synchronously to an explicit queue.
-  *
-  * @param msg message to send
-  * @param mq target queue; its topic must equal the message's topic
-  * @param timeout send timeout, forwarded unchanged
-  * @return the result of the kernel send
-  * @throws MQClientException if the producer is not RUNNING, the message is invalid,
-  *         or the topics differ
-  */
- public SendResult send(Message msg, MessageQueue mq, long timeout)
-     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-     this.makeSureStateOK();
-     Validators.checkMessage(msg, this.defaultMQProducer);
-
-     final boolean sameTopic = msg.getTopic().equals(mq.getTopic());
-     if (!sameTopic) {
-         throw new MQClientException("message's topic not equal mq's topic", null);
-     }
-
-     final SendResult result = this.sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, null, timeout);
-     return result;
- }
-
- /**
- * KERNEL ASYNC -------------------------------------------------------
- */
- // Asynchronous send to an explicit queue using the producer's configured send timeout.
- public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
-     throws MQClientException, RemotingException, InterruptedException {
-     final long defaultTimeout = this.defaultMQProducer.getSendMsgTimeout();
-     this.send(msg, mq, sendCallback, defaultTimeout);
- }
-
- /**
-  * Sends a message asynchronously to an explicit queue.
-  *
-  * @param msg message to send
-  * @param mq target queue; its topic must equal the message's topic
-  * @param sendCallback callback handed to the asynchronous send
-  * @param timeout send timeout, forwarded unchanged
-  * @throws MQClientException if the producer is not RUNNING, the message is invalid, the
-  *         topics differ, or an unexpected {@code MQBrokerException} is raised
-  */
- public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
-     throws MQClientException, RemotingException, InterruptedException {
-     this.makeSureStateOK();
-     Validators.checkMessage(msg, this.defaultMQProducer);
-
-     final boolean sameTopic = msg.getTopic().equals(mq.getTopic());
-     if (!sameTopic) {
-         throw new MQClientException("message's topic not equal mq's topic", null);
-     }
-
-     try {
-         this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, timeout);
-     } catch (MQBrokerException brokerFailure) {
-         throw new MQClientException("unknown exception", brokerFailure);
-     }
- }
-
- /**
- * KERNEL ONEWAY -------------------------------------------------------
- */
- /**
-  * Sends a message one-way to an explicit queue using the producer's configured send timeout.
-  * Like the synchronous and asynchronous queue-targeted sends, a message whose topic differs
-  * from the queue's topic is rejected instead of being sent to an unrelated queue.
-  *
-  * @param msg message to send
-  * @param mq target queue; its topic must equal the message's topic
-  * @throws MQClientException if the producer is not RUNNING, the message is invalid, the
-  *         topics differ, or an unexpected {@code MQBrokerException} is raised
-  */
- public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
-     this.makeSureStateOK();
-     Validators.checkMessage(msg, this.defaultMQProducer);
-
-     if (!msg.getTopic().equals(mq.getTopic())) {
-         throw new MQClientException("message's topic not equal mq's topic", null);
-     }
-
-     try {
-         this.sendKernelImpl(msg, mq, CommunicationMode.ONEWAY, null, null, this.defaultMQProducer.getSendMsgTimeout());
-     } catch (MQBrokerException e) {
-         throw new MQClientException("unknown exception", e);
-     }
- }
-
- /**
- * SELECT SYNC -------------------------------------------------------
- */
- // Synchronous selector-based send using the producer's configured send timeout.
- public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
-     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-     final long defaultTimeout = this.defaultMQProducer.getSendMsgTimeout();
-     return this.send(msg, selector, arg, defaultTimeout);
- }
-
- /**
-  * Sends a message synchronously to the queue chosen by the selector.
-  *
-  * @param msg message to send
-  * @param selector picks the target queue
-  * @param arg opaque argument handed to the selector
-  * @param timeout send timeout, forwarded unchanged
-  * @return the result of the select-based send
-  */
- public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
-     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-     final SendResult result = this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
-     return result;
- }
-
- private SendResult sendSelectImpl(//
- Message msg, //
- MessageQueueSelector selector, //
- Object arg, //
- final CommunicationMode communicationMode, //
- final SendCallback sendCallback, final long timeout//
- ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- this.makeSureStateOK();
- Validators.checkMessage(msg, this.defaultMQProducer);
-
- TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
- if (topicPublishInfo != null && topicPublishInfo.ok()) {
- MessageQueue mq = null;
- try {
- mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
- } catch (Throwable e) {
- throw new MQClientException("select message queue throwed exception.", e);
- }
-
- if (mq != null) {
- return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
- } else {
- throw new MQClientException("select message queue return null.", null);
- }
- }
-
- throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
- }
-
- /**
-  * SELECT ASYNC -------------------------------------------------------
-  * <p>
-  * Asynchronously sends {@code msg} to the queue chosen by {@code selector}, applying the
-  * producer's configured send timeout.
-  *
-  * @param msg          message to send
-  * @param selector     strategy that picks the target queue
-  * @param arg          opaque argument handed to the selector
-  * @param sendCallback callback passed through to the timeout-taking overload
-  */
- public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
-     throws MQClientException, RemotingException, InterruptedException {
-     // Delegate with the producer-wide default timeout.
-     this.send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
- }
-
- /**
-  * Asynchronously sends {@code msg} to the queue chosen by {@code selector}.
-  *
-  * @param msg          message to send
-  * @param selector     strategy that picks the target queue
-  * @param arg          opaque argument handed to the selector
-  * @param sendCallback callback handed to the selecting send implementation
-  * @param timeout      timeout handed to the selecting send implementation
-  * @throws MQClientException    if the selecting send fails, including when it reports an
-  *                              {@link MQBrokerException} (wrapped as the cause)
-  * @throws RemotingException    propagated from the selecting send
-  * @throws InterruptedException propagated from the selecting send
-  */
- public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
-     throws MQClientException, RemotingException, InterruptedException {
-     try {
-         this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout);
-     } catch (MQBrokerException e) {
-         // This signature does not declare MQBrokerException, so it is wrapped.
-         // Message text now matches the sibling send methods (was misspelled "unknownn").
-         throw new MQClientException("unknown exception", e);
-     }
- }
-
- /**
-  * SELECT ONEWAY -------------------------------------------------------
-  * <p>
-  * Sends {@code msg} in one-way mode to the queue chosen by {@code selector}, applying the
-  * producer's configured send timeout.
-  *
-  * @param msg      message to send
-  * @param selector strategy that picks the target queue
-  * @param arg      opaque argument handed to the selector
-  * @throws MQClientException if the selecting send fails, including when it reports an
-  *                           {@link MQBrokerException} (wrapped as the cause)
-  */
- public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
-     throws MQClientException, RemotingException, InterruptedException {
-     try {
-         this.sendSelectImpl(msg, selector, arg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());
-     } catch (MQBrokerException brokerFailure) {
-         // This signature does not declare MQBrokerException, so it is wrapped.
-         throw new MQClientException("unknown exception", brokerFailure);
-     }
- }
-
- /**
- * Sends {@code msg} flagged as a prepared transaction message, runs the caller's local
- * transaction through {@code tranExecuter}, reports the resulting state to the broker via
- * {@code endTransaction}, and returns a {@link TransactionSendResult} built from the send
- * result plus the local transaction state.
- *
- * @param msg message to send; it is tagged with the "transaction prepared" and producer-group properties
- * @param tranExecuter executes the local transaction; must not be null
- * @param arg opaque argument handed to {@code tranExecuter}
- * @return the send status, queue, ids and offset of the send, plus the local transaction state
- * @throws MQClientException if {@code tranExecuter} is null, message validation fails, or the
- * send throws any exception (wrapped as the cause)
- */
- public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
- throws MQClientException {
- if (null == tranExecuter) {
- throw new MQClientException("tranExecutor is null", null);
- }
- Validators.checkMessage(msg, this.defaultMQProducer);
-
- SendResult sendResult = null;
- // Mark the message as a prepared transaction message and record the producer group on it.
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
- try {
- sendResult = this.send(msg);
- } catch (Exception e) {
- // Every send failure (including InterruptedException) is surfaced as MQClientException.
- throw new MQClientException("send message Exception", e);
- }
-
- // The state stays UNKNOW unless one of the branches below assigns something else.
- LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
- Throwable localException = null;
- switch (sendResult.getSendStatus()) {
- case SEND_OK: {
- try {
- if (sendResult.getTransactionId() != null) {
- msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
- }
- localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
- // A null answer from the executer is treated as UNKNOW.
- if (null == localTransactionState) {
- localTransactionState = LocalTransactionState.UNKNOW;
- }
-
- if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
- log.info("executeLocalTransactionBranch return {}", localTransactionState);
- log.info(msg.toString());
- }
- } catch (Throwable e) {
- // The throwable is kept and passed to endTransaction; the state keeps the value it had
- // when the throwable was raised (UNKNOW if the executer itself threw).
- log.info("executeLocalTransactionBranch exception", e);
- log.info(msg.toString());
- localException = e;
- }
- }
- break;
- // For these send statuses the local transaction is not executed and the state is ROLLBACK_MESSAGE.
- case FLUSH_DISK_TIMEOUT:
- case FLUSH_SLAVE_TIMEOUT:
- case SLAVE_NOT_AVAILABLE:
- localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
- break;
- default:
- break;
- }
-
- try {
- this.endTransaction(sendResult, localTransactionState, localException);
- } catch (Exception e) {
- // A failure to report the outcome is only logged; the method still returns a result.
- log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
- }
-
- // Copy the relevant fields of the send result and attach the local transaction state.
- TransactionSendResult transactionSendResult = new TransactionSendResult();
- transactionSendResult.setSendStatus(sendResult.getSendStatus());
- transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
- transactionSendResult.setMsgId(sendResult.getMsgId());
- transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
- transactionSendResult.setTransactionId(sendResult.getTransactionId());
- transactionSendResult.setLocalTransactionState(localTransactionState);
- return transactionSendResult;
- }
-
- /**
-  * DEFAULT SYNC -------------------------------------------------------
-  * <p>
-  * Synchronously sends {@code msg}, applying the producer's configured send timeout.
-  *
-  * @param msg message to send
-  * @return the result returned by the timeout-taking overload
-  */
- public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-     // Delegate with the producer-wide default timeout.
-     return this.send(msg, this.defaultMQProducer.getSendMsgTimeout());
- }
-
- public void endTransaction(//
- final SendResult sendResult, //
- final LocalTransactionState localTransactionState, //
- final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
- final MessageId id;
- if (sendResult.getOffsetMsgId() != null) {
- id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
- } else {
- id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
- }
- String transactionId = sendResult.getTransactionId();
- final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
- EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
- requestHeader.setTransactionId(transactionId);
- requestHeader.setCommitLogOffset(id.getOffset());
- switch (localTransactionState) {
- case COMMIT_MESSAGE:
- requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
- break;
- case ROLLBACK_MESSAGE:
- requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
- break;
- case UNKNOW:
- requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
- break;
- default:
- break;
- }
-
- requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
- requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
- requestHeader.setMsgId(sendResult.getMsgId());
- String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
- this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
- this.defaultMQProducer.getSendMsgTimeout());
- }
-
- /**
-  * Synchronously sends {@code msg} through the default send implementation.
-  *
-  * @param msg     message to send
-  * @param timeout timeout handed to the default send implementation
-  * @return the result produced by the default send implementation
-  */
- public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-     final SendResult result = this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
-     return result;
- }
-
- /**
-  * @return the producer's topic-to-publish-info table itself (not a copy)
-  */
- public ConcurrentHashMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
-     return this.topicPublishInfoTable;
- }
-
- /**
-  * @return the currently configured zip compression level
-  */
- public int getZipCompressLevel() {
-     return this.zipCompressLevel;
- }
-
-
- /**
- * Replaces the configured zip compression level; the value is stored without validation.
- *
- * @param zipCompressLevel new compression level
- */
- public void setZipCompressLevel(int zipCompressLevel) {
- this.zipCompressLevel = zipCompressLevel;
- }
-
-
- /**
-  * @return the producer's current service state
-  */
- public ServiceState getServiceState() {
-     return this.serviceState;
- }
-
-
- /**
- * Overwrites the producer's service state; no transition checks are performed here.
- *
- * @param serviceState new service state
- */
- public void setServiceState(ServiceState serviceState) {
- this.serviceState = serviceState;
- }
-
- /**
-  * @return the not-available durations held by the fault strategy
-  */
- public long[] getNotAvailableDuration() {
-     return mqFaultStrategy.getNotAvailableDuration();
- }
-
- /**
-  * Forwards the not-available durations to the fault strategy.
-  *
-  * @param notAvailableDuration durations to hand to the fault strategy
-  */
- public void setNotAvailableDuration(final long[] notAvailableDuration) {
-     mqFaultStrategy.setNotAvailableDuration(notAvailableDuration);
- }
-
- /**
-  * @return the latency thresholds held by the fault strategy
-  */
- public long[] getLatencyMax() {
-     return mqFaultStrategy.getLatencyMax();
- }
-
- /**
-  * Forwards the latency thresholds to the fault strategy.
-  *
-  * @param latencyMax thresholds to hand to the fault strategy
-  */
- public void setLatencyMax(final long[] latencyMax) {
-     mqFaultStrategy.setLatencyMax(latencyMax);
- }
-
- /**
-  * @return whether the fault strategy has latency-based fault handling switched on
-  */
- public boolean isSendLatencyFaultEnable() {
-     return mqFaultStrategy.isSendLatencyFaultEnable();
- }
-
- /**
-  * Switches latency-based fault handling on or off in the fault strategy.
-  *
-  * @param sendLatencyFaultEnable flag to hand to the fault strategy
-  */
- public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
-     mqFaultStrategy.setSendLatencyFaultEnable(sendLatencyFaultEnable);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java
deleted file mode 100644
index e2837e2..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/MQProducerInner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.producer;
-
-import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
-
-import java.util.Set;
-
-
-/**
- * Internal view of a producer. The declarations cover topic publish info and transaction
- * state checks; how each method is used is defined by implementors and callers outside this
- * file.
- *
- * @author shijia.wxr
- */
-public interface MQProducerInner {
-
-    /**
-     * @return the set of topic names known to this producer
-     */
-    Set<String> getPublishTopicList();
-
-    /**
-     * @param topic topic name to check
-     * @return whether the publish info for {@code topic} needs to be updated
-     */
-    boolean isPublishTopicNeedUpdate(String topic);
-
-    /**
-     * @return the listener used for transaction checks
-     */
-    TransactionCheckListener checkListener();
-
-    /**
-     * Handles a transaction state check for {@code msg}.
-     *
-     * @param addr               address associated with the check request
-     * @param msg                message whose transaction state is being checked
-     * @param checkRequestHeader header of the check request
-     */
-    void checkTransactionState(
-        String addr,
-        MessageExt msg,
-        CheckTransactionStateRequestHeader checkRequestHeader);
-
-    /**
-     * Stores new publish info for {@code topic}.
-     *
-     * @param topic topic name
-     * @param info  publish info to associate with the topic
-     */
-    void updateTopicPublishInfo(String topic, TopicPublishInfo info);
-
-    /**
-     * @return the producer's unit-mode flag
-     */
-    boolean isUnitMode();
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java
deleted file mode 100644
index 2f7de22..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.producer;
-
-import com.alibaba.rocketmq.client.common.ThreadLocalIndex;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.route.QueueData;
-import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class TopicPublishInfo {
- private boolean orderTopic = false;
- private boolean haveTopicRouterInfo = false;
- private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
- private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0);
- private TopicRouteData topicRouteData;
-
-
- public boolean isOrderTopic() {
- return orderTopic;
- }
-
- public void setOrderTopic(boolean orderTopic) {
- this.orderTopic = orderTopic;
- }
-
- public boolean ok() {
- return null != this.messageQueueList && !this.messageQueueList.isEmpty();
- }
-
- public List<MessageQueue> getMessageQueueList() {
- return messageQueueList;
- }
-
-
- public void setMessageQueueList(List<MessageQueue> messageQueueList) {
- this.messageQueueList = messageQueueList;
- }
-
-
- public ThreadLocalIndex getSendWhichQueue() {
- return sendWhichQueue;
- }
-
-
- public void setSendWhichQueue(ThreadLocalIndex sendWhichQueue) {
- this.sendWhichQueue = sendWhichQueue;
- }
-
-
- public boolean isHaveTopicRouterInfo() {
- return haveTopicRouterInfo;
- }
-
-
- public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) {
- this.haveTopicRouterInfo = haveTopicRouterInfo;
- }
-
-
- public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
- if (lastBrokerName == null) {
- return selectOneMessageQueue();
- } else {
- int index = this.sendWhichQueue.getAndIncrement();
- for (int i = 0; i < this.messageQueueList.size(); i++) {
- int pos = Math.abs(index++) % this.messageQueueList.size();
- if (pos < 0)
- pos = 0;
- MessageQueue mq = this.messageQueueList.get(pos);
- if (!mq.getBrokerName().equals(lastBrokerName)) {
- return mq;
- }
- }
- return selectOneMessageQueue();
- }
- }
-
-
- public MessageQueue selectOneMessageQueue() {
- int index = this.sendWhichQueue.getAndIncrement();
- int pos = Math.abs(index) % this.messageQueueList.size();
- if (pos < 0)
- pos = 0;
- return this.messageQueueList.get(pos);
- }
-
- public int getQueueIdByBroker(final String brokerName) {
- for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) {
- final QueueData queueData = this.topicRouteData.getQueueDatas().get(i);
- if (queueData.getBrokerName().equals(brokerName)) {
- return queueData.getWriteQueueNums();
- }
- }
-
- return -1;
- }
-
-
- @Override
- public String toString() {
- return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList
- + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]";
- }
-
- public TopicRouteData getTopicRouteData() {
- return topicRouteData;
- }
-
- public void setTopicRouteData(final TopicRouteData topicRouteData) {
- this.topicRouteData = topicRouteData;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java b/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java
deleted file mode 100644
index e6152e4..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultTolerance.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.client.latency;
-
-/**
- * Contract for recording latency faults per named item and asking whether an item is
- * currently usable.
- *
- * @param <T> type of the name that identifies a tracked item
- * @author shijia.wxr
- */
-public interface LatencyFaultTolerance<T> {
-
-    /**
-     * Records the latest latency observation for {@code name}.
-     *
-     * @param name                 item the observation belongs to
-     * @param currentLatency       latency observed for the item
-     * @param notAvailableDuration how long the item should be treated as not available
-     */
-    void updateFaultItem(T name, long currentLatency, long notAvailableDuration);
-
-    /**
-     * @param name item to check
-     * @return whether the item may currently be used
-     */
-    boolean isAvailable(T name);
-
-    /**
-     * Forgets everything recorded for {@code name}.
-     *
-     * @param name item to drop
-     */
-    void remove(T name);
-
-    /**
-     * @return the name of one tracked item chosen by the implementation
-     */
-    T pickOneAtLeast();
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java
deleted file mode 100644
index 8a86449..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/latency/LatencyFaultToleranceImpl.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.client.latency;
-
-import com.alibaba.rocketmq.client.common.ThreadLocalIndex;
-
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author shijia.wxr
- */
-public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
- private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
-
- private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(0);
-
- @Override
- public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
- FaultItem old = this.faultItemTable.get(name);
- if (null == old) {
- final FaultItem faultItem = new FaultItem(name);
- faultItem.setCurrentLatency(currentLatency);
- faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
-
- old = this.faultItemTable.putIfAbsent(name, faultItem);
- if (old != null) {
- old.setCurrentLatency(currentLatency);
- old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
- }
- } else {
- old.setCurrentLatency(currentLatency);
- old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
- }
- }
-
- @Override
- public boolean isAvailable(final String name) {
- final FaultItem faultItem = this.faultItemTable.get(name);
- if (faultItem != null) {
- return faultItem.isAvailable();
- }
- return true;
- }
-
- @Override
- public void remove(final String name) {
- this.faultItemTable.remove(name);
- }
-
- @Override
- public String pickOneAtLeast() {
- final Enumeration<FaultItem> elements = this.faultItemTable.elements();
- List<FaultItem> tmpList = new LinkedList<FaultItem>();
- while (elements.hasMoreElements()) {
- final FaultItem faultItem = elements.nextElement();
- tmpList.add(faultItem);
- }
-
- if (!tmpList.isEmpty()) {
- Collections.shuffle(tmpList);
-
- Collections.sort(tmpList);
-
- final int half = tmpList.size() / 2;
- if (half <= 0) {
- return tmpList.get(0).getName();
- } else {
- final int i = this.whichItemWorst.getAndIncrement() % half;
- return tmpList.get(i).getName();
- }
- }
-
- return null;
- }
-
- class FaultItem implements Comparable<FaultItem> {
- private final String name;
- private volatile long currentLatency;
- private volatile long startTimestamp;
-
- public FaultItem(final String name) {
- this.name = name;
- }
-
- @Override
- public int compareTo(final FaultItem other) {
- if (this.isAvailable() != other.isAvailable()) {
- if (this.isAvailable()) return -1;
-
- if (other.isAvailable()) return 1;
- }
-
- if (this.currentLatency < other.currentLatency)
- return -1;
- else if (this.currentLatency > other.currentLatency) {
- return 1;
- }
-
- if (this.startTimestamp < other.startTimestamp)
- return -1;
- else if (this.startTimestamp > other.startTimestamp) {
- return 1;
- }
-
- return 0;
- }
-
- public boolean isAvailable() {
- return (System.currentTimeMillis() - startTimestamp) >= 0;
- }
-
- @Override
- public int hashCode() {
- int result = getName() != null ? getName().hashCode() : 0;
- result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));
- result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) return true;
- if (!(o instanceof FaultItem)) return false;
-
- final FaultItem faultItem = (FaultItem) o;
-
- if (getCurrentLatency() != faultItem.getCurrentLatency()) return false;
- if (getStartTimestamp() != faultItem.getStartTimestamp()) return false;
- return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;
-
- }
-
- @Override
- public String toString() {
- return "FaultItem{" +
- "name='" + name + '\'' +
- ", currentLatency=" + currentLatency +
- ", startTimestamp=" + startTimestamp +
- '}';
- }
-
- public String getName() {
- return name;
- }
-
- public long getCurrentLatency() {
- return currentLatency;
- }
-
- public void setCurrentLatency(final long currentLatency) {
- this.currentLatency = currentLatency;
- }
-
- public long getStartTimestamp() {
- return startTimestamp;
- }
-
- public void setStartTimestamp(final long startTimestamp) {
- this.startTimestamp = startTimestamp;
- }
-
-
- }
-
- @Override
- public String toString() {
- return "LatencyFaultToleranceImpl{" +
- "faultItemTable=" + faultItemTable +
- ", whichItemWorst=" + whichItemWorst +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java
deleted file mode 100644
index b323f04..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/latency/MQFaultStrategy.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.client.latency;
-
-import com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-/**
- * Chooses the message queue for a send and, when latency fault handling is enabled, keeps
- * track of brokers that responded slowly so they can be avoided for a while.
- * <p>
- * {@code latencyMax} and {@code notAvailableDuration} are parallel arrays: a latency that
- * reaches {@code latencyMax[i]} makes the broker unavailable for
- * {@code notAvailableDuration[i]} milliseconds (the highest matching threshold wins).
- *
- * @author shijia.wxr
- */
-public class MQFaultStrategy {
-    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
-
-    private boolean sendLatencyFaultEnable = false;
-
-    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
-    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
-
-    public long[] getNotAvailableDuration() {
-        return notAvailableDuration;
-    }
-
-    public void setNotAvailableDuration(final long[] notAvailableDuration) {
-        this.notAvailableDuration = notAvailableDuration;
-    }
-
-    public long[] getLatencyMax() {
-        return latencyMax;
-    }
-
-    public void setLatencyMax(final long[] latencyMax) {
-        this.latencyMax = latencyMax;
-    }
-
-    public boolean isSendLatencyFaultEnable() {
-        return sendLatencyFaultEnable;
-    }
-
-    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
-        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
-    }
-
-    /**
-     * Selects the queue for the next send attempt.
-     * <p>
-     * With fault handling disabled this is plain rotation that avoids {@code lastBrokerName}.
-     * With it enabled, queues on currently available brokers are tried first; failing that, a
-     * queue on one of the better recorded brokers is synthesised; failing that, plain rotation.
-     *
-     * @param tpInfo         publish info of the topic being sent to
-     * @param lastBrokerName broker used by the previous attempt, or null on the first attempt
-     * @return the selected queue
-     */
-    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
-        if (!this.sendLatencyFaultEnable) {
-            return tpInfo.selectOneMessageQueue(lastBrokerName);
-        }
-
-        try {
-            int index = tpInfo.getSendWhichQueue().getAndIncrement();
-            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
-                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
-                if (pos < 0) {
-                    pos = 0;
-                }
-                final MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
-                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
-                    // NOTE(review): on a retry this accepts only the broker of the previous
-                    // attempt, whereas the non-fault path avoids it; confirm this is intended.
-                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) {
-                        return mq;
-                    }
-                }
-            }
-
-            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
-            if (notBestBroker != null) {
-                final int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
-                if (writeQueueNums > 0) {
-                    final MessageQueue template = tpInfo.selectOneMessageQueue();
-                    int queueId = Math.abs(tpInfo.getSendWhichQueue().getAndIncrement()) % writeQueueNums;
-                    if (queueId < 0) {
-                        queueId = 0;
-                    }
-                    // Build a fresh queue object: the template is an element of the topic's
-                    // shared queue list and must not be rewritten in place.
-                    return new MessageQueue(template.getTopic(), notBestBroker, queueId);
-                } else {
-                    // The broker no longer has writable queues for this topic; stop tracking it.
-                    latencyFaultTolerance.remove(notBestBroker);
-                }
-            }
-        } catch (Exception ignored) {
-            // Fault-aware selection is best-effort: on any failure fall back to plain rotation.
-        }
-
-        return tpInfo.selectOneMessageQueue();
-    }
-
-    /**
-     * Records the latency of a send to {@code brokerName}; a no-op when fault handling is off.
-     *
-     * @param brokerName     broker the send went to
-     * @param currentLatency latency observed for the send
-     * @param isolation      when true, the unavailability duration is computed as if the
-     *                       latency had been 30000 instead of {@code currentLatency}
-     */
-    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
-        if (this.sendLatencyFaultEnable) {
-            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
-            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
-        }
-    }
-
-    /**
-     * Returns the duration paired with the highest threshold that {@code currentLatency}
-     * reaches, or 0 when it is below every threshold.
-     */
-    private long computeNotAvailableDuration(final long currentLatency) {
-        for (int i = latencyMax.length - 1; i >= 0; i--) {
-            if (currentLatency >= latencyMax[i]) {
-                return this.notAvailableDuration[i];
-            }
-        }
-
-        return 0;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java b/client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java
deleted file mode 100644
index 02af207..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/log/ClientLogger.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.log;
-
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import org.slf4j.ILoggerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Method;
-import java.net.URL;
-
-
-/**
- * @author shijia.wxr
- */
-public class ClientLogger {
- private static Logger log;
- public static final String CLIENT_LOG_ROOT = "rocketmq.client.logRoot";
- public static final String CLIENT_LOG_MAXINDEX = "rocketmq.client.logFileMaxIndex";
- public static final String CLIENT_LOG_LEVEL = "rocketmq.client.logLevel";
-
- static {
- log = createLogger(LoggerName.CLIENT_LOGGER_NAME);
- }
-
-
- private static Logger createLogger(final String loggerName) {
- String logConfigFilePath =
- System.getProperty("rocketmq.client.log.configFile",
- System.getenv("ROCKETMQ_CLIENT_LOG_CONFIGFILE"));
- Boolean isloadconfig =
- Boolean.parseBoolean(System.getProperty("rocketmq.client.log.loadconfig", "true"));
-
- final String log4JResourceFile =
- System.getProperty("rocketmq.client.log4j.resource.fileName", "log4j_rocketmq_client.xml");
-
- final String logbackResourceFile =
- System.getProperty("rocketmq.client.logback.resource.fileName", "logback_rocketmq_client.xml");
-
- String clientLogRoot = System.getProperty(CLIENT_LOG_ROOT, "${user.home}/logs/rocketmqlogs");
- System.setProperty("client.logRoot", clientLogRoot);
- String clientLogLevel = System.getProperty(CLIENT_LOG_LEVEL, "INFO");
- System.setProperty("client.logLevel", clientLogLevel);
- String clientLogMaxIndex = System.getProperty(CLIENT_LOG_MAXINDEX, "10");
- System.setProperty("client.logFileMaxIndex", clientLogMaxIndex);
-
- if (isloadconfig) {
- try {
- ILoggerFactory iLoggerFactory = LoggerFactory.getILoggerFactory();
- Class classType = iLoggerFactory.getClass();
- if (classType.getName().equals("org.slf4j.impl.Log4jLoggerFactory")) {
- Class<?> domconfigurator;
- Object domconfiguratorobj;
- domconfigurator = Class.forName("org.apache.log4j.xml.DOMConfigurator");
- domconfiguratorobj = domconfigurator.newInstance();
- if (null == logConfigFilePath) {
- Method configure = domconfiguratorobj.getClass().getMethod("configure", URL.class);
- URL url = ClientLogger.class.getClassLoader().getResource(log4JResourceFile);
- configure.invoke(domconfiguratorobj, url);
- } else {
- Method configure = domconfiguratorobj.getClass().getMethod("configure", String.class);
- configure.invoke(domconfiguratorobj, logConfigFilePath);
- }
-
- } else if (classType.getName().equals("ch.qos.logback.classic.LoggerContext")) {
- Class<?> joranConfigurator;
- Class<?> context = Class.forName("ch.qos.logback.core.Context");
- Object joranConfiguratoroObj;
- joranConfigurator = Class.forName("ch.qos.logback.classic.joran.JoranConfigurator");
- joranConfiguratoroObj = joranConfigurator.newInstance();
- Method setContext = joranConfiguratoroObj.getClass().getMethod("setContext", context);
- setContext.invoke(joranConfiguratoroObj, iLoggerFactory);
- if (null == logConfigFilePath) {
- URL url = ClientLogger.class.getClassLoader().getResource(logbackResourceFile);
- Method doConfigure =
- joranConfiguratoroObj.getClass().getMethod("doConfigure", URL.class);
- doConfigure.invoke(joranConfiguratoroObj, url);
- } else {
- Method doConfigure =
- joranConfiguratoroObj.getClass().getMethod("doConfigure", String.class);
- doConfigure.invoke(joranConfiguratoroObj, logConfigFilePath);
- }
-
- }
- } catch (Exception e) {
- System.err.println(e);
- }
- }
- return LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME);
- }
-
-
- public static Logger getLog() {
- return log;
- }
-
-
- public static void setLog(Logger log) {
- ClientLogger.log = log;
- }
-}