You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2017/04/24 04:05:24 UTC
[3/5] storm git commit: STORM-2349: sharing a single producer/consumer instance across threads
STORM-2349: sharing a single producer/consumer instance across threads
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9bacd977
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9bacd977
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9bacd977
Branch: refs/heads/master
Commit: 9bacd977f27aa3f9358c7c9adaf7b04191a15cb8
Parents: 4a4a8d2
Author: vesense <be...@163.com>
Authored: Thu Apr 13 21:39:44 2017 +0800
Committer: vesense <be...@163.com>
Committed: Thu Apr 13 21:39:44 2017 +0800
----------------------------------------------------------------------
.../apache/storm/rocketmq/RocketMQConfig.java | 30 +++-----
.../storm/rocketmq/bolt/RocketMQBolt.java | 30 +++++---
.../storm/rocketmq/spout/RocketMQSpout.java | 80 +++++++++++---------
.../rocketmq/trident/state/RocketMQState.java | 2 +-
4 files changed, 78 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9bacd977/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
index 082822c..fcf6ff4 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
@@ -24,7 +24,6 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.storm.task.TopologyContext;
import java.util.Properties;
import java.util.UUID;
@@ -88,17 +87,12 @@ public class RocketMQConfig {
public static final int DEFAULT_CONSUMER_MAX_THREADS = 64;
- public static void buildProducerConfigs(Properties props, DefaultMQProducer producer, TopologyContext context) {
- buildCommonConfigs(props, producer, context);
+ public static void buildProducerConfigs(Properties props, DefaultMQProducer producer) {
+ buildCommonConfigs(props, producer);
// According to the RocketMQ official docs, "only one instance is allowed per producer group"
- // So, we use taskID/UUID as the producer group by default
- String defaultGroup;
- if (context != null) {
- defaultGroup = String.valueOf(context.getThisTaskId());
- } else {
- defaultGroup = UUID.randomUUID().toString();
- }
+ // So, we use UUID as the producer group by default, to allow many producer instances for one topic
+ String defaultGroup = UUID.randomUUID().toString();
producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, defaultGroup));
producer.setRetryTimesWhenSendFailed(getInteger(props,
@@ -109,8 +103,8 @@ public class RocketMQConfig {
PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
}
- public static void buildConsumerConfigs(Properties props, DefaultMQPushConsumer consumer, TopologyContext context) {
- buildCommonConfigs(props, consumer, context);
+ public static void buildConsumerConfigs(Properties props, DefaultMQPushConsumer consumer) {
+ buildCommonConfigs(props, consumer);
String group = props.getProperty(CONSUMER_GROUP);
Validate.notEmpty(group);
@@ -147,19 +141,15 @@ public class RocketMQConfig {
}
}
- public static void buildCommonConfigs(Properties props, ClientConfig client, TopologyContext context) {
+ public static void buildCommonConfigs(Properties props, ClientConfig client) {
String namesvr = props.getProperty(NAME_SERVER_ADDR);
Validate.notEmpty(namesvr);
client.setNamesrvAddr(namesvr);
client.setClientIP(props.getProperty(CLIENT_IP, DEFAULT_CLIENT_IP));
- // use taskID/UUID for client name by default
- String defaultClientName;
- if (context != null) {
- defaultClientName = String.valueOf(context.getThisTaskId());
- } else {
- defaultClientName = UUID.randomUUID().toString();
- }
+ // According to the RocketMQ official docs, "only one instance is allowed per machine"
+ // So, we use UUID as the client name by default, to allow RocketMQ spout/bolt instances in one machine.
+ String defaultClientName = UUID.randomUUID().toString();
client.setInstanceName(props.getProperty(CLIENT_NAME, defaultClientName));
client.setClientCallbackExecutorThreads(getInteger(props,
http://git-wip-us.apache.org/repos/asf/storm/blob/9bacd977/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
index d55babe..3f04d07 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
@@ -41,7 +41,7 @@ import java.util.Properties;
public class RocketMQBolt implements IRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(RocketMQBolt.class);
- private MQProducer producer;
+ private static MQProducer producer;
private OutputCollector collector;
private boolean async = true;
private TopicSelector selector;
@@ -52,14 +52,21 @@ public class RocketMQBolt implements IRichBolt {
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
Validate.notEmpty(properties, "Producer properties can not be empty");
- producer = new DefaultMQProducer();
- RocketMQConfig.buildProducerConfigs(properties, (DefaultMQProducer)producer, context);
-
- try {
- producer.start();
- } catch (MQClientException e) {
- throw new RuntimeException(e);
+ // Since RocketMQ Producer is thread-safe, RocketMQBolt uses a single
+ // producer instance across threads to improve the performance.
+ synchronized (RocketMQBolt.class) {
+ if (producer == null) {
+ producer = new DefaultMQProducer();
+ RocketMQConfig.buildProducerConfigs(properties, (DefaultMQProducer)producer);
+
+ try {
+ producer.start();
+ } catch (MQClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
+
this.collector = collector;
Validate.notNull(selector, "TopicSelector can not be null");
@@ -143,6 +150,11 @@ public class RocketMQBolt implements IRichBolt {
@Override
public void cleanup() {
- producer.shutdown();
+ synchronized (RocketMQBolt.class) {
+ if (producer != null) {
+ producer.shutdown();
+ producer = null;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9bacd977/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
index c8a0802..74c8264 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
@@ -40,6 +40,7 @@ import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ObjectReader;
import java.util.List;
import java.util.Map;
@@ -58,7 +59,7 @@ import static org.apache.storm.rocketmq.RocketMQUtils.getInteger;
public class RocketMQSpout implements IRichSpout {
// TODO add metrics
- private MQPushConsumer consumer;
+ private static MQPushConsumer consumer;
private SpoutOutputCollector collector;
private BlockingQueue<MessageSet> queue;
@@ -74,42 +75,48 @@ public class RocketMQSpout implements IRichSpout {
Validate.notEmpty(properties, "Consumer properties can not be empty");
boolean ordered = getBoolean(properties, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false);
- int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, Integer.parseInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING).toString()));
+ int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)));
queue = new LinkedBlockingQueue<>(queueSize);
- consumer = new DefaultMQPushConsumer();
- RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer, context);
-
- if (ordered) {
- consumer.registerMessageListener(new MessageListenerOrderly() {
- @Override
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeOrderlyContext context) {
- if (process(msgs)) {
- return ConsumeOrderlyStatus.SUCCESS;
- } else {
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
+ // Since RocketMQ Consumer is thread-safe, RocketMQSpout uses a single
+ // consumer instance across threads to improve the performance.
+ synchronized (RocketMQSpout.class) {
+ if (consumer == null) {
+ consumer = new DefaultMQPushConsumer();
+ RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer);
+
+ if (ordered) {
+ consumer.registerMessageListener(new MessageListenerOrderly() {
+ @Override
+ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
+ ConsumeOrderlyContext context) {
+ if (process(msgs)) {
+ return ConsumeOrderlyStatus.SUCCESS;
+ } else {
+ return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+ }
+ }
+ });
+ } else {
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+ ConsumeConcurrentlyContext context) {
+ if (process(msgs)) {
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ } else {
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ }
+ }
+ });
}
- });
- } else {
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- if (process(msgs)) {
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- } else {
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- }
- });
- }
- try {
- consumer.start();
- } catch (MQClientException e) {
- throw new RuntimeException(e);
+ try {
+ consumer.start();
+ } catch (MQClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
int maxRetry = getInteger(properties, SpoutConfig.MESSAGES_MAX_RETRY, SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY);
@@ -162,7 +169,12 @@ public class RocketMQSpout implements IRichSpout {
@Override
public void close() {
- consumer.shutdown();
+ synchronized (RocketMQSpout.class) {
+ if (consumer != null) {
+ consumer.shutdown();
+ consumer = null;
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/9bacd977/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
index 7e5c078..b6413ad 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
@@ -73,7 +73,7 @@ public class RocketMQState implements State {
Validate.notEmpty(options.properties, "Producer properties can not be empty");
producer = new DefaultMQProducer();
- RocketMQConfig.buildProducerConfigs(options.properties, (DefaultMQProducer)producer, null);
+ RocketMQConfig.buildProducerConfigs(options.properties, (DefaultMQProducer)producer);
try {
producer.start();