You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2017/04/24 04:05:24 UTC

[3/5] storm git commit: STORM-2349: sharing a single producer/consumer instance across threads

STORM-2349: sharing a single producer/consumer instance across threads


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9bacd977
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9bacd977
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9bacd977

Branch: refs/heads/master
Commit: 9bacd977f27aa3f9358c7c9adaf7b04191a15cb8
Parents: 4a4a8d2
Author: vesense <be...@163.com>
Authored: Thu Apr 13 21:39:44 2017 +0800
Committer: vesense <be...@163.com>
Committed: Thu Apr 13 21:39:44 2017 +0800

----------------------------------------------------------------------
 .../apache/storm/rocketmq/RocketMQConfig.java   | 30 +++-----
 .../storm/rocketmq/bolt/RocketMQBolt.java       | 30 +++++---
 .../storm/rocketmq/spout/RocketMQSpout.java     | 80 +++++++++++---------
 .../rocketmq/trident/state/RocketMQState.java   |  2 +-
 4 files changed, 78 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9bacd977/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
index 082822c..fcf6ff4 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
@@ -24,7 +24,6 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.storm.task.TopologyContext;
 
 import java.util.Properties;
 import java.util.UUID;
@@ -88,17 +87,12 @@ public class RocketMQConfig {
     public static final int DEFAULT_CONSUMER_MAX_THREADS = 64;
 
 
-    public static void buildProducerConfigs(Properties props, DefaultMQProducer producer, TopologyContext context) {
-        buildCommonConfigs(props, producer, context);
+    public static void buildProducerConfigs(Properties props, DefaultMQProducer producer) {
+        buildCommonConfigs(props, producer);
 
         // According to the RocketMQ official docs, "only one instance is allowed per producer group"
-        // So, we use taskID/UUID as the producer group by default
-        String defaultGroup;
-        if (context != null) {
-            defaultGroup = String.valueOf(context.getThisTaskId());
-        } else {
-            defaultGroup = UUID.randomUUID().toString();
-        }
+        // So, we use UUID as the producer group by default, to allow many producer instances for one topic
+        String defaultGroup = UUID.randomUUID().toString();
         producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, defaultGroup));
 
         producer.setRetryTimesWhenSendFailed(getInteger(props,
@@ -109,8 +103,8 @@ public class RocketMQConfig {
                 PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
     }
 
-    public static void buildConsumerConfigs(Properties props, DefaultMQPushConsumer consumer, TopologyContext context) {
-        buildCommonConfigs(props, consumer, context);
+    public static void buildConsumerConfigs(Properties props, DefaultMQPushConsumer consumer) {
+        buildCommonConfigs(props, consumer);
 
         String group = props.getProperty(CONSUMER_GROUP);
         Validate.notEmpty(group);
@@ -147,19 +141,15 @@ public class RocketMQConfig {
         }
     }
 
-    public static void buildCommonConfigs(Properties props, ClientConfig client, TopologyContext context) {
+    public static void buildCommonConfigs(Properties props, ClientConfig client) {
         String namesvr = props.getProperty(NAME_SERVER_ADDR);
         Validate.notEmpty(namesvr);
         client.setNamesrvAddr(namesvr);
 
         client.setClientIP(props.getProperty(CLIENT_IP, DEFAULT_CLIENT_IP));
-        // use taskID/UUID for client name by default
-        String defaultClientName;
-        if (context != null) {
-            defaultClientName = String.valueOf(context.getThisTaskId());
-        } else {
-            defaultClientName = UUID.randomUUID().toString();
-        }
+        // According to the RocketMQ official docs, "only one instance is allowed per machine"
+        // So, we use a UUID as the client name by default, to allow multiple RocketMQ spout/bolt instances on one machine.
+        String defaultClientName = UUID.randomUUID().toString();
         client.setInstanceName(props.getProperty(CLIENT_NAME, defaultClientName));
 
         client.setClientCallbackExecutorThreads(getInteger(props,

http://git-wip-us.apache.org/repos/asf/storm/blob/9bacd977/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
index d55babe..3f04d07 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
@@ -41,7 +41,7 @@ import java.util.Properties;
 public class RocketMQBolt implements IRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(RocketMQBolt.class);
 
-    private MQProducer producer;
+    private static MQProducer producer;
     private OutputCollector collector;
     private boolean async = true;
     private TopicSelector selector;
@@ -52,14 +52,21 @@ public class RocketMQBolt implements IRichBolt {
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
         Validate.notEmpty(properties, "Producer properties can not be empty");
 
-        producer = new DefaultMQProducer();
-        RocketMQConfig.buildProducerConfigs(properties, (DefaultMQProducer)producer, context);
-
-        try {
-            producer.start();
-        } catch (MQClientException e) {
-            throw new RuntimeException(e);
+        // Since the RocketMQ producer is thread-safe, RocketMQBolt shares a single
+        // static producer instance across threads to improve performance.
+        synchronized (RocketMQBolt.class) {
+            if (producer == null) {
+                producer = new DefaultMQProducer();
+                RocketMQConfig.buildProducerConfigs(properties, (DefaultMQProducer)producer);
+
+                try {
+                    producer.start();
+                } catch (MQClientException e) {
+                    throw new RuntimeException(e);
+                }
+            }
         }
+
         this.collector = collector;
 
         Validate.notNull(selector, "TopicSelector can not be null");
@@ -143,6 +150,11 @@ public class RocketMQBolt implements IRichBolt {
 
     @Override
     public void cleanup() {
-        producer.shutdown();
+        synchronized (RocketMQBolt.class) {
+            if (producer != null) {
+                producer.shutdown();
+                producer = null;
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9bacd977/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
index c8a0802..74c8264 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
@@ -40,6 +40,7 @@ import org.apache.storm.topology.IRichSpout;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ObjectReader;
 
 import java.util.List;
 import java.util.Map;
@@ -58,7 +59,7 @@ import static org.apache.storm.rocketmq.RocketMQUtils.getInteger;
 public class RocketMQSpout implements IRichSpout {
     // TODO add metrics
 
-    private MQPushConsumer consumer;
+    private static MQPushConsumer consumer;
     private SpoutOutputCollector collector;
     private BlockingQueue<MessageSet> queue;
 
@@ -74,42 +75,48 @@ public class RocketMQSpout implements IRichSpout {
         Validate.notEmpty(properties, "Consumer properties can not be empty");
         boolean ordered = getBoolean(properties, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false);
 
-        int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, Integer.parseInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING).toString()));
+        int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)));
         queue = new LinkedBlockingQueue<>(queueSize);
 
-        consumer = new DefaultMQPushConsumer();
-        RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer, context);
-
-        if (ordered) {
-            consumer.registerMessageListener(new MessageListenerOrderly() {
-                @Override
-                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
-                                                           ConsumeOrderlyContext context) {
-                    if (process(msgs)) {
-                        return ConsumeOrderlyStatus.SUCCESS;
-                    } else {
-                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
-                    }
+        // Since the RocketMQ consumer is thread-safe, RocketMQSpout shares a single
+        // static consumer instance across threads to improve performance.
+        synchronized (RocketMQSpout.class) {
+            if (consumer == null) {
+                consumer = new DefaultMQPushConsumer();
+                RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer);
+
+                if (ordered) {
+                    consumer.registerMessageListener(new MessageListenerOrderly() {
+                        @Override
+                        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
+                                                                   ConsumeOrderlyContext context) {
+                            if (process(msgs)) {
+                                return ConsumeOrderlyStatus.SUCCESS;
+                            } else {
+                                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                            }
+                        }
+                    });
+                } else {
+                    consumer.registerMessageListener(new MessageListenerConcurrently() {
+                        @Override
+                        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                                        ConsumeConcurrentlyContext context) {
+                            if (process(msgs)) {
+                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                            } else {
+                                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                            }
+                        }
+                    });
                 }
-            });
-        } else {
-            consumer.registerMessageListener(new MessageListenerConcurrently() {
-                @Override
-                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
-                                                                ConsumeConcurrentlyContext context) {
-                    if (process(msgs)) {
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                    } else {
-                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
-                    }
-                }
-            });
-        }
 
-        try {
-            consumer.start();
-        } catch (MQClientException e) {
-            throw new RuntimeException(e);
+                try {
+                    consumer.start();
+                } catch (MQClientException e) {
+                    throw new RuntimeException(e);
+                }
+            }
         }
 
         int maxRetry = getInteger(properties, SpoutConfig.MESSAGES_MAX_RETRY, SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY);
@@ -162,7 +169,12 @@ public class RocketMQSpout implements IRichSpout {
 
     @Override
     public void close() {
-        consumer.shutdown();
+        synchronized (RocketMQSpout.class) {
+            if (consumer != null) {
+                consumer.shutdown();
+                consumer = null;
+            }
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/9bacd977/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
index 7e5c078..b6413ad 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
@@ -73,7 +73,7 @@ public class RocketMQState implements State {
         Validate.notEmpty(options.properties, "Producer properties can not be empty");
 
         producer = new DefaultMQProducer();
-        RocketMQConfig.buildProducerConfigs(options.properties, (DefaultMQProducer)producer, null);
+        RocketMQConfig.buildProducerConfigs(options.properties, (DefaultMQProducer)producer);
 
         try {
             producer.start();