You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:15:15 UTC
[84/99] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat
all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 6e0e379..42b7a82 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -16,11 +16,40 @@
*/
package org.apache.rocketmq.client.impl.factory;
+import java.io.UnsupportedEncodingException;
+import java.net.DatagramSocket;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.*;
+import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQAdminImpl;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.client.impl.consumer.PullMessageService;
+import org.apache.rocketmq.client.impl.consumer.RebalanceService;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.MQProducerInner;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
@@ -36,7 +65,11 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
-import org.apache.rocketmq.common.protocol.heartbeat.*;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -45,19 +78,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.client.impl.consumer.*;
import org.slf4j.Logger;
-import java.io.UnsupportedEncodingException;
-import java.net.DatagramSocket;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-
public class MQClientInstance {
private final static long LOCK_TIMEOUT_MILLIS = 3000;
private final Logger log = ClientLogger.getLog();
@@ -75,7 +97,7 @@ public class MQClientInstance {
private final Lock lockNamesrv = new ReentrantLock();
private final Lock lockHeartbeat = new ReentrantLock();
private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
- new ConcurrentHashMap<String, HashMap<Long, String>>();
+ new ConcurrentHashMap<String, HashMap<Long, String>>();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@@ -92,12 +114,10 @@ public class MQClientInstance {
private DatagramSocket datagramSocket;
private Random random = new Random();
-
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId) {
this(clientConfig, instanceIndex, clientId, null);
}
-
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
@@ -125,10 +145,74 @@ public class MQClientInstance {
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}", //
- this.instanceIndex, //
- this.clientId, //
- this.clientConfig, //
- MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
+ this.instanceIndex, //
+ this.clientId, //
+ this.clientConfig, //
+ MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
+ }
+
+ public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
+ TopicPublishInfo info = new TopicPublishInfo();
+ info.setTopicRouteData(route);
+ if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
+ String[] brokers = route.getOrderTopicConf().split(";");
+ for (String broker : brokers) {
+ String[] item = broker.split(":");
+ int nums = Integer.parseInt(item[1]);
+ for (int i = 0; i < nums; i++) {
+ MessageQueue mq = new MessageQueue(topic, item[0], i);
+ info.getMessageQueueList().add(mq);
+ }
+ }
+
+ info.setOrderTopic(true);
+ } else {
+ List<QueueData> qds = route.getQueueDatas();
+ Collections.sort(qds);
+ for (QueueData qd : qds) {
+ if (PermName.isWriteable(qd.getPerm())) {
+ BrokerData brokerData = null;
+ for (BrokerData bd : route.getBrokerDatas()) {
+ if (bd.getBrokerName().equals(qd.getBrokerName())) {
+ brokerData = bd;
+ break;
+ }
+ }
+
+ if (null == brokerData) {
+ continue;
+ }
+
+ if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
+ continue;
+ }
+
+ for (int i = 0; i < qd.getWriteQueueNums(); i++) {
+ MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
+ info.getMessageQueueList().add(mq);
+ }
+ }
+ }
+
+ info.setOrderTopic(false);
+ }
+
+ return info;
+ }
+
+ public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
+ Set<MessageQueue> mqList = new HashSet<MessageQueue>();
+ List<QueueData> qds = route.getQueueDatas();
+ for (QueueData qd : qds) {
+ if (PermName.isReadable(qd.getPerm())) {
+ for (int i = 0; i < qd.getReadQueueNums(); i++) {
+ MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
+ mqList.add(mq);
+ }
+ }
+ }
+
+ return mqList;
}
public void start() throws MQClientException {
@@ -166,7 +250,6 @@ public class MQClientInstance {
}
}
-
private void startScheduledTask() {
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@@ -353,7 +436,7 @@ public class MQClientInstance {
if (impl != null) {
try {
if (impl instanceof DefaultMQPushConsumerImpl) {
- DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl) impl;
+ DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl)impl;
dmq.adjustThreadPool();
}
} catch (Exception e) {
@@ -420,7 +503,7 @@ public class MQClientInstance {
log.error("send heart beat to broker exception", e);
} else {
log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
- id, addr);
+ id, addr);
}
}
}
@@ -460,7 +543,7 @@ public class MQClientInstance {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
- 1000 * 3);
+ 1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
@@ -559,7 +642,6 @@ public class MQClientInstance {
}
}
-
// Producer
for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
@@ -590,7 +672,7 @@ public class MQClientInstance {
}
private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName, final String topic,
- final String filterClassSource) throws UnsupportedEncodingException {
+ final String filterClassSource) throws UnsupportedEncodingException {
byte[] classBody = null;
int classCRC = 0;
try {
@@ -598,13 +680,13 @@ public class MQClientInstance {
classCRC = UtilAll.crc32(classBody);
} catch (Exception e1) {
log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}", //
- fullClassName, //
- RemotingHelper.exceptionSimpleDesc(e1));
+ fullClassName, //
+ RemotingHelper.exceptionSimpleDesc(e1));
}
TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
if (topicRouteData != null //
- && topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) {
+ && topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) {
Iterator<Entry<String, List<String>>> it = topicRouteData.getFilterServerTable().entrySet().iterator();
while (it.hasNext()) {
Entry<String, List<String>> next = it.next();
@@ -612,10 +694,10 @@ public class MQClientInstance {
for (final String fsAddr : value) {
try {
this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, classCRC, classBody,
- 5000);
+ 5000);
log.info("register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {}", fsAddr, consumerGroup,
- topic, fullClassName);
+ topic, fullClassName);
} catch (Exception e) {
log.error("uploadFilterClassToAllFilterServer Exception", e);
@@ -624,7 +706,7 @@ public class MQClientInstance {
}
} else {
log.warn("register message class filter failed, because no filter server, ConsumerGroup: {} Topic: {} ClassName: {}",
- consumerGroup, topic, fullClassName);
+ consumerGroup, topic, fullClassName);
}
}
@@ -668,70 +750,6 @@ public class MQClientInstance {
return result;
}
- public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
- TopicPublishInfo info = new TopicPublishInfo();
- info.setTopicRouteData(route);
- if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
- String[] brokers = route.getOrderTopicConf().split(";");
- for (String broker : brokers) {
- String[] item = broker.split(":");
- int nums = Integer.parseInt(item[1]);
- for (int i = 0; i < nums; i++) {
- MessageQueue mq = new MessageQueue(topic, item[0], i);
- info.getMessageQueueList().add(mq);
- }
- }
-
- info.setOrderTopic(true);
- } else {
- List<QueueData> qds = route.getQueueDatas();
- Collections.sort(qds);
- for (QueueData qd : qds) {
- if (PermName.isWriteable(qd.getPerm())) {
- BrokerData brokerData = null;
- for (BrokerData bd : route.getBrokerDatas()) {
- if (bd.getBrokerName().equals(qd.getBrokerName())) {
- brokerData = bd;
- break;
- }
- }
-
- if (null == brokerData) {
- continue;
- }
-
- if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
- continue;
- }
-
- for (int i = 0; i < qd.getWriteQueueNums(); i++) {
- MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
- info.getMessageQueueList().add(mq);
- }
- }
- }
-
- info.setOrderTopic(false);
- }
-
- return info;
- }
-
- public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
- Set<MessageQueue> mqList = new HashSet<MessageQueue>();
- List<QueueData> qds = route.getQueueDatas();
- for (QueueData qd : qds) {
- if (PermName.isReadable(qd.getPerm())) {
- for (int i = 0; i < qd.getReadQueueNums(); i++) {
- MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
- mqList.add(mq);
- }
- }
- }
-
- return mqList;
- }
-
public void shutdown() {
// Consumer
if (!this.consumerTable.isEmpty())
@@ -824,7 +842,7 @@ public class MQClientInstance {
try {
this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup,
- consumerGroup, brokerName, entry1.getKey(), addr);
+ consumerGroup, brokerName, entry1.getKey(), addr);
} catch (RemotingException e) {
log.error("unregister client exception from broker: " + addr, e);
} catch (MQBrokerException e) {
@@ -942,9 +960,9 @@ public class MQClientInstance {
}
public FindBrokerResult findBrokerAddressInSubscribe(//
- final String brokerName, //
- final long brokerId, //
- final boolean onlyThisBroker//
+ final String brokerName, //
+ final long brokerId, //
+ final boolean onlyThisBroker//
) {
String brokerAddr = null;
boolean slave = false;
@@ -1008,7 +1026,7 @@ public class MQClientInstance {
try {
MQConsumerInner impl = this.consumerTable.get(group);
if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
- consumer = (DefaultMQPushConsumerImpl) impl;
+ consumer = (DefaultMQPushConsumerImpl)impl;
} else {
log.info("[reset-offset] consumer dose not exist. group={}", group);
return;
@@ -1053,10 +1071,10 @@ public class MQClientInstance {
public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) {
MQConsumerInner impl = this.consumerTable.get(group);
if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
- DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) impl;
+ DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl)impl;
return consumer.getOffsetStore().cloneOffsetTable(topic);
} else if (impl != null && impl instanceof DefaultMQPullConsumerImpl) {
- DefaultMQPullConsumerImpl consumer = (DefaultMQPullConsumerImpl) impl;
+ DefaultMQPullConsumerImpl consumer = (DefaultMQPullConsumerImpl)impl;
return consumer.getOffsetStore().cloneOffsetTable(topic);
} else {
return Collections.EMPTY_MAP;
@@ -1096,11 +1114,11 @@ public class MQClientInstance {
}
public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, //
- final String consumerGroup, //
- final String brokerName) {
+ final String consumerGroup, //
+ final String brokerName) {
MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
if (null != mqConsumerInner) {
- DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
+ DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl)mqConsumerInner;
ConsumeMessageDirectlyResult result = consumer.getConsumeMessageService().consumeMessageDirectly(msg, brokerName);
return result;
@@ -1109,7 +1127,6 @@ public class MQClientInstance {
return null;
}
-
public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup) {
MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
@@ -1128,12 +1145,11 @@ public class MQClientInstance {
consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_NAMESERVER_ADDR, nsAddr);
consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CONSUME_TYPE, mqConsumerInner.consumeType().name());
consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CLIENT_VERSION,
- MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
+ MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
return consumerRunningInfo;
}
-
public ConsumerStatsManager getConsumerStatsManager() {
return consumerStatsManager;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index b53fa19..42bf360 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -16,6 +16,20 @@
*/
package org.apache.rocketmq.client.impl.producer;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
@@ -30,11 +44,29 @@ import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.latency.MQFaultStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.producer.TransactionCheckListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageId;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageType;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
@@ -45,21 +77,14 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.client.producer.*;
import org.slf4j.Logger;
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.*;
-
-
public class DefaultMQProducerImpl implements MQProducerInner {
private final Logger log = ClientLogger.getLog();
private final Random random = new Random();
private final DefaultMQProducer defaultMQProducer;
private final ConcurrentHashMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
- new ConcurrentHashMap<String, TopicPublishInfo>();
+ new ConcurrentHashMap<String, TopicPublishInfo>();
private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final RPCHook rpcHook;
protected BlockingQueue<Runnable> checkRequestQueue;
@@ -71,12 +96,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
-
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
this(defaultMQProducer, null);
}
-
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
@@ -85,18 +108,18 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) {
this.checkForbiddenHookList.add(checkForbiddenHook);
log.info("register a new checkForbiddenHook. hookName={}, allHookSize={}", checkForbiddenHook.hookName(),
- checkForbiddenHookList.size());
+ checkForbiddenHookList.size());
}
public void initTransactionEnv() {
- TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
+ TransactionMQProducer producer = (TransactionMQProducer)this.defaultMQProducer;
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
this.checkExecutor = new ThreadPoolExecutor(//
- producer.getCheckThreadPoolMinSize(), //
- producer.getCheckThreadPoolMaxSize(), //
- 1000 * 60, //
- TimeUnit.MILLISECONDS, //
- this.checkRequestQueue);
+ producer.getCheckThreadPoolMinSize(), //
+ producer.getCheckThreadPoolMaxSize(), //
+ 1000 * 60, //
+ TimeUnit.MILLISECONDS, //
+ this.checkRequestQueue);
}
public void destroyTransactionEnv() {
@@ -130,8 +153,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
- + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
- null);
+ + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
+ null);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
@@ -141,16 +164,16 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
- this.defaultMQProducer.isSendMessageWithVIPChannel());
+ this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "//
- + this.serviceState//
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
- null);
+ + this.serviceState//
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+ null);
default:
break;
}
@@ -167,7 +190,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
- null);
+ null);
}
}
@@ -215,7 +238,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Override
public TransactionCheckListener checkListener() {
if (this.defaultMQProducer instanceof TransactionMQProducer) {
- TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
+ TransactionMQProducer producer = (TransactionMQProducer)defaultMQProducer;
return producer.getTransactionCheckListener();
}
@@ -230,7 +253,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private final CheckTransactionStateRequestHeader checkRequestHeader = header;
private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
-
@Override
public void run() {
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
@@ -245,19 +267,18 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
this.processTransactionState(//
- localTransactionState, //
- group, //
- exception);
+ localTransactionState, //
+ group, //
+ exception);
} else {
log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}
-
private void processTransactionState(//
- final LocalTransactionState localTransactionState, //
- final String producerGroup, //
- final Throwable exception) {
+ final LocalTransactionState localTransactionState, //
+ final String producerGroup, //
+ final Throwable exception) {
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup);
@@ -293,7 +314,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
try {
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
- 3000);
+ 3000);
} catch (Exception e) {
log.error("endTransactionOneway exception", e);
}
@@ -332,9 +353,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private void makeSureStateOK() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) {
throw new MQClientException("The producer service state not OK, "//
- + this.serviceState//
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
- null);
+ + this.serviceState//
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+ null);
}
}
@@ -370,13 +391,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
- throws MQClientException, InterruptedException {
+ throws MQClientException, InterruptedException {
this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
}
public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
- throws MQClientException, InterruptedException {
+ throws MQClientException, InterruptedException {
this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
}
@@ -389,7 +410,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public void send(Message msg, SendCallback sendCallback, long timeout)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
try {
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
} catch (MQBrokerException e) {
@@ -406,10 +427,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
private SendResult sendDefaultImpl(//
- Message msg, //
- final CommunicationMode communicationMode, //
- final SendCallback sendCallback, //
- final long timeout//
+ Message msg, //
+ final CommunicationMode communicationMode, //
+ final SendCallback sendCallback, //
+ final long timeout//
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
@@ -508,16 +529,16 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
- times,
- System.currentTimeMillis() - beginTimestampFirst,
- msg.getTopic(),
- Arrays.toString(brokersSent));
+ times,
+ System.currentTimeMillis() - beginTimestampFirst,
+ msg.getTopic(),
+ Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
if (exception instanceof MQBrokerException) {
- mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
+ mqClientException.setResponseCode(((MQBrokerException)exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
@@ -532,11 +553,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
- "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
+ "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
- null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
+ null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
@@ -557,11 +578,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
private SendResult sendKernelImpl(final Message msg, //
- final MessageQueue mq, //
- final CommunicationMode communicationMode, //
- final SendCallback sendCallback, //
- final TopicPublishInfo topicPublishInfo, //
- final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ final MessageQueue mq, //
+ final CommunicationMode communicationMode, //
+ final SendCallback sendCallback, //
+ final TopicPublishInfo topicPublishInfo, //
+ final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
@@ -649,30 +670,30 @@ public class DefaultMQProducerImpl implements MQProducerInner {
switch (communicationMode) {
case ASYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
- brokerAddr, // 1
- mq.getBrokerName(), // 2
- msg, // 3
- requestHeader, // 4
- timeout, // 5
- communicationMode, // 6
- sendCallback, // 7
- topicPublishInfo, // 8
- this.mQClientFactory, // 9
- this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
- context, //
- this);
+ brokerAddr, // 1
+ mq.getBrokerName(), // 2
+ msg, // 3
+ requestHeader, // 4
+ timeout, // 5
+ communicationMode, // 6
+ sendCallback, // 7
+ topicPublishInfo, // 8
+ this.mQClientFactory, // 9
+ this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
+ context, //
+ this);
break;
case ONEWAY:
case SYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
- brokerAddr,
- mq.getBrokerName(),
- msg,
- requestHeader,
- timeout,
- communicationMode,
- context,
- this);
+ brokerAddr,
+ mq.getBrokerName(),
+ msg,
+ requestHeader,
+ timeout,
+ communicationMode,
+ context,
+ this);
break;
default:
assert false;
@@ -790,12 +811,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
* KERNEL SYNC -------------------------------------------------------
*/
public SendResult send(Message msg, MessageQueue mq)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, mq, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg, MessageQueue mq, long timeout)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
@@ -810,12 +831,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
* KERNEL ASYNC -------------------------------------------------------
*/
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
@@ -848,21 +869,21 @@ public class DefaultMQProducerImpl implements MQProducerInner {
* SELECT SYNC -------------------------------------------------------
*/
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
}
private SendResult sendSelectImpl(//
- Message msg, //
- MessageQueueSelector selector, //
- Object arg, //
- final CommunicationMode communicationMode, //
- final SendCallback sendCallback, final long timeout//
+ Message msg, //
+ MessageQueueSelector selector, //
+ Object arg, //
+ final CommunicationMode communicationMode, //
+ final SendCallback sendCallback, final long timeout//
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
@@ -890,12 +911,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
* SELECT ASYNC -------------------------------------------------------
*/
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
try {
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout);
} catch (MQBrokerException e) {
@@ -907,7 +928,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
* SELECT ONEWAY -------------------------------------------------------
*/
public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
try {
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());
} catch (MQBrokerException e) {
@@ -916,7 +937,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
- throws MQClientException {
+ throws MQClientException {
if (null == tranExecuter) {
throw new MQClientException("tranExecutor is null", null);
}
@@ -988,9 +1009,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public void endTransaction(//
- final SendResult sendResult, //
- final LocalTransactionState localTransactionState, //
- final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
+ final SendResult sendResult, //
+ final LocalTransactionState localTransactionState, //
+ final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
@@ -1021,7 +1042,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
- this.defaultMQProducer.getSendMsgTimeout());
+ this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
@@ -1036,17 +1057,14 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return zipCompressLevel;
}
-
public void setZipCompressLevel(int zipCompressLevel) {
this.zipCompressLevel = zipCompressLevel;
}
-
public ServiceState getServiceState() {
return serviceState;
}
-
public void setServiceState(ServiceState serviceState) {
this.serviceState = serviceState;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java
index cac77ae..cf61326 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java
@@ -6,41 +6,34 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.impl.producer;
+import java.util.Set;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
-import java.util.Set;
-
-
public interface MQProducerInner {
Set<String> getPublishTopicList();
-
boolean isPublishTopicNeedUpdate(final String topic);
-
TransactionCheckListener checkListener();
-
void checkTransactionState(//
- final String addr, //
- final MessageExt msg, //
- final CheckTransactionStateRequestHeader checkRequestHeader);
-
+ final String addr, //
+ final MessageExt msg, //
+ final CheckTransactionStateRequestHeader checkRequestHeader);
void updateTopicPublishInfo(final String topic, final TopicPublishInfo info);
-
boolean isUnitMode();
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index dca20cb..c6f9d45 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -16,15 +16,13 @@
*/
package org.apache.rocketmq.client.impl.producer;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import java.util.ArrayList;
-import java.util.List;
-
-
public class TopicPublishInfo {
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
@@ -32,7 +30,6 @@ public class TopicPublishInfo {
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0);
private TopicRouteData topicRouteData;
-
public boolean isOrderTopic() {
return orderTopic;
}
@@ -49,32 +46,26 @@ public class TopicPublishInfo {
return messageQueueList;
}
-
public void setMessageQueueList(List<MessageQueue> messageQueueList) {
this.messageQueueList = messageQueueList;
}
-
public ThreadLocalIndex getSendWhichQueue() {
return sendWhichQueue;
}
-
public void setSendWhichQueue(ThreadLocalIndex sendWhichQueue) {
this.sendWhichQueue = sendWhichQueue;
}
-
public boolean isHaveTopicRouterInfo() {
return haveTopicRouterInfo;
}
-
public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) {
this.haveTopicRouterInfo = haveTopicRouterInfo;
}
-
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
@@ -93,7 +84,6 @@ public class TopicPublishInfo {
}
}
-
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
@@ -113,11 +103,10 @@ public class TopicPublishInfo {
return -1;
}
-
@Override
public String toString() {
return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList
- + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]";
+ + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]";
}
public TopicRouteData getTopicRouteData() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
index 12dac4b..b61d855 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
@@ -17,13 +17,12 @@
package org.apache.rocketmq.client.latency;
-import org.apache.rocketmq.client.common.ThreadLocalIndex;
-
import java.util.Collections;
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.client.common.ThreadLocalIndex;
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
@@ -89,6 +88,14 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
return null;
}
+ @Override
+ public String toString() {
+ return "LatencyFaultToleranceImpl{" +
+ "faultItemTable=" + faultItemTable +
+ ", whichItemWorst=" + whichItemWorst +
+ '}';
+ }
+
class FaultItem implements Comparable<FaultItem> {
private final String name;
private volatile long currentLatency;
@@ -101,9 +108,11 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
@Override
public int compareTo(final FaultItem other) {
if (this.isAvailable() != other.isAvailable()) {
- if (this.isAvailable()) return -1;
+ if (this.isAvailable())
+ return -1;
- if (other.isAvailable()) return 1;
+ if (other.isAvailable())
+ return 1;
}
if (this.currentLatency < other.currentLatency)
@@ -128,20 +137,24 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
@Override
public int hashCode() {
int result = getName() != null ? getName().hashCode() : 0;
- result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));
- result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));
+ result = 31 * result + (int)(getCurrentLatency() ^ (getCurrentLatency() >>> 32));
+ result = 31 * result + (int)(getStartTimestamp() ^ (getStartTimestamp() >>> 32));
return result;
}
@Override
public boolean equals(final Object o) {
- if (this == o) return true;
- if (!(o instanceof FaultItem)) return false;
+ if (this == o)
+ return true;
+ if (!(o instanceof FaultItem))
+ return false;
- final FaultItem faultItem = (FaultItem) o;
+ final FaultItem faultItem = (FaultItem)o;
- if (getCurrentLatency() != faultItem.getCurrentLatency()) return false;
- if (getStartTimestamp() != faultItem.getStartTimestamp()) return false;
+ if (getCurrentLatency() != faultItem.getCurrentLatency())
+ return false;
+ if (getStartTimestamp() != faultItem.getStartTimestamp())
+ return false;
return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;
}
@@ -149,10 +162,10 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
@Override
public String toString() {
return "FaultItem{" +
- "name='" + name + '\'' +
- ", currentLatency=" + currentLatency +
- ", startTimestamp=" + startTimestamp +
- '}';
+ "name='" + name + '\'' +
+ ", currentLatency=" + currentLatency +
+ ", startTimestamp=" + startTimestamp +
+ '}';
}
public String getName() {
@@ -175,14 +188,5 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
this.startTimestamp = startTimestamp;
}
-
- }
-
- @Override
- public String toString() {
- return "LatencyFaultToleranceImpl{" +
- "faultItemTable=" + faultItemTable +
- ", whichItemWorst=" + whichItemWorst +
- '}';
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
index cdfd5d1..70758dc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
@@ -25,8 +25,8 @@ public class MQFaultStrategy {
private boolean sendLatencyFaultEnable = false;
- private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
- private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
+ private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
+ private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
public long[] getNotAvailableDuration() {
return notAvailableDuration;
@@ -97,7 +97,8 @@ public class MQFaultStrategy {
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
- if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i];
+ if (currentLatency >= latencyMax[i])
+ return this.notAvailableDuration[i];
}
return 0;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java b/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java
index 3055119..7a05e76 100644
--- a/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java
+++ b/client/src/main/java/org/apache/rocketmq/client/log/ClientLogger.java
@@ -16,38 +16,35 @@
*/
package org.apache.rocketmq.client.log;
+import java.lang.reflect.Method;
+import java.net.URL;
import org.apache.rocketmq.common.constant.LoggerName;
import org.slf4j.ILoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.Method;
-import java.net.URL;
-
-
public class ClientLogger {
- private static Logger log;
public static final String CLIENT_LOG_ROOT = "rocketmq.client.logRoot";
public static final String CLIENT_LOG_MAXINDEX = "rocketmq.client.logFileMaxIndex";
public static final String CLIENT_LOG_LEVEL = "rocketmq.client.logLevel";
+ private static Logger log;
static {
log = createLogger(LoggerName.CLIENT_LOGGER_NAME);
}
-
private static Logger createLogger(final String loggerName) {
String logConfigFilePath =
- System.getProperty("rocketmq.client.log.configFile",
- System.getenv("ROCKETMQ_CLIENT_LOG_CONFIGFILE"));
+ System.getProperty("rocketmq.client.log.configFile",
+ System.getenv("ROCKETMQ_CLIENT_LOG_CONFIGFILE"));
Boolean isloadconfig =
- Boolean.parseBoolean(System.getProperty("rocketmq.client.log.loadconfig", "true"));
+ Boolean.parseBoolean(System.getProperty("rocketmq.client.log.loadconfig", "true"));
final String log4JResourceFile =
- System.getProperty("rocketmq.client.log4j.resource.fileName", "log4j_rocketmq_client.xml");
+ System.getProperty("rocketmq.client.log4j.resource.fileName", "log4j_rocketmq_client.xml");
final String logbackResourceFile =
- System.getProperty("rocketmq.client.logback.resource.fileName", "logback_rocketmq_client.xml");
+ System.getProperty("rocketmq.client.logback.resource.fileName", "logback_rocketmq_client.xml");
String clientLogRoot = System.getProperty(CLIENT_LOG_ROOT, "${user.home}/logs/rocketmqlogs");
System.setProperty("client.logRoot", clientLogRoot);
@@ -85,11 +82,11 @@ public class ClientLogger {
if (null == logConfigFilePath) {
URL url = ClientLogger.class.getClassLoader().getResource(logbackResourceFile);
Method doConfigure =
- joranConfiguratoroObj.getClass().getMethod("doConfigure", URL.class);
+ joranConfiguratoroObj.getClass().getMethod("doConfigure", URL.class);
doConfigure.invoke(joranConfiguratoroObj, url);
} else {
Method doConfigure =
- joranConfiguratoroObj.getClass().getMethod("doConfigure", String.class);
+ joranConfiguratoroObj.getClass().getMethod("doConfigure", String.class);
doConfigure.invoke(joranConfiguratoroObj, logConfigFilePath);
}
@@ -101,12 +98,10 @@ public class ClientLogger {
return LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME);
}
-
public static Logger getLog() {
return log;
}
-
public static void setLog(Logger log) {
ClientLogger.log = log;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 340b1ff..736aa15 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -6,28 +6,30 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.producer;
+import java.util.List;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageId;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.common.message.*;
-
-import java.util.List;
-
public class DefaultMQProducer extends ClientConfig implements MQProducer {
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
@@ -44,27 +46,24 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
private boolean retryAnotherBrokerWhenNotStoreOK = false;
private int maxMessageSize = 1024 * 1024 * 4; // 4M
+
public DefaultMQProducer() {
this(MixAll.DEFAULT_PRODUCER_GROUP, null);
}
-
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
-
public DefaultMQProducer(final String producerGroup) {
this(producerGroup, null);
}
-
public DefaultMQProducer(RPCHook rpcHook) {
this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook);
}
-
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
@@ -75,169 +74,143 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.defaultMQProducerImpl.shutdown();
}
-
@Override
public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
return this.defaultMQProducerImpl.fetchPublishMessageQueues(topic);
}
-
@Override
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg);
}
-
@Override
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, timeout);
}
-
@Override
public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, sendCallback);
}
-
@Override
public void send(Message msg, SendCallback sendCallback, long timeout)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
}
-
@Override
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.sendOneway(msg);
}
-
@Override
public SendResult send(Message msg, MessageQueue mq)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, mq);
}
-
@Override
public SendResult send(Message msg, MessageQueue mq, long timeout)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, mq, timeout);
}
-
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, mq, sendCallback);
}
-
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
}
-
@Override
public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.sendOneway(msg, mq);
}
-
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, selector, arg);
}
-
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
}
-
@Override
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
}
-
@Override
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
}
-
@Override
public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
}
-
@Override
public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg)
- throws MQClientException {
+ throws MQClientException {
throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
}
-
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
}
-
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.defaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
}
-
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.defaultMQProducerImpl.searchOffset(mq, timestamp);
}
-
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQProducerImpl.maxOffset(mq);
}
-
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQProducerImpl.minOffset(mq);
}
-
@Override
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return this.defaultMQProducerImpl.earliestMsgStoreTime(mq);
}
-
@Override
public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.defaultMQProducerImpl.viewMessage(offsetMsgId);
}
-
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
- throws MQClientException, InterruptedException {
+ throws MQClientException, InterruptedException {
return this.defaultMQProducerImpl.queryMessage(topic, key, maxNum, begin, end);
}
-
@Override
public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
@@ -252,97 +225,78 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
return producerGroup;
}
-
public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}
-
public String getCreateTopicKey() {
return createTopicKey;
}
-
public void setCreateTopicKey(String createTopicKey) {
this.createTopicKey = createTopicKey;
}
-
public int getSendMsgTimeout() {
return sendMsgTimeout;
}
-
public void setSendMsgTimeout(int sendMsgTimeout) {
this.sendMsgTimeout = sendMsgTimeout;
}
-
public int getCompressMsgBodyOverHowmuch() {
return compressMsgBodyOverHowmuch;
}
-
public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) {
this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
}
-
public DefaultMQProducerImpl getDefaultMQProducerImpl() {
return defaultMQProducerImpl;
}
-
public boolean isRetryAnotherBrokerWhenNotStoreOK() {
return retryAnotherBrokerWhenNotStoreOK;
}
-
public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) {
this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;
}
-
public int getMaxMessageSize() {
return maxMessageSize;
}
-
public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
-
public int getDefaultTopicQueueNums() {
return defaultTopicQueueNums;
}
-
public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
this.defaultTopicQueueNums = defaultTopicQueueNums;
}
-
public int getRetryTimesWhenSendFailed() {
return retryTimesWhenSendFailed;
}
-
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}
-
public boolean isSendMessageWithVIPChannel() {
return isVipChannelEnabled();
}
-
public void setSendMessageWithVIPChannel(final boolean sendMessageWithVIPChannel) {
this.setVipChannelEnabled(sendMessageWithVIPChannel);
}
-
public long[] getNotAvailableDuration() {
return this.defaultMQProducerImpl.getNotAvailableDuration();
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
index a7246e0..1083f9b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
@@ -6,19 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.producer;
import org.apache.rocketmq.common.message.Message;
-
public interface LocalTransactionExecuter {
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java
index b907f81..209619a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.producer;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index 492604e..b53652a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -6,16 +6,17 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.producer;
+import java.util.List;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -23,81 +24,61 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
-import java.util.List;
-
-
public interface MQProducer extends MQAdmin {
void start() throws MQClientException;
void shutdown();
-
List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
-
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
- InterruptedException;
-
+ InterruptedException;
SendResult send(final Message msg, final long timeout) throws MQClientException,
- RemotingException, MQBrokerException, InterruptedException;
-
+ RemotingException, MQBrokerException, InterruptedException;
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
- RemotingException, InterruptedException;
-
+ RemotingException, InterruptedException;
void send(final Message msg, final SendCallback sendCallback, final long timeout)
- throws MQClientException, RemotingException, InterruptedException;
-
+ throws MQClientException, RemotingException, InterruptedException;
void sendOneway(final Message msg) throws MQClientException, RemotingException,
- InterruptedException;
-
+ InterruptedException;
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
- RemotingException, MQBrokerException, InterruptedException;
-
+ RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Message msg, final MessageQueue mq, final long timeout)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
- throws MQClientException, RemotingException, InterruptedException;
-
+ throws MQClientException, RemotingException, InterruptedException;
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout)
- throws MQClientException, RemotingException, InterruptedException;
-
+ throws MQClientException, RemotingException, InterruptedException;
void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,
- RemotingException, InterruptedException;
-
+ RemotingException, InterruptedException;
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg,
- final long timeout) throws MQClientException, RemotingException, MQBrokerException,
- InterruptedException;
-
+ final long timeout) throws MQClientException, RemotingException, MQBrokerException,
+ InterruptedException;
void send(final Message msg, final MessageQueueSelector selector, final Object arg,
- final SendCallback sendCallback) throws MQClientException, RemotingException,
- InterruptedException;
-
+ final SendCallback sendCallback) throws MQClientException, RemotingException,
+ InterruptedException;
void send(final Message msg, final MessageQueueSelector selector, final Object arg,
- final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
- InterruptedException;
-
+ final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
+ InterruptedException;
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
- throws MQClientException, RemotingException, InterruptedException;
-
+ throws MQClientException, RemotingException, InterruptedException;
TransactionSendResult sendMessageInTransaction(final Message msg,
- final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
+ final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java b/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java
index 47956bb..761f45e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java
@@ -6,22 +6,20 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.producer;
+import java.util.List;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
-import java.util.List;
-
-
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java
index f599d83..178e79a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java
@@ -6,19 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.producer;
public interface SendCallback {
public void onSuccess(final SendResult sendResult);
-
public void onException(final Throwable e);
}