You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2017/04/24 04:05:22 UTC

[1/5] storm git commit: STORM-2349: Add one RocketMQ plugin for the Apache Storm

Repository: storm
Updated Branches:
  refs/heads/master bfd10066c -> 3411c45e9


http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
new file mode 100644
index 0000000..ecb86d1
--- /dev/null
+++ b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+import org.apache.storm.utils.Utils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestMessageRetryManager { // exercises the mark/ack/fail/expiry behaviour of DefaultMessageRetryManager
+    MessageRetryManager messageRetryManager;
+    Map<String,MessageSet> cache;
+    BlockingQueue<MessageSet> queue;
+
+    @Before
+    public void prepare() {
+        cache = new ConcurrentHashMap<>(10);
+        queue = new LinkedBlockingDeque<>(10);
+        int maxRetry = 3;
+        int ttl = 2000;
+        messageRetryManager = new DefaultMessageRetryManager(queue, maxRetry, ttl);
+        ((DefaultMessageRetryManager)messageRetryManager).setCache(cache); // swap in the test's own map so cache contents can be asserted
+    }
+
+    @Test
+    public void testRetryLogics() {
+        // ack: marking caches the message set under its id, and acking that id removes it again
+        MessageSet messageSet = new MessageSet(new ArrayList<>());
+        messageRetryManager.mark(messageSet);
+        assertEquals(1, cache.size());
+        assertTrue(cache.containsKey(messageSet.getId()));
+
+        messageRetryManager.ack(messageSet.getId());
+        assertEquals(0, cache.size());
+        assertFalse(cache.containsKey(messageSet.getId()));
+
+
+        // fail with retry (retries < maxRetry): the set leaves the cache, its retry count is incremented and it is re-queued
+        messageSet = new MessageSet(new ArrayList<>());
+        messageRetryManager.mark(messageSet);
+        assertEquals(1, cache.size());
+        assertTrue(cache.containsKey(messageSet.getId()));
+
+        messageRetryManager.fail(messageSet.getId());
+        assertEquals(0, cache.size());
+        assertFalse(cache.containsKey(messageSet.getId()));
+        assertEquals(1, messageSet.getRetries());
+        assertEquals(1, queue.size());
+        assertEquals(messageSet, queue.poll()); // poll also empties the queue for the next scenario
+
+
+        // fail without retry (retries >= maxRetry): once maxRetry failures have happened the set is no longer re-queued
+        messageSet = new MessageSet(new ArrayList<>());
+        messageRetryManager.mark(messageSet);
+        messageRetryManager.fail(messageSet.getId());
+        assertEquals(0, cache.size());
+        assertFalse(cache.containsKey(messageSet.getId()));
+
+        messageRetryManager.mark(messageSet);
+        messageRetryManager.fail(messageSet.getId());
+        assertEquals(2, messageSet.getRetries());
+        messageRetryManager.mark(messageSet);
+        messageRetryManager.fail(messageSet.getId());
+        assertEquals(3, messageSet.getRetries());
+
+        assertFalse(messageRetryManager.needRetry(messageSet));
+        messageRetryManager.mark(messageSet);
+        messageRetryManager.fail(messageSet.getId()); // fourth failure: retries == maxRetry, so nothing is queued
+        assertEquals(0, cache.size());
+        assertEquals(3, queue.size()); // only the first three failures re-queued the set
+        assertEquals(messageSet, queue.poll());
+
+
+        // expiry: no ack/fail arrives within ttl, so the manager itself is expected to fail the entry and drop it from the cache
+        messageSet = new MessageSet(new ArrayList<>());
+        messageRetryManager.mark(messageSet);
+        Utils.sleep(10000); // waits past the ttl (2000) configured in prepare(); NOTE(review): the expiry check interval is not visible here
+        assertEquals(0, cache.size());
+        assertFalse(cache.containsKey(messageSet.getId()));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1281171..e0a1877 100644
--- a/pom.xml
+++ b/pom.xml
@@ -350,6 +350,7 @@
         <module>external/storm-druid</module>
         <module>external/storm-jms</module>
         <module>external/storm-pmml</module>
+        <module>external/storm-rocketmq</module>
 
         <!-- examples -->
         <module>examples/storm-starter</module>
@@ -367,6 +368,7 @@
         <module>examples/storm-mqtt-examples</module>
         <module>examples/storm-pmml-examples</module>
         <module>examples/storm-jms-examples</module>
+        <module>examples/storm-rocketmq-examples</module>
         <module>examples/storm-perf</module>
         <module>storm-client-misc</module>
     </modules>

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/storm-dist/binary/final-package/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/final-package/src/main/assembly/binary.xml b/storm-dist/binary/final-package/src/main/assembly/binary.xml
index 209e795..46cf1b2 100644
--- a/storm-dist/binary/final-package/src/main/assembly/binary.xml
+++ b/storm-dist/binary/final-package/src/main/assembly/binary.xml
@@ -245,6 +245,13 @@
                 <include>README.md</include>
             </includes>
         </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../../external/storm-rocketmq</directory>
+            <outputDirectory>external/storm-rocketmq</outputDirectory>
+            <includes>
+                <include>README.*</include>
+            </includes>
+        </fileSet>
 
         <!-- $STORM_HOME/extlib -->
         <fileSet>


[4/5] storm git commit: STORM-2349: Add Scheme for RocketMQSpout to deserialize data

Posted by xi...@apache.org.
STORM-2349: Add Scheme for RocketMQSpout to deserialize data


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ded7a1e4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ded7a1e4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ded7a1e4

Branch: refs/heads/master
Commit: ded7a1e4bda4cf8f16faaa507246636d49bcdc92
Parents: 9bacd97
Author: vesense <be...@163.com>
Authored: Tue Apr 18 12:01:43 2017 +0800
Committer: vesense <be...@163.com>
Committed: Tue Apr 18 12:01:43 2017 +0800

----------------------------------------------------------------------
 .../storm/rocketmq/topology/WordCounter.java    |  26 ++--
 .../apache/storm/rocketmq/ConsumerMessage.java  |  60 +++++++++
 .../rocketmq/DefaultMessageRetryManager.java    |  42 +++---
 .../storm/rocketmq/MessageRetryManager.java     |  12 +-
 .../org/apache/storm/rocketmq/MessageSet.java   |  66 ---------
 .../apache/storm/rocketmq/RocketMQUtils.java    |  31 ++++-
 .../org/apache/storm/rocketmq/SpoutConfig.java  |   8 +-
 .../storm/rocketmq/spout/RocketMQSpout.java     | 135 +++++++++++--------
 .../rocketmq/spout/scheme/KeyValueScheme.java   |  27 ++++
 .../spout/scheme/StringKeyValueScheme.java      |  38 ++++++
 .../rocketmq/spout/scheme/StringScheme.java     |  48 +++++++
 .../storm/rocketmq/TestMessageRetryManager.java |  68 +++++-----
 12 files changed, 361 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java
----------------------------------------------------------------------
diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java
index 8bc62b9..327c11a 100644
--- a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java
+++ b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java
@@ -17,8 +17,6 @@
  */
 package org.apache.storm.rocketmq.topology;
 
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.storm.rocketmq.RocketMQUtils;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.BasicOutputCollector;
 import org.apache.storm.topology.IBasicBolt;
@@ -28,7 +26,6 @@ import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 public class WordCounter implements IBasicBolt {
@@ -39,21 +36,18 @@ public class WordCounter implements IBasicBolt {
     }
 
     public void execute(Tuple input, BasicOutputCollector collector) {
-        List<MessageExt> list = (List<MessageExt>)input.getValueByField("msgs");
-        for (MessageExt messageExt : list) {
-            String word = RocketMQUtils.getUtf8StringBody(messageExt);
+        String word = input.getStringByField("str");
 
-            int count;
-            if (wordCounter.containsKey(word)) {
-                count = wordCounter.get(word) + 1;
-                wordCounter.put(word, wordCounter.get(word) + 1);
-            } else {
-                count = 1;
-            }
-
-            wordCounter.put(word, count);
-            collector.emit(new Values(word, count));
+        int count;
+        if (wordCounter.containsKey(word)) {
+            count = wordCounter.get(word) + 1;
+            wordCounter.put(word, wordCounter.get(word) + 1);
+        } else {
+            count = 1;
         }
+
+        wordCounter.put(word, count);
+        collector.emit(new Values(word, count));
     }
 
     public void cleanup() {

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java
new file mode 100644
index 0000000..97afae1
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+public class ConsumerMessage { // one RocketMQ message together with the bookkeeping (id, timestamp, retry count) used for retries
+    private String id; // defaults to the RocketMQ msgId when the single-argument constructor is used
+    private MessageExt data; // the wrapped RocketMQ message
+    private long timestamp; // DefaultMessageRetryManager stores System.currentTimeMillis() here on mark() and resets it to 0 on fail()
+    private int retries; // incremented by DefaultMessageRetryManager.fail() each time the message is re-queued
+
+    public ConsumerMessage(String id, MessageExt data) {
+        this.id = id;
+        this.data = data;
+    }
+
+    public ConsumerMessage(MessageExt data) {
+        this(data.getMsgId(), data); // reuse the broker-assigned message id as this wrapper's id
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public MessageExt getData() {
+        return data;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public int getRetries() {
+        return retries;
+    }
+
+    public void setRetries(int retries) {
+        this.retries = retries;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
index 9d540d9..bcc7e99 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
@@ -27,12 +27,12 @@ import java.util.concurrent.ConcurrentHashMap;
  * An implementation of MessageRetryManager
  */
 public class DefaultMessageRetryManager implements MessageRetryManager{
-    private Map<String,MessageSet> cache = new ConcurrentHashMap<>(500);
-    private BlockingQueue<MessageSet> queue;
+    private Map<String,ConsumerMessage> cache = new ConcurrentHashMap<>(500);
+    private BlockingQueue<ConsumerMessage> queue;
     private int maxRetry;
     private int ttl;
 
-    public DefaultMessageRetryManager(BlockingQueue<MessageSet> queue, int maxRetry, int ttl) {
+    public DefaultMessageRetryManager(BlockingQueue<ConsumerMessage> queue, int maxRetry, int ttl) {
         this.queue = queue;
         this.maxRetry = maxRetry;
         this.ttl = ttl;
@@ -42,10 +42,10 @@ public class DefaultMessageRetryManager implements MessageRetryManager{
             @Override
             public void run() {
                 long now = System.currentTimeMillis();
-                for (Map.Entry<String, MessageSet> entry : cache.entrySet()) {
+                for (Map.Entry<String, ConsumerMessage> entry : cache.entrySet()) {
                     String id = entry.getKey();
-                    MessageSet messageSet = entry.getValue();
-                    if (now - messageSet.getTimestamp() >= ttl) { // no ack/fail received in ttl
+                    ConsumerMessage message = entry.getValue();
+                    if (now - message.getTimestamp() >= ttl) { // no ack/fail received in ttl
                         fail(id);
                     }
                 }
@@ -58,29 +58,35 @@ public class DefaultMessageRetryManager implements MessageRetryManager{
     }
 
     public void fail(String id) {
-        MessageSet messageSet = cache.remove(id);
-        if (messageSet == null) {
+        ConsumerMessage message = cache.remove(id);
+        if (message == null) {
             return;
         }
 
-        if (needRetry(messageSet)) {
-            messageSet.setRetries(messageSet.getRetries() + 1);
-            messageSet.setTimestamp(0);
-            queue.offer(messageSet);
+        if (needRetry(message)) {
+            message.setRetries(message.getRetries() + 1);
+            message.setTimestamp(0);
+            queue.offer(message);
         }
     }
 
-    public void mark(MessageSet messageSet) {
-        messageSet.setTimestamp(System.currentTimeMillis());
-        cache.put(messageSet.getId(), messageSet);
+    public void mark(ConsumerMessage message) {
+        message.setTimestamp(System.currentTimeMillis());
+        cache.put(message.getId(), message);
     }
 
-    public boolean needRetry(MessageSet messageSet) {
-        return messageSet.getRetries() < maxRetry;
+    /**
+     * Whether the message needs to be retried.
+     * @param message
+     * @return
+     */
+    public boolean needRetry(ConsumerMessage message) {
+        return message.getRetries() < maxRetry;
     }
 
     // just for testing
-    public void setCache(Map<String,MessageSet> cache) {
+    public void setCache(Map<String,ConsumerMessage> cache) {
         this.cache = cache;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
index 18fd903..5e59d9c 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
@@ -35,16 +35,16 @@ public interface MessageRetryManager {
     void fail(String id);
 
     /**
-     * Mark messageSet in the cache.
-     * @param messageSet
+     * Mark message in the cache.
+     * @param message
      */
-    void mark(MessageSet messageSet);
+    void mark(ConsumerMessage message);
 
     /**
-     * Whether the messageSet need retry.
-     * @param messageSet
+     * Whether the message needs to be retried.
+     * @param message
      * @return
      */
-    boolean needRetry(MessageSet messageSet);
+    boolean needRetry(ConsumerMessage message);
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageSet.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageSet.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageSet.java
deleted file mode 100644
index 7307271..0000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageSet.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.rocketmq;
-
-import org.apache.rocketmq.common.message.MessageExt;
-
-import java.util.List;
-import java.util.UUID;
-
-/**
- * A message collection.
- */
-public class MessageSet {
-    private String id;
-    private List<MessageExt> data;
-    private long timestamp;
-    private int retries;
-
-    public MessageSet(String id, List<MessageExt> data) {
-        this.id = id;
-        this.data = data;
-    }
-
-    public MessageSet(List<MessageExt> data) {
-        this(UUID.randomUUID().toString(), data);
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public List<MessageExt> getData() {
-        return data;
-    }
-
-    public long getTimestamp() {
-        return timestamp;
-    }
-
-    public void setTimestamp(long timestamp) {
-        this.timestamp = timestamp;
-    }
-
-    public int getRetries() {
-        return retries;
-    }
-
-    public void setRetries(int retries) {
-        this.retries = retries;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
index dbe6b12..7cbf749 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
@@ -18,8 +18,12 @@
 package org.apache.storm.rocketmq;
 
 import org.apache.rocketmq.common.message.Message;
+import org.apache.storm.rocketmq.spout.scheme.KeyValueScheme;
+import org.apache.storm.spout.Scheme;
 
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Properties;
 
 public final class RocketMQUtils {
@@ -32,10 +36,29 @@ public final class RocketMQUtils {
         return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)));
     }
 
-    public static String getUtf8StringBody(Message message) {
-        if (message == null) {
-            return null;
+    public static Scheme createScheme(Properties props) {
+        String schemeString = props.getProperty(SpoutConfig.SCHEME, SpoutConfig.DEFAULT_SCHEME);
+        Scheme scheme;
+        try {
+            Class clazz = Class.forName(schemeString);
+            scheme = (Scheme)clazz.newInstance();
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Cannot create Scheme for " + schemeString
+                    + " due to " + e.getMessage());
         }
-        return new String(message.getBody(), StandardCharsets.UTF_8);
+        return scheme;
+    }
+
+    public static List<Object> generateTuples(Message msg, Scheme scheme) { // deserializes msg into tuple values using scheme; RocketMQSpout.nextTuple null-checks the result
+        List<Object> tup;
+        String rawKey = msg.getKeys();
+        ByteBuffer body = ByteBuffer.wrap(msg.getBody()); // NOTE(review): ByteBuffer.wrap throws NullPointerException for a null body - confirm bodies are always set
+        if (rawKey != null && scheme instanceof KeyValueScheme) { // key/value path only when the message has keys and the scheme supports it
+            ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes(StandardCharsets.UTF_8));
+            tup = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, body);
+        } else {
+            tup = scheme.deserialize(body); // otherwise only the body is deserialized and any keys are ignored
+        }
+        return tup;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
index 53ce152..05d7f76 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
@@ -17,16 +17,18 @@
  */
 package org.apache.storm.rocketmq;
 
+import org.apache.storm.rocketmq.spout.scheme.StringScheme;
+
 public class SpoutConfig extends RocketMQConfig {
     public static final String QUEUE_SIZE = "spout.queue.size";
 
-    public static final String DECLARE_FIELDS = "spout.declare.fields";
-    public static final String DEFAULT_DECLARE_FIELDS = "msgs";
-
     public static final String MESSAGES_MAX_RETRY = "spout.messages.max.retry";
     public static final int DEFAULT_MESSAGES_MAX_RETRY = 3;
 
     public static final String MESSAGES_TTL = "spout.messages.ttl";
     public static final int DEFAULT_MESSAGES_TTL = 300000;  // 5min
 
+    public static final String SCHEME = "spout.scheme";
+    public static final String DEFAULT_SCHEME = StringScheme.class.getName();
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
index 74c8264..58dc2bf 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
@@ -29,17 +29,17 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.storm.Config;
+import org.apache.storm.rocketmq.ConsumerMessage;
 import org.apache.storm.rocketmq.DefaultMessageRetryManager;
 import org.apache.storm.rocketmq.MessageRetryManager;
-import org.apache.storm.rocketmq.MessageSet;
 import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.RocketMQUtils;
 import org.apache.storm.rocketmq.SpoutConfig;
+import org.apache.storm.spout.Scheme;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.IRichSpout;
 import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.ObjectReader;
 
 import java.util.List;
@@ -61,88 +61,117 @@ public class RocketMQSpout implements IRichSpout {
 
     private static MQPushConsumer consumer;
     private SpoutOutputCollector collector;
-    private BlockingQueue<MessageSet> queue;
+    private BlockingQueue<ConsumerMessage> queue;
+    private BlockingQueue<ConsumerMessage> pending;
 
     private Properties properties;
     private MessageRetryManager messageRetryManager;
+    private Scheme scheme;
 
     public RocketMQSpout(Properties properties) {
+        Validate.notEmpty(properties, "Consumer properties can not be empty");
         this.properties = properties;
+        scheme = RocketMQUtils.createScheme(properties);
     }
 
     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        Validate.notEmpty(properties, "Consumer properties can not be empty");
-        boolean ordered = getBoolean(properties, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false);
-
-        int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)));
-        queue = new LinkedBlockingQueue<>(queueSize);
-
         // Since RocketMQ Consumer is thread-safe, RocketMQSpout uses a single
         // consumer instance across threads to improve the performance.
         synchronized (RocketMQSpout.class) {
             if (consumer == null) {
-                consumer = new DefaultMQPushConsumer();
-                RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer);
-
-                if (ordered) {
-                    consumer.registerMessageListener(new MessageListenerOrderly() {
-                        @Override
-                        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
-                                                                   ConsumeOrderlyContext context) {
-                            if (process(msgs)) {
-                                return ConsumeOrderlyStatus.SUCCESS;
-                            } else {
-                                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
-                            }
-                        }
-                    });
-                } else {
-                    consumer.registerMessageListener(new MessageListenerConcurrently() {
-                        @Override
-                        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
-                                                                        ConsumeConcurrentlyContext context) {
-                            if (process(msgs)) {
-                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                            } else {
-                                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
-                            }
-                        }
-                    });
-                }
-
-                try {
-                    consumer.start();
-                } catch (MQClientException e) {
-                    throw new RuntimeException(e);
-                }
+                buildAndStartConsumer();
             }
         }
 
+        int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)));
+        queue = new LinkedBlockingQueue<>(queueSize);
+        pending = new LinkedBlockingQueue<>(queueSize);
         int maxRetry = getInteger(properties, SpoutConfig.MESSAGES_MAX_RETRY, SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY);
         int ttl = getInteger(properties, SpoutConfig.MESSAGES_TTL, SpoutConfig.DEFAULT_MESSAGES_TTL);
+
         this.messageRetryManager = new DefaultMessageRetryManager(queue, maxRetry, ttl);
         this.collector = collector;
     }
 
-    public boolean process(List<MessageExt> msgs) {
+    protected void buildAndStartConsumer() {
+        consumer = new DefaultMQPushConsumer();
+        RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer);
+
+        boolean ordered = getBoolean(properties, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false);
+        if (ordered) {
+            consumer.registerMessageListener(new MessageListenerOrderly() {
+                @Override
+                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
+                                                           ConsumeOrderlyContext context) {
+                    if (process(msgs)) {
+                        return ConsumeOrderlyStatus.SUCCESS;
+                    } else {
+                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                    }
+                }
+            });
+        } else {
+            consumer.registerMessageListener(new MessageListenerConcurrently() {
+                @Override
+                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                                ConsumeConcurrentlyContext context) {
+                    if (process(msgs)) {
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    } else {
+                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                    }
+                }
+            });
+        }
+
+        try {
+            consumer.start();
+        } catch (MQClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Processes the messages pushed by the consumer's message listener.
+     * @param msgs
+     * @return
+     */
+    protected boolean process(List<MessageExt> msgs) {
         if (msgs.isEmpty()) {
             return true;
         }
-        MessageSet messageSet = new MessageSet(msgs);
-        // returning true upon success and false if this queue is full.
-        return queue.offer(messageSet);
+
+        boolean notFull = true;
+        for (MessageExt msg : msgs) {
+            ConsumerMessage message = new ConsumerMessage(msg);
+            // returning true upon success and false if this queue is full.
+            if(!queue.offer(message)){
+                notFull = false;
+                pending.offer(message);
+            }
+        }
+        return notFull;
     }
 
     @Override
     public void nextTuple() {
-        MessageSet messageSet = queue.poll();
-        if (messageSet == null) {
+        ConsumerMessage message;
+        if (!pending.isEmpty()) { // drain overflow messages (parked by process() when the queue was full) before the main queue
+            message = pending.poll();
+        } else {
+            message = queue.poll();
+        }
+
+        if (message == null) { // nothing buffered right now
             return;
         }
 
-        messageRetryManager.mark(messageSet);
-        collector.emit(new Values(messageSet.getData()), messageSet.getId());
+        List<Object> tup = RocketMQUtils.generateTuples(message.getData(), scheme);
+        if (tup != null) { // a null tuple is never emitted, so it must not be tracked for ack/fail either
+            messageRetryManager.mark(message); // mark before emit so the id is cached by the time an ack/fail can arrive
+            collector.emit(tup, message.getId());
+        }
     }
 
     @Override
@@ -159,7 +188,7 @@ public class RocketMQSpout implements IRichSpout {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(properties.getProperty(SpoutConfig.DECLARE_FIELDS, SpoutConfig.DEFAULT_DECLARE_FIELDS)));
+        declarer.declare(scheme.getOutputFields());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java
new file mode 100644
index 0000000..e066784
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.spout.scheme;
+
+import org.apache.storm.spout.Scheme;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public interface KeyValueScheme extends Scheme { // a Scheme that can also build a tuple from separate key and value buffers
+    List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java
new file mode 100644
index 0000000..47ef996
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.spout.scheme;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.tuple.Values;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class StringKeyValueScheme extends StringScheme implements KeyValueScheme { // decodes key and value buffers as strings
+
+    @Override
+    public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) {
+        if ( key == null ) { // no key: fall back to the value-only tuple from deserialize
+            return deserialize(value);
+        }
+        String keyString = StringScheme.deserializeString(key);
+        String valueString = StringScheme.deserializeString(value);
+        return new Values(ImmutableMap.of(keyString, valueString)); // one-field tuple holding a single-entry key -> value map
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java
new file mode 100644
index 0000000..64c8241
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.spout.scheme;
+
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class StringScheme implements Scheme { // Scheme that turns a byte buffer into a one-field String tuple
+    public static final String STRING_SCHEME_KEY = "str"; // name of the single declared output field
+
+    public List<Object> deserialize(ByteBuffer bytes) { // wraps the decoded string in a Values tuple
+        return new Values(deserializeString(bytes));
+    }
+
+    public static String deserializeString(ByteBuffer string) { // decodes the buffer's remaining bytes as UTF-8
+        if (string.hasArray()) { // array-backed buffer: decode straight from the backing array
+            int base = string.arrayOffset();
+            return new String(string.array(), base + string.position(), string.remaining(), StandardCharsets.UTF_8); // explicit charset so this branch agrees with the one below instead of using the platform default
+        } else {
+            return new String(Utils.toByteArray(string), StandardCharsets.UTF_8);
+        }
+    }
+
+    public Fields getOutputFields() { // declares the single field named by STRING_SCHEME_KEY
+        return new Fields(STRING_SCHEME_KEY);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
index ecb86d1..47de20d 100644
--- a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
+++ b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
@@ -17,11 +17,11 @@
  */
 package org.apache.storm.rocketmq;
 
+import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.storm.utils.Utils;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -33,8 +33,8 @@ import static org.junit.Assert.assertTrue;
 
 public class TestMessageRetryManager {
     MessageRetryManager messageRetryManager;
-    Map<String,MessageSet> cache;
-    BlockingQueue<MessageSet> queue;
+    Map<String, ConsumerMessage> cache;
+    BlockingQueue<ConsumerMessage> queue;
 
     @Before
     public void prepare() {
@@ -49,58 +49,58 @@ public class TestMessageRetryManager {
     @Test
     public void testRetryLogics() {
         //ack
-        MessageSet messageSet = new MessageSet(new ArrayList<>());
-        messageRetryManager.mark(messageSet);
+        ConsumerMessage message = new ConsumerMessage("id", new MessageExt());
+        messageRetryManager.mark(message);
         assertEquals(1, cache.size());
-        assertTrue(cache.containsKey(messageSet.getId()));
+        assertTrue(cache.containsKey(message.getId()));
 
-        messageRetryManager.ack(messageSet.getId());
+        messageRetryManager.ack(message.getId());
         assertEquals(0, cache.size());
-        assertFalse(cache.containsKey(messageSet.getId()));
+        assertFalse(cache.containsKey(message.getId()));
 
 
         //fail need retry: retries < maxRetry
-        messageSet = new MessageSet(new ArrayList<>());
-        messageRetryManager.mark(messageSet);
+        message = new ConsumerMessage("id", new MessageExt());
+        messageRetryManager.mark(message);
         assertEquals(1, cache.size());
-        assertTrue(cache.containsKey(messageSet.getId()));
+        assertTrue(cache.containsKey(message.getId()));
 
-        messageRetryManager.fail(messageSet.getId());
+        messageRetryManager.fail(message.getId());
         assertEquals(0, cache.size());
-        assertFalse(cache.containsKey(messageSet.getId()));
-        assertEquals(1, messageSet.getRetries());
+        assertFalse(cache.containsKey(message.getId()));
+        assertEquals(1, message.getRetries());
         assertEquals(1, queue.size());
-        assertEquals(messageSet, queue.poll());
+        assertEquals(message, queue.poll());
 
 
         //fail need not retry: retries >= maxRetry
-        messageSet = new MessageSet(new ArrayList<>());
-        messageRetryManager.mark(messageSet);
-        messageRetryManager.fail(messageSet.getId());
+        message = new ConsumerMessage("id", new MessageExt());
+        messageRetryManager.mark(message);
+        messageRetryManager.fail(message.getId());
         assertEquals(0, cache.size());
-        assertFalse(cache.containsKey(messageSet.getId()));
-
-        messageRetryManager.mark(messageSet);
-        messageRetryManager.fail(messageSet.getId());
-        assertEquals(2, messageSet.getRetries());
-        messageRetryManager.mark(messageSet);
-        messageRetryManager.fail(messageSet.getId());
-        assertEquals(3, messageSet.getRetries());
-
-        assertFalse(messageRetryManager.needRetry(messageSet));
-        messageRetryManager.mark(messageSet);
-        messageRetryManager.fail(messageSet.getId());
+        assertFalse(cache.containsKey(message.getId()));
+
+        messageRetryManager.mark(message);
+        messageRetryManager.fail(message.getId());
+        assertEquals(2, message.getRetries());
+        messageRetryManager.mark(message);
+        messageRetryManager.fail(message.getId());
+        assertEquals(3, message.getRetries());
+
+        assertFalse(messageRetryManager.needRetry(message));
+        messageRetryManager.mark(message);
+        messageRetryManager.fail(message.getId());
         assertEquals(0, cache.size());
         assertEquals(3, queue.size());
-        assertEquals(messageSet, queue.poll());
+        assertEquals(message, queue.poll());
 
 
         //fail: no ack/fail received in ttl
-        messageSet = new MessageSet(new ArrayList<>());
-        messageRetryManager.mark(messageSet);
+        message = new ConsumerMessage("id", new MessageExt());
+        messageRetryManager.mark(message);
         Utils.sleep(10000);
         assertEquals(0, cache.size());
-        assertFalse(cache.containsKey(messageSet.getId()));
+        assertFalse(cache.containsKey(message.getId()));
 
     }
 }


[5/5] storm git commit: Merge branch 'STORM-2349' of https://github.com/vesense/storm

Posted by xi...@apache.org.
Merge branch 'STORM-2349' of https://github.com/vesense/storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3411c45e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3411c45e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3411c45e

Branch: refs/heads/master
Commit: 3411c45e9a6ef77f1e84051964ca60ac2961f951
Parents: bfd1006 ded7a1e
Author: vesense <be...@163.com>
Authored: Mon Apr 24 11:56:36 2017 +0800
Committer: vesense <be...@163.com>
Committed: Mon Apr 24 11:56:36 2017 +0800

----------------------------------------------------------------------
 examples/storm-rocketmq-examples/pom.xml        |  89 ++++++++
 .../rocketmq/topology/WordCountTopology.java    |  95 ++++++++
 .../storm/rocketmq/topology/WordCounter.java    |  66 ++++++
 .../rocketmq/trident/WordCountTrident.java      |  94 ++++++++
 external/storm-mongodb/pom.xml                  |   2 +-
 external/storm-rocketmq/README.md               | 118 ++++++++++
 external/storm-rocketmq/pom.xml                 |  71 ++++++
 .../apache/storm/rocketmq/ConsumerMessage.java  |  60 +++++
 .../rocketmq/DefaultMessageBodySerializer.java  |  37 ++++
 .../rocketmq/DefaultMessageRetryManager.java    |  92 ++++++++
 .../storm/rocketmq/MessageBodySerializer.java   |  27 +++
 .../storm/rocketmq/MessageRetryManager.java     |  50 +++++
 .../apache/storm/rocketmq/RocketMQConfig.java   | 162 ++++++++++++++
 .../apache/storm/rocketmq/RocketMQUtils.java    |  64 ++++++
 .../org/apache/storm/rocketmq/SpoutConfig.java  |  34 +++
 .../storm/rocketmq/bolt/RocketMQBolt.java       | 160 ++++++++++++++
 .../FieldNameBasedTupleToMessageMapper.java     |  66 ++++++
 .../common/mapper/TupleToMessageMapper.java     |  30 +++
 .../common/selector/DefaultTopicSelector.java   |  45 ++++
 .../selector/FieldNameBasedTopicSelector.java   |  63 ++++++
 .../rocketmq/common/selector/TopicSelector.java |  27 +++
 .../storm/rocketmq/spout/RocketMQSpout.java     | 218 +++++++++++++++++++
 .../rocketmq/spout/scheme/KeyValueScheme.java   |  27 +++
 .../spout/scheme/StringKeyValueScheme.java      |  38 ++++
 .../rocketmq/spout/scheme/StringScheme.java     |  48 ++++
 .../rocketmq/trident/state/RocketMQState.java   | 117 ++++++++++
 .../trident/state/RocketMQStateFactory.java     |  42 ++++
 .../trident/state/RocketMQStateUpdater.java     |  34 +++
 .../storm/rocketmq/TestMessageRetryManager.java | 106 +++++++++
 pom.xml                                         |   2 +
 .../final-package/src/main/assembly/binary.xml  |   7 +
 31 files changed, 2090 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3411c45e/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index ca5890c,e0a1877..ac6928b
--- a/pom.xml
+++ b/pom.xml
@@@ -369,7 -368,9 +370,8 @@@
          <module>examples/storm-mqtt-examples</module>
          <module>examples/storm-pmml-examples</module>
          <module>examples/storm-jms-examples</module>
+         <module>examples/storm-rocketmq-examples</module>
          <module>examples/storm-perf</module>
 -        <module>storm-client-misc</module>
      </modules>
  
      <dependencies>


[2/5] storm git commit: STORM-2349: Add one RocketMQ plugin for the Apache Storm

Posted by xi...@apache.org.
STORM-2349: Add one RocketMQ plugin for the Apache Storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4a4a8d2e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4a4a8d2e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4a4a8d2e

Branch: refs/heads/master
Commit: 4a4a8d2e1ec89d572116865e28df7241178bc624
Parents: 1850dd5
Author: vesense <be...@163.com>
Authored: Mon Mar 20 16:50:34 2017 +0800
Committer: vesense <be...@163.com>
Committed: Tue Apr 11 10:41:32 2017 +0800

----------------------------------------------------------------------
 examples/storm-rocketmq-examples/pom.xml        |  89 ++++++++++
 .../rocketmq/topology/WordCountTopology.java    |  95 ++++++++++
 .../storm/rocketmq/topology/WordCounter.java    |  72 ++++++++
 .../rocketmq/trident/WordCountTrident.java      |  94 ++++++++++
 external/storm-mongodb/pom.xml                  |   2 +-
 external/storm-rocketmq/README.md               | 118 +++++++++++++
 external/storm-rocketmq/pom.xml                 |  71 ++++++++
 .../rocketmq/DefaultMessageBodySerializer.java  |  37 ++++
 .../rocketmq/DefaultMessageRetryManager.java    |  86 +++++++++
 .../storm/rocketmq/MessageBodySerializer.java   |  27 +++
 .../storm/rocketmq/MessageRetryManager.java     |  50 ++++++
 .../org/apache/storm/rocketmq/MessageSet.java   |  66 +++++++
 .../apache/storm/rocketmq/RocketMQConfig.java   | 172 ++++++++++++++++++
 .../apache/storm/rocketmq/RocketMQUtils.java    |  41 +++++
 .../org/apache/storm/rocketmq/SpoutConfig.java  |  32 ++++
 .../storm/rocketmq/bolt/RocketMQBolt.java       | 148 ++++++++++++++++
 .../FieldNameBasedTupleToMessageMapper.java     |  66 +++++++
 .../common/mapper/TupleToMessageMapper.java     |  30 ++++
 .../common/selector/DefaultTopicSelector.java   |  45 +++++
 .../selector/FieldNameBasedTopicSelector.java   |  63 +++++++
 .../rocketmq/common/selector/TopicSelector.java |  27 +++
 .../storm/rocketmq/spout/RocketMQSpout.java     | 177 +++++++++++++++++++
 .../rocketmq/trident/state/RocketMQState.java   | 117 ++++++++++++
 .../trident/state/RocketMQStateFactory.java     |  42 +++++
 .../trident/state/RocketMQStateUpdater.java     |  34 ++++
 .../storm/rocketmq/TestMessageRetryManager.java | 106 +++++++++++
 pom.xml                                         |   2 +
 .../final-package/src/main/assembly/binary.xml  |   7 +
 28 files changed, 1915 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/examples/storm-rocketmq-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-rocketmq-examples/pom.xml b/examples/storm-rocketmq-examples/pom.xml
new file mode 100644
index 0000000..d66177e
--- /dev/null
+++ b/examples/storm-rocketmq-examples/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-rocketmq-examples</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-server</artifactId>
+            <version>${project.version}</version>
+            <scope>${provided.scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-rocketmq</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.sf</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.dsa</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                                <exclude>META-INF/*.rsa</exclude>
+                                <exclude>META-INF/*.EC</exclude>
+                                <exclude>META-INF/*.ec</exclude>
+                                <exclude>META-INF/MSFTSIG.SF</exclude>
+                                <exclude>META-INF/MSFTSIG.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java
new file mode 100644
index 0000000..0ce844e
--- /dev/null
+++ b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.SpoutConfig;
+import org.apache.storm.rocketmq.bolt.RocketMQBolt;
+import org.apache.storm.rocketmq.common.mapper.FieldNameBasedTupleToMessageMapper;
+import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
+import org.apache.storm.rocketmq.common.selector.DefaultTopicSelector;
+import org.apache.storm.rocketmq.common.selector.TopicSelector;
+import org.apache.storm.rocketmq.spout.RocketMQSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+import java.util.Properties;
+
+public class WordCountTopology { // example topology: RocketMQSpout -> WordCounter -> RocketMQBolt
+    private static final String WORD_SPOUT = "WORD_SPOUT";
+    private static final String COUNT_BOLT = "COUNT_BOLT";
+    private static final String INSERT_BOLT = "INSERT_BOLT";
+
+    private static final String CONSUMER_GROUP = "wordcount";
+    private static final String CONSUMER_TOPIC = "wordcountsource";
+
+    public static StormTopology buildTopology(String nameserverAddr, String topic){ // topic goes to the bolt's DefaultTopicSelector; the spout is configured with CONSUMER_TOPIC
+        Properties properties = new Properties();
+        properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, nameserverAddr);
+        properties.setProperty(SpoutConfig.CONSUMER_GROUP, CONSUMER_GROUP);
+        properties.setProperty(SpoutConfig.CONSUMER_TOPIC, CONSUMER_TOPIC);
+
+        RocketMQSpout spout = new RocketMQSpout(properties);
+
+        WordCounter bolt = new WordCounter();
+
+        TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
+        TopicSelector selector = new DefaultTopicSelector(topic);
+
+        properties = new Properties(); // fresh Properties for the bolt, separate from the spout's
+        properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+
+        RocketMQBolt insertBolt = new RocketMQBolt()
+                .withMapper(mapper)
+                .withSelector(selector)
+                .withProperties(properties);
+
+        // wordSpout ==> countBolt ==> insertBolt
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(WORD_SPOUT, spout, 1);
+        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(INSERT_BOLT, insertBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+
+        return builder.createTopology();
+    }
+
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if (args.length == 2) { // two args: run in a LocalCluster, sleep 120 seconds, then exit
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(args[0], args[1]));) {
+                Thread.sleep(120 * 1000);
+            }
+            System.exit(0);
+        }
+        else if(args.length == 3) { // three args: submit through StormSubmitter under the name in args[2]
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0], args[1]));
+        } else{
+            System.out.println("Usage: WordCountTopology <nameserver addr> <topic> [topology name]");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java
----------------------------------------------------------------------
diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java
new file mode 100644
index 0000000..8bc62b9
--- /dev/null
+++ b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.topology;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.storm.rocketmq.RocketMQUtils;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class WordCounter implements IBasicBolt {
+    private Map<String, Integer> wordCounter = new HashMap<>(); // running count per word
+
+    public void prepare(Map stormConf, TopologyContext context) {
+        // nothing to set up
+    }
+
+    /**
+     * Counts each word carried in the tuple's "msgs" field and emits (word, count).
+     * @param input tuple whose "msgs" field is read as a list of MessageExt
+     * @param collector receives one (word, count) pair per message
+     */
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        List<MessageExt> list = (List<MessageExt>)input.getValueByField("msgs");
+        for (MessageExt messageExt : list) {
+            String word = RocketMQUtils.getUtf8StringBody(messageExt);
+            // One lookup and one write per word; a missing entry means first sighting.
+            Integer previous = wordCounter.get(word);
+            int count = (previous == null) ? 1 : previous + 1;
+            wordCounter.put(word, count);
+            collector.emit(new Values(word, count));
+        }
+    }
+
+    public void cleanup() {
+        // no resources to release
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word", "count"));
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        // no component-specific configuration
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java
----------------------------------------------------------------------
diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java
new file mode 100644
index 0000000..1817d2e
--- /dev/null
+++ b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.common.mapper.FieldNameBasedTupleToMessageMapper;
+import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
+import org.apache.storm.rocketmq.common.selector.DefaultTopicSelector;
+import org.apache.storm.rocketmq.common.selector.TopicSelector;
+import org.apache.storm.rocketmq.trident.state.RocketMQState;
+import org.apache.storm.rocketmq.trident.state.RocketMQStateFactory;
+import org.apache.storm.rocketmq.trident.state.RocketMQStateUpdater;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Properties;
+
+public class WordCountTrident {
+
+    public static StormTopology buildTopology(String nameserverAddr, String topic){
+        Fields fields = new Fields("word", "count");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", 1),
+                new Values("trident", 1),
+                new Values("needs", 1),
+                new Values("javadoc", 1)
+        );
+        spout.setCycle(true);
+
+        TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
+        TopicSelector selector = new DefaultTopicSelector(topic);
+
+        Properties properties = new Properties();
+        properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+
+        RocketMQState.Options options = new RocketMQState.Options()
+                .withMapper(mapper)
+                .withSelector(selector)
+                .withProperties(properties);
+
+        StateFactory factory = new RocketMQStateFactory(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        stream.partitionPersist(factory, fields,
+                new RocketMQStateUpdater(), new Fields());
+
+        return topology.build();
+    }
+
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if (args.length == 2) {
+            try (LocalCluster cluster = new LocalCluster();
+                 LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(args[0], args[1]));) {
+                Thread.sleep(60 * 1000);
+            }
+            System.exit(0);
+        }
+        else if(args.length == 3) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0], args[1]));
+        } else{
+            System.out.println("Usage: WordCountTrident <nameserver addr> <topic> [topology name]");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/pom.xml b/external/storm-mongodb/pom.xml
index 6aaeab6..52d1cdd 100644
--- a/external/storm-mongodb/pom.xml
+++ b/external/storm-mongodb/pom.xml
@@ -44,7 +44,7 @@
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-client</artifactId>
             <version>${project.version}</version>
-            <scope>provided</scope>
+            <scope>${provided.scope}</scope>
         </dependency>
         <dependency>
             <groupId>org.mongodb</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/README.md
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/README.md b/external/storm-rocketmq/README.md
new file mode 100644
index 0000000..160118a
--- /dev/null
+++ b/external/storm-rocketmq/README.md
@@ -0,0 +1,118 @@
+# Storm RocketMQ
+
+Storm/Trident integration for [RocketMQ](https://rocketmq.incubator.apache.org/). This package includes the core spout, bolt and trident states that allow a storm topology to either write storm tuples into a topic or read from topics in a storm topology.
+
+
+## Read from Topic
+The spout included in this package reads data from a topic.
+
+### RocketMQSpout
+To use the `RocketMQSpout`, you construct an instance of it by specifying a Properties instance that contains the RocketMQ configs.
+RocketMQSpout uses RocketMQ MQPushConsumer as the default implementation. PushConsumer is a high-level consumer API that wraps the pulling details, so it looks as if the broker pushes messages to the consumer.
+RocketMQSpout will retry failed messages 3 times (use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to change the value).
+
+ ```java
+        Properties properties = new Properties();
+        properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, nameserverAddr);
+        properties.setProperty(SpoutConfig.CONSUMER_GROUP, group);
+        properties.setProperty(SpoutConfig.CONSUMER_TOPIC, topic);
+
+        RocketMQSpout spout = new RocketMQSpout(properties);
+ ```
+
+
+## Write into Topic
+The bolt and trident state included in this package write data into a topic.
+
+### TupleToMessageMapper
+The main API for mapping Storm tuple to a RocketMQ Message is the `org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper` interface:
+
+```java
+public interface TupleToMessageMapper extends Serializable {
+    String getKeyFromTuple(ITuple tuple);
+    byte[] getValueFromTuple(ITuple tuple);
+}
+```
+
+### FieldNameBasedTupleToMessageMapper
+`storm-rocketmq` includes a general purpose `TupleToMessageMapper` implementation called `FieldNameBasedTupleToMessageMapper`.
+
+### TopicSelector
+The main API for selecting topic and tags is the `org.apache.storm.rocketmq.common.selector.TopicSelector` interface:
+
+```java
+public interface TopicSelector extends Serializable {
+    String getTopic(ITuple tuple);
+    String getTag(ITuple tuple);
+}
+```
+
+### DefaultTopicSelector/FieldNameBasedTopicSelector
+`storm-rocketmq` includes general purpose `TopicSelector` implementations called `DefaultTopicSelector` and `FieldNameBasedTopicSelector`.
+
+
+### RocketMQBolt
+To use the `RocketMQBolt`, you construct an instance of it by specifying TupleToMessageMapper, TopicSelector and Properties instances.
+RocketMQBolt sends messages asynchronously by default. You can change this by invoking `withAsync(false)`.
+
+ ```java
+        TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
+        TopicSelector selector = new DefaultTopicSelector(topic);
+
+        properties = new Properties();
+        properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+
+        RocketMQBolt insertBolt = new RocketMQBolt()
+                .withMapper(mapper)
+                .withSelector(selector)
+                .withProperties(properties);
+ ```
+
+### Trident State
+We support trident persistent state that can be used with trident topologies. To create a RocketMQ persistent trident state you need to initialize it with the TupleToMessageMapper, TopicSelector, Properties instances. See the example below:
+
+ ```java
+        TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
+        TopicSelector selector = new DefaultTopicSelector(topic);
+
+        Properties properties = new Properties();
+        properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+
+        RocketMQState.Options options = new RocketMQState.Options()
+                .withMapper(mapper)
+                .withSelector(selector)
+                .withProperties(properties);
+
+        StateFactory factory = new RocketMQStateFactory(options);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        stream.partitionPersist(factory, fields,
+                new RocketMQStateUpdater(), new Fields());
+ ```
+
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+
+ * Xin Wang ([xinwang@apache.org](mailto:xinwang@apache.org))
+

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/pom.xml b/external/storm-rocketmq/pom.xml
new file mode 100644
index 0000000..6068988
--- /dev/null
+++ b/external/storm-rocketmq/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-rocketmq</artifactId>
+    <name>storm-rocketmq</name>
+
+    <packaging>jar</packaging>
+
+    <developers>
+        <developer>
+            <id>vesense</id>
+            <name>Xin Wang</name>
+            <email>xinwang@apache.org</email>
+        </developer>
+    </developers>
+
+    <properties>
+        <rocketmq.version>4.0.0-incubating</rocketmq.version>
+    </properties>
+
+    <dependencies>
+        <!--parent module dependency-->
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-client</artifactId>
+            <version>${project.version}</version>
+            <scope>${provided.scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+        </dependency>
+        <!--test dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java
new file mode 100644
index 0000000..5e7e314
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+import java.nio.charset.StandardCharsets;
+
+public class DefaultMessageBodySerializer implements MessageBodySerializer {
+
+    /**
+     * Encodes the message body as UTF-8 bytes.
+     * Note: the bytes are taken from {@code body.toString()}, whatever the body's type.
+     *
+     * @param body RocketMQ Message body; may be {@code null}
+     * @return the UTF-8 bytes, or {@code null} when {@code body} is {@code null}
+     */
+    @Override
+    public byte[] serialize(Object body) {
+        return (body == null)
+                ? null
+                : body.toString().getBytes(StandardCharsets.UTF_8);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
new file mode 100644
index 0000000..9d540d9
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Default MessageRetryManager: re-queues failed or timed-out message sets up to maxRetry times.
+ */
+public class DefaultMessageRetryManager implements MessageRetryManager{
+    private Map<String,MessageSet> cache = new ConcurrentHashMap<>(500); // in-flight sets by id
+    private final BlockingQueue<MessageSet> queue; // receives message sets to be retried
+    private final int maxRetry;                    // maximum retries per message set
+    private final int ttl;                         // ms without ack/fail before a set counts as failed
+
+    public DefaultMessageRetryManager(BlockingQueue<MessageSet> queue, int maxRetry, int ttl) {
+        this.queue = queue;
+        this.maxRetry = maxRetry;
+        this.ttl = ttl;
+
+        long period = 5000; // scan interval in milliseconds
+        // Daemon timer: the expiry scan must not keep the JVM alive on its own.
+        new Timer(true).scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                long now = System.currentTimeMillis();
+                for (Map.Entry<String, MessageSet> entry : cache.entrySet()) {
+                    if (now - entry.getValue().getTimestamp() >= ttl) { // no ack/fail received in ttl
+                        fail(entry.getKey());
+                    }
+                }
+            }
+        }, period, period);
+    }
+
+    @Override
+    public void ack(String id) {
+        cache.remove(id);
+    }
+
+    @Override
+    public void fail(String id) {
+        MessageSet messageSet = cache.remove(id);
+        if (messageSet == null || !needRetry(messageSet)) {
+            return;
+        }
+        messageSet.setRetries(messageSet.getRetries() + 1);
+        messageSet.setTimestamp(0);
+        queue.offer(messageSet); // non-blocking: dropped if a bounded queue is full
+    }
+
+    @Override
+    public void mark(MessageSet messageSet) {
+        messageSet.setTimestamp(System.currentTimeMillis());
+        cache.put(messageSet.getId(), messageSet);
+    }
+
+    @Override
+    public boolean needRetry(MessageSet messageSet) {
+        return messageSet.getRetries() < maxRetry;
+    }
+
+    // just for testing
+    public void setCache(Map<String,MessageSet> cache) {
+        this.cache = cache;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java
new file mode 100644
index 0000000..f86ea26
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+import java.io.Serializable;
+
+/**
+ * RocketMQ message body serializer: converts a body object into the byte array used as the message body.
+ */
+public interface MessageBodySerializer extends Serializable{
+    byte[] serialize(Object body);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
new file mode 100644
index 0000000..18fd903
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+/**
+ * Retry manager for message sets: tracks pending sets and decides whether failed ones are retried.
+ */
+public interface MessageRetryManager {
+    /**
+     * Removes the message set with this id from the cache; it was processed successfully.
+     * @param id id of the message set (see MessageSet#getId())
+     */
+    void ack(String id);
+
+    /**
+     * Removes the message set with this id from the cache; it failed.
+     * Invokes the retry logic if necessary.
+     * @param id id of the failed message set (see MessageSet#getId())
+     */
+    void fail(String id);
+
+    /**
+     * Marks the message set as pending by placing it in the cache.
+     * @param messageSet the message set to track
+     */
+    void mark(MessageSet messageSet);
+
+    /**
+     * Tells whether the message set needs to be retried.
+     * @param messageSet the message set to check
+     * @return true if the message set should be retried
+     */
+    boolean needRetry(MessageSet messageSet);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageSet.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageSet.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageSet.java
new file mode 100644
index 0000000..7307271
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageSet.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+import java.util.UUID;
+
+/** A batch of RocketMQ messages plus the id, timestamp and retry count used to track it. */
+public class MessageSet {
+    private final String id;
+    private final List<MessageExt> data;
+    private long timestamp; // 0 until set through setTimestamp
+    private int retries;    // 0 until set through setRetries
+
+    /** Creates a message set whose id is a random UUID. */
+    public MessageSet(List<MessageExt> data) {
+        this(UUID.randomUUID().toString(), data);
+    }
+
+    /** Creates a message set with an explicit id. */
+    public MessageSet(String id, List<MessageExt> data) {
+        this.id = id;
+        this.data = data;
+    }
+
+    public String getId() {
+        return this.id;
+    }
+
+    public List<MessageExt> getData() {
+        return this.data;
+    }
+
+    public int getRetries() {
+        return this.retries;
+    }
+
+    public void setRetries(int retries) {
+        this.retries = retries;
+    }
+
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
new file mode 100644
index 0000000..082822c
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.storm.task.TopologyContext;
+
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.storm.rocketmq.RocketMQUtils.getInteger;
+
+/**
+ * RocketMQConfig for Consumer/Producer
+ */
+public class RocketMQConfig {
+    // common
+    public static final String NAME_SERVER_ADDR = "nameserver.addr"; // Required
+
+    public static final String CLIENT_NAME = "client.name";
+
+    public static final String CLIENT_IP = "client.ip";
+    public static final String DEFAULT_CLIENT_IP = RemotingUtil.getLocalAddress();
+
+    public static final String CLIENT_CALLBACK_EXECUTOR_THREADS = "client.callback.executor.threads";
+    public static final int DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS = Runtime.getRuntime().availableProcessors();;
+
+    public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval";
+    public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 seconds
+
+    public static final String BROKER_HEART_BEAT_INTERVAL = "brokerserver.heartbeat.interval";
+    public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds
+
+
+    // producer
+    public static final String PRODUCER_GROUP = "producer.group";
+
+    public static final String PRODUCER_RETRY_TIMES = "producer.retry.times";
+    public static final int DEFAULT_PRODUCER_RETRY_TIMES = 2;
+
+    public static final String PRODUCER_TIMEOUT = "producer.timeout";
+    public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds
+
+
+    // consumer
+    public static final String CONSUMER_GROUP = "consumer.group"; // Required
+
+    public static final String CONSUMER_TOPIC = "consumer.topic"; // Required
+
+    public static final String CONSUMER_TAG = "consumer.tag";
+    public static final String DEFAULT_TAG = "*";
+
+    public static final String CONSUMER_OFFSET_RESET_TO = "consumer.offset.reset.to";
+    public static final String CONSUMER_OFFSET_LATEST = "latest";
+    public static final String CONSUMER_OFFSET_EARLIEST = "earliest";
+    public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp";
+
+    public static final String CONSUMER_MESSAGES_ORDERLY = "consumer.messages.orderly";
+
+    public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval";
+    public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds
+
+    public static final String CONSUMER_MIN_THREADS = "consumer.min.threads";
+    public static final int DEFAULT_CONSUMER_MIN_THREADS = 20;
+
+    public static final String CONSUMER_MAX_THREADS = "consumer.max.threads";
+    public static final int DEFAULT_CONSUMER_MAX_THREADS = 64;
+
+
+    public static void buildProducerConfigs(Properties props, DefaultMQProducer producer, TopologyContext context) {
+        buildCommonConfigs(props, producer, context);
+
+        // According to the RocketMQ official docs, "only one instance is allowed per producer group"
+        // So, we use taskID/UUID as the producer group by default
+        String defaultGroup;
+        if (context != null) {
+            defaultGroup = String.valueOf(context.getThisTaskId());
+        } else {
+            defaultGroup = UUID.randomUUID().toString();
+        }
+        producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, defaultGroup));
+
+        producer.setRetryTimesWhenSendFailed(getInteger(props,
+                PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
+        producer.setRetryTimesWhenSendAsyncFailed(getInteger(props,
+                PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
+        producer.setSendMsgTimeout(getInteger(props,
+                PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
+    }
+
+    public static void buildConsumerConfigs(Properties props, DefaultMQPushConsumer consumer, TopologyContext context) {
+        buildCommonConfigs(props, consumer, context);
+
+        String group = props.getProperty(CONSUMER_GROUP);
+        Validate.notEmpty(group);
+        consumer.setConsumerGroup(group);
+
+        consumer.setPersistConsumerOffsetInterval(getInteger(props,
+                CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
+        consumer.setConsumeThreadMin(getInteger(props,
+                CONSUMER_MIN_THREADS, DEFAULT_CONSUMER_MIN_THREADS));
+        consumer.setConsumeThreadMax(getInteger(props,
+                CONSUMER_MAX_THREADS, DEFAULT_CONSUMER_MAX_THREADS));
+
+        String initOffset = props.getProperty(CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
+        switch (initOffset) {
+            case CONSUMER_OFFSET_EARLIEST:
+                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+                break;
+            case CONSUMER_OFFSET_LATEST:
+                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+                break;
+            case CONSUMER_OFFSET_TIMESTAMP:
+                consumer.setConsumeTimestamp(initOffset);
+                break;
+            default:
+                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+        }
+
+        String topic = props.getProperty(CONSUMER_TOPIC);
+        Validate.notEmpty(topic);
+        try {
+            consumer.subscribe(topic, props.getProperty(CONSUMER_TAG, DEFAULT_TAG));
+        } catch (MQClientException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    public static void buildCommonConfigs(Properties props, ClientConfig client, TopologyContext context) {
+        String namesvr = props.getProperty(NAME_SERVER_ADDR);
+        Validate.notEmpty(namesvr);
+        client.setNamesrvAddr(namesvr);
+
+        client.setClientIP(props.getProperty(CLIENT_IP, DEFAULT_CLIENT_IP));
+        // use taskID/UUID for client name by default
+        String defaultClientName;
+        if (context != null) {
+            defaultClientName = String.valueOf(context.getThisTaskId());
+        } else {
+            defaultClientName = UUID.randomUUID().toString();
+        }
+        client.setInstanceName(props.getProperty(CLIENT_NAME, defaultClientName));
+
+        client.setClientCallbackExecutorThreads(getInteger(props,
+                CLIENT_CALLBACK_EXECUTOR_THREADS, DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS));
+        client.setPollNameServerInteval(getInteger(props,
+                NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL));
+        client.setHeartbeatBrokerInterval(getInteger(props,
+                BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
new file mode 100644
index 0000000..dbe6b12
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+import org.apache.rocketmq.common.message.Message;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+/** Static helpers for reading typed properties and decoding RocketMQ message bodies. */
+public final class RocketMQUtils {
+    private RocketMQUtils() { }  // static helpers only; not instantiable
+    /** Parses the property as an int, ignoring surrounding whitespace; uses defaultValue when unset. */
+    public static int getInteger(Properties props, String key, int defaultValue) {
+        return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)).trim());
+    }
+    /** Parses the property as a boolean, ignoring surrounding whitespace; uses defaultValue when unset. */
+    public static boolean getBoolean(Properties props, String key, boolean defaultValue) {
+        return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)).trim());
+    }
+    /** Decodes the message body as UTF-8; returns null when the message or its body is null. */
+    public static String getUtf8StringBody(Message message) {
+        byte[] body = (message == null) ? null : message.getBody();
+        return (body == null) ? null : new String(body, StandardCharsets.UTF_8);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
new file mode 100644
index 0000000..53ce152
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+public class SpoutConfig extends RocketMQConfig {
+    public static final String QUEUE_SIZE = "spout.queue.size";  // capacity of RocketMQSpout's internal queue
+    // Name of the single output field declared by RocketMQSpout.
+    public static final String DECLARE_FIELDS = "spout.declare.fields";
+    public static final String DEFAULT_DECLARE_FIELDS = "msgs";
+    // Retry limit that RocketMQSpout passes to its message retry manager.
+    public static final String MESSAGES_MAX_RETRY = "spout.messages.max.retry";
+    public static final int DEFAULT_MESSAGES_MAX_RETRY = 3;
+    // TTL that RocketMQSpout passes to its message retry manager (presumably milliseconds, given the 5min note).
+    public static final String MESSAGES_TTL = "spout.messages.ttl";
+    public static final int DEFAULT_MESSAGES_TTL = 300000;  // 5min
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
new file mode 100644
index 0000000..d55babe
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.bolt;
+
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
+import org.apache.storm.rocketmq.common.selector.TopicSelector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+/** Bolt that maps each tuple to a RocketMQ message and sends it, asynchronously by default. */
+public class RocketMQBolt implements IRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMQBolt.class);
+
+    private MQProducer producer;
+    private OutputCollector collector;
+    private boolean async = true;
+    private TopicSelector selector;
+    private TupleToMessageMapper mapper;
+    private Properties properties;
+
+    /** Validates the configuration first, so a misconfigured bolt never leaves a started producer behind. */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        Validate.notEmpty(properties, "Producer properties can not be empty");
+        Validate.notNull(selector, "TopicSelector can not be null");
+        Validate.notNull(mapper, "TupleToMessageMapper can not be null");
+        this.collector = collector;
+        producer = new DefaultMQProducer();
+        RocketMQConfig.buildProducerConfigs(properties, (DefaultMQProducer)producer, context);
+        try {
+            producer.start();
+        } catch (MQClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public RocketMQBolt withSelector(TopicSelector selector) {
+        this.selector = selector;
+        return this;
+    }
+
+    public RocketMQBolt withMapper(TupleToMessageMapper mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public RocketMQBolt withAsync(boolean async) {
+        this.async = async;
+        return this;
+    }
+
+    public RocketMQBolt withProperties(Properties properties) {
+        this.properties = properties;
+        return this;
+    }
+
+    @Override
+    public void execute(final Tuple input) {
+        // Mapping: from storm tuple -> rocketmq Message
+        String topic = selector.getTopic(input);
+        String tag = selector.getTag(input);
+        String key = mapper.getKeyFromTuple(input);
+        byte[] value = mapper.getValueFromTuple(input);
+
+        if (topic == null) {
+            LOG.warn("skipping Message with Key = {}, topic selector returned null.", key);
+            collector.ack(input);
+            return;
+        }
+
+        Message msg = new Message(topic, tag, key, value);
+        try {
+            if (async) {
+                producer.send(msg, new SendCallback() {
+                    @Override
+                    public void onSuccess(SendResult sendResult) {
+                        collector.ack(input);
+                    }
+
+                    @Override
+                    public void onException(Throwable throwable) {
+                        // Always fail the tuple, even without a cause; otherwise it would
+                        // stay pending until the topology message timeout expires.
+                        if (throwable != null) {
+                            collector.reportError(throwable);
+                        }
+                        collector.fail(input);
+                    }
+                });
+            } else {
+                // sync sending, will return a SendResult
+                producer.send(msg);
+                collector.ack(input);
+            }
+        } catch (Exception e) {
+            collector.reportError(e);
+            collector.fail(input);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // nothing to declare: execute() never emits tuples
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+    @Override
+    public void cleanup() {
+        if (producer != null) {
+            producer.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java
new file mode 100644
index 0000000..622cbdb
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.common.mapper;
+
+import org.apache.storm.rocketmq.DefaultMessageBodySerializer;
+import org.apache.storm.rocketmq.MessageBodySerializer;
+import org.apache.storm.tuple.ITuple;
+
+import java.nio.charset.StandardCharsets;
+
+/** Maps a tuple to a message key and body by reading two named tuple fields. */
+public class FieldNameBasedTupleToMessageMapper implements TupleToMessageMapper {
+    public static final String BOLT_KEY = "key";
+    public static final String BOLT_MESSAGE = "message";
+    public String boltKeyField;
+    public String boltMessageField;
+    private MessageBodySerializer messageBodySerializer;
+
+    /** Uses the default field names {@code "key"} and {@code "message"}. */
+    public FieldNameBasedTupleToMessageMapper() {
+        this(BOLT_KEY, BOLT_MESSAGE);
+    }
+
+    /** Uses the given field names together with a {@link DefaultMessageBodySerializer}. */
+    public FieldNameBasedTupleToMessageMapper(String boltKeyField, String boltMessageField) {
+        this.messageBodySerializer = new DefaultMessageBodySerializer();
+        this.boltMessageField = boltMessageField;
+        this.boltKeyField = boltKeyField;
+    }
+
+    @Override
+    public String getKeyFromTuple(ITuple tuple) {
+        return tuple.getStringByField(boltKeyField);
+    }
+
+    @Override
+    public byte[] getValueFromTuple(ITuple tuple) {
+        final Object body = tuple.getValueByField(boltMessageField);
+        return (body == null) ? null : messageBodySerializer.serialize(body);
+    }
+
+    /**
+     * Replaces the serializer used to turn the message field value into bytes.
+     * @param serializer serializer to use instead of the default one
+     * @return this mapper, to allow call chaining
+     */
+    public FieldNameBasedTupleToMessageMapper withMessageBodySerializer(MessageBodySerializer serializer) {
+        this.messageBodySerializer = serializer;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java
new file mode 100644
index 0000000..84ff4a2
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.common.mapper;
+
+import org.apache.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+/**
+ * Interface defining a mapping from storm tuple to rocketmq key and message.
+ */
+public interface TupleToMessageMapper extends Serializable {
+    String getKeyFromTuple(ITuple tuple);    // key of the rocketmq message built from the tuple
+    byte[] getValueFromTuple(ITuple tuple);  // body bytes of the rocketmq message built from the tuple
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java
new file mode 100644
index 0000000..5332036
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.common.selector;
+
+import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.tuple.ITuple;
+
+/** Selector that returns the same topic and tag for every tuple. */
+public class DefaultTopicSelector implements TopicSelector {
+    private final String topic;
+    private final String tag;
+    public DefaultTopicSelector(final String topicName) {
+        this(topicName, RocketMQConfig.DEFAULT_TAG);
+    }
+
+    public DefaultTopicSelector(final String topicName, final String tagName) {
+        this.tag = tagName;
+        this.topic = topicName;
+    }
+
+    @Override
+    public String getTopic(ITuple tuple) {
+        return this.topic;
+    }
+
+    @Override
+    public String getTag(ITuple tuple) {
+        return this.tag;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java
new file mode 100644
index 0000000..60865c1
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.common.selector;
+
+import org.apache.storm.tuple.ITuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses field name to select topic and tag name from tuple.
+ * When the tuple does not contain the configured field, the corresponding
+ * default name is returned and a warning is logged.
+ */
+public class FieldNameBasedTopicSelector implements TopicSelector {
+    private static final Logger LOG = LoggerFactory.getLogger(FieldNameBasedTopicSelector.class);
+
+    private final String topicFieldName;
+    private final String defaultTopicName;
+
+    private final String tagFieldName;
+    private final String defaultTagName;
+
+    /** Stores the tuple field names to read and the defaults to fall back to. */
+    public FieldNameBasedTopicSelector(String topicFieldName, String defaultTopicName, String tagFieldName, String defaultTagName) {
+        this.defaultTagName = defaultTagName;
+        this.tagFieldName = tagFieldName;
+        this.defaultTopicName = defaultTopicName;
+        this.topicFieldName = topicFieldName;
+    }
+
+    @Override
+    public String getTopic(ITuple tuple) {
+        if (!tuple.contains(topicFieldName)) {
+            LOG.warn("Field {} Not Found. Returning default topic {}", topicFieldName, defaultTopicName);
+            return defaultTopicName;
+        }
+        return tuple.getStringByField(topicFieldName);
+    }
+
+    @Override
+    public String getTag(ITuple tuple) {
+        if (!tuple.contains(tagFieldName)) {
+            LOG.warn("Field {} Not Found. Returning default tag {}", tagFieldName, defaultTagName);
+            return defaultTagName;
+        }
+        return tuple.getStringByField(tagFieldName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java
new file mode 100644
index 0000000..f33d4a6
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.common.selector;
+
+import org.apache.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+public interface TopicSelector extends Serializable {
+    String getTopic(ITuple tuple);  // topic for the tuple's message; RocketMQBolt skips the tuple when this is null
+    String getTag(ITuple tuple);    // tag attached to the message built from the tuple
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
new file mode 100644
index 0000000..c8a0802
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.spout;
+
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.storm.Config;
+import org.apache.storm.rocketmq.DefaultMessageRetryManager;
+import org.apache.storm.rocketmq.MessageRetryManager;
+import org.apache.storm.rocketmq.MessageSet;
+import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.SpoutConfig;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.storm.rocketmq.RocketMQUtils.getBoolean;
+import static org.apache.storm.rocketmq.RocketMQUtils.getInteger;
+
+/**
+ * RocketMQSpout uses MQPushConsumer as the default implementation.
+ * PushConsumer is a high level consumer API that wraps the pulling details,
+ * so to the spout it looks as if the broker pushes messages to the consumer.
+ */
+public class RocketMQSpout implements IRichSpout {
+    // TODO add metrics
+    private static final int DEFAULT_QUEUE_SIZE = 1024;  // used when neither spout.queue.size nor max spout pending is set
+
+    private MQPushConsumer consumer;
+    private SpoutOutputCollector collector;
+    private BlockingQueue<MessageSet> queue;
+
+    private Properties properties;
+    private MessageRetryManager messageRetryManager;
+
+    public RocketMQSpout(Properties properties) {
+        this.properties = properties;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        Validate.notEmpty(properties, "Consumer properties can not be empty");
+        boolean ordered = getBoolean(properties, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false);
+
+        // topology.max.spout.pending may be absent from conf, so it must not be dereferenced blindly.
+        Object maxPending = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
+        int fallbackSize = (maxPending == null) ? DEFAULT_QUEUE_SIZE : Integer.parseInt(maxPending.toString());
+        int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, fallbackSize);
+        queue = new LinkedBlockingQueue<>(queueSize);
+
+        consumer = new DefaultMQPushConsumer();
+        RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer, context);
+
+        if (ordered) {
+            consumer.registerMessageListener(new MessageListenerOrderly() {
+                @Override
+                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
+                                                           ConsumeOrderlyContext context) {
+                    return process(msgs) ? ConsumeOrderlyStatus.SUCCESS
+                            : ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                }
+            });
+        } else {
+            consumer.registerMessageListener(new MessageListenerConcurrently() {
+                @Override
+                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                                ConsumeConcurrentlyContext context) {
+                    return process(msgs) ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS
+                            : ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                }
+            });
+        }
+
+        try {
+            consumer.start();
+        } catch (MQClientException e) {
+            throw new RuntimeException(e);
+        }
+
+        int maxRetry = getInteger(properties, SpoutConfig.MESSAGES_MAX_RETRY, SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY);
+        int ttl = getInteger(properties, SpoutConfig.MESSAGES_TTL, SpoutConfig.DEFAULT_MESSAGES_TTL);
+        this.messageRetryManager = new DefaultMessageRetryManager(queue, maxRetry, ttl);
+        this.collector = collector;
+    }
+
+    public boolean process(List<MessageExt> msgs) {
+        if (msgs.isEmpty()) {
+            return true;
+        }
+        MessageSet messageSet = new MessageSet(msgs);
+        // returning true upon success and false if this queue is full.
+        return queue.offer(messageSet);
+    }
+
+    @Override
+    public void nextTuple() {
+        MessageSet messageSet = queue.poll();
+        if (messageSet == null) {
+            return;
+        }
+
+        messageRetryManager.mark(messageSet);
+        collector.emit(new Values(messageSet.getData()), messageSet.getId());
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        String id = msgId.toString();
+        messageRetryManager.ack(id);
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        String id = msgId.toString();
+        messageRetryManager.fail(id);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(properties.getProperty(SpoutConfig.DECLARE_FIELDS, SpoutConfig.DEFAULT_DECLARE_FIELDS)));
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+    @Override
+    public void close() {
+        if (consumer != null) {
+            consumer.shutdown();
+        }
+    }
+
+    @Override
+    public void activate() {
+        consumer.resume();
+    }
+
+    @Override
+    public void deactivate() {
+        consumer.suspend();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
new file mode 100644
index 0000000..7e5c078
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.trident.state;
+
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
+import org.apache.storm.rocketmq.common.selector.TopicSelector;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/** Trident state that sends every tuple of a batch to RocketMQ with a synchronous producer send. */
+public class RocketMQState implements State {
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMQState.class);
+
+    private Options options;
+    private MQProducer producer;
+
+    protected RocketMQState(Map map, Options options) {
+        this.options = options;
+    }
+
+    public static class Options implements Serializable {
+        private TopicSelector selector;
+        private TupleToMessageMapper mapper;
+        private Properties properties;
+
+        public Options withSelector(TopicSelector selector) {
+            this.selector = selector;
+            return this;
+        }
+
+        public Options withMapper(TupleToMessageMapper mapper) {
+            this.mapper = mapper;
+            return this;
+        }
+
+        public Options withProperties(Properties properties) {
+            this.properties = properties;
+            return this;
+        }
+    }
+
+    /** Validates all options before starting the producer, mirroring the checks made by RocketMQBolt. */
+    protected void prepare() {
+        Validate.notEmpty(options.properties, "Producer properties can not be empty");
+        Validate.notNull(options.selector, "TopicSelector can not be null");
+        Validate.notNull(options.mapper, "TupleToMessageMapper can not be null");
+        producer = new DefaultMQProducer();
+        RocketMQConfig.buildProducerConfigs(options.properties, (DefaultMQProducer)producer, null);
+        try {
+            producer.start();
+        } catch (MQClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+        LOG.debug("beginCommit is noop.");
+    }
+
+    @Override
+    public void commit(Long txid) {
+        LOG.debug("commit is noop.");
+    }
+
+    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+        try {
+            for (TridentTuple tuple : tuples) {
+                String topic = options.selector.getTopic(tuple);
+                String tag = options.selector.getTag(tuple);
+                String key = options.mapper.getKeyFromTuple(tuple);
+                byte[] value = options.mapper.getValueFromTuple(tuple);
+
+                if (topic == null) {
+                    LOG.warn("skipping Message with Key = {}, topic selector returned null.", key);
+                    continue;
+                }
+
+                Message msg = new Message(topic, tag, key, value);
+                this.producer.send(msg);
+            }
+        } catch (Exception e) {
+            LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
+            throw new FailedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateFactory.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateFactory.java
new file mode 100644
index 0000000..82eb013
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.trident.state;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+/** Trident {@link StateFactory} producing {@link RocketMQState} instances from fixed options. */
+public class RocketMQStateFactory implements StateFactory {
+    private final RocketMQState.Options options;
+
+    /** @param options options handed to every state this factory creates */
+    public RocketMQStateFactory(RocketMQState.Options options) {
+        this.options = options;
+    }
+
+    /** Creates a state from {@code conf} and the options and prepares it; other arguments are unused. */
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        RocketMQState state = new RocketMQState(conf, options);
+        state.prepare();
+        return state;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateUpdater.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateUpdater.java
new file mode 100644
index 0000000..a548ce8
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateUpdater.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.trident.state;
+
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+/** Trident state updater that hands each batch of tuples to a {@link RocketMQState}. */
+public class RocketMQStateUpdater extends BaseStateUpdater<RocketMQState> {
+
+    /** Delegates to {@link RocketMQState#updateState(List, TridentCollector)}. */
+    @Override
+    public void updateState(RocketMQState state, List<TridentTuple> tuples, TridentCollector collector) {
+        state.updateState(tuples, collector);
+    }
+}


[3/5] storm git commit: STORM-2349: sharing a single producer/consumer instance across threads

Posted by xi...@apache.org.
STORM-2349: sharing a single producer/consumer instance across threads


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9bacd977
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9bacd977
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9bacd977

Branch: refs/heads/master
Commit: 9bacd977f27aa3f9358c7c9adaf7b04191a15cb8
Parents: 4a4a8d2
Author: vesense <be...@163.com>
Authored: Thu Apr 13 21:39:44 2017 +0800
Committer: vesense <be...@163.com>
Committed: Thu Apr 13 21:39:44 2017 +0800

----------------------------------------------------------------------
 .../apache/storm/rocketmq/RocketMQConfig.java   | 30 +++-----
 .../storm/rocketmq/bolt/RocketMQBolt.java       | 30 +++++---
 .../storm/rocketmq/spout/RocketMQSpout.java     | 80 +++++++++++---------
 .../rocketmq/trident/state/RocketMQState.java   |  2 +-
 4 files changed, 78 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9bacd977/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
index 082822c..fcf6ff4 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
@@ -24,7 +24,6 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.storm.task.TopologyContext;
 
 import java.util.Properties;
 import java.util.UUID;
@@ -88,17 +87,12 @@ public class RocketMQConfig {
     public static final int DEFAULT_CONSUMER_MAX_THREADS = 64;
 
 
-    public static void buildProducerConfigs(Properties props, DefaultMQProducer producer, TopologyContext context) {
-        buildCommonConfigs(props, producer, context);
+    public static void buildProducerConfigs(Properties props, DefaultMQProducer producer) {
+        buildCommonConfigs(props, producer);
 
         // According to the RocketMQ official docs, "only one instance is allowed per producer group"
-        // So, we use taskID/UUID as the producer group by default
-        String defaultGroup;
-        if (context != null) {
-            defaultGroup = String.valueOf(context.getThisTaskId());
-        } else {
-            defaultGroup = UUID.randomUUID().toString();
-        }
+        // So, we use UUID as the producer group by default, to allow many producer instances for one topic
+        String defaultGroup = UUID.randomUUID().toString();
         producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, defaultGroup));
 
         producer.setRetryTimesWhenSendFailed(getInteger(props,
@@ -109,8 +103,8 @@ public class RocketMQConfig {
                 PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
     }
 
-    public static void buildConsumerConfigs(Properties props, DefaultMQPushConsumer consumer, TopologyContext context) {
-        buildCommonConfigs(props, consumer, context);
+    public static void buildConsumerConfigs(Properties props, DefaultMQPushConsumer consumer) {
+        buildCommonConfigs(props, consumer);
 
         String group = props.getProperty(CONSUMER_GROUP);
         Validate.notEmpty(group);
@@ -147,19 +141,15 @@ public class RocketMQConfig {
         }
     }
 
-    public static void buildCommonConfigs(Properties props, ClientConfig client, TopologyContext context) {
+    public static void buildCommonConfigs(Properties props, ClientConfig client) {
         String namesvr = props.getProperty(NAME_SERVER_ADDR);
         Validate.notEmpty(namesvr);
         client.setNamesrvAddr(namesvr);
 
         client.setClientIP(props.getProperty(CLIENT_IP, DEFAULT_CLIENT_IP));
-        // use taskID/UUID for client name by default
-        String defaultClientName;
-        if (context != null) {
-            defaultClientName = String.valueOf(context.getThisTaskId());
-        } else {
-            defaultClientName = UUID.randomUUID().toString();
-        }
+        // According to the RocketMQ official docs, "only one instance is allowed per machine"
+        // So, we use UUID as the client name by default, to allow RocketMQ spout/bolt instances in one machine.
+        String defaultClientName = UUID.randomUUID().toString();
         client.setInstanceName(props.getProperty(CLIENT_NAME, defaultClientName));
 
         client.setClientCallbackExecutorThreads(getInteger(props,

http://git-wip-us.apache.org/repos/asf/storm/blob/9bacd977/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
index d55babe..3f04d07 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
@@ -41,7 +41,7 @@ import java.util.Properties;
 public class RocketMQBolt implements IRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(RocketMQBolt.class);
 
-    private MQProducer producer;
+    private static MQProducer producer;
     private OutputCollector collector;
     private boolean async = true;
     private TopicSelector selector;
@@ -52,14 +52,21 @@ public class RocketMQBolt implements IRichBolt {
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
         Validate.notEmpty(properties, "Producer properties can not be empty");
 
-        producer = new DefaultMQProducer();
-        RocketMQConfig.buildProducerConfigs(properties, (DefaultMQProducer)producer, context);
-
-        try {
-            producer.start();
-        } catch (MQClientException e) {
-            throw new RuntimeException(e);
+        // Since RocketMQ Producer is thread-safe, RocketMQBolt uses a single
+        // producer instance across threads to improve the performance.
+        synchronized (RocketMQBolt.class) {
+            if (producer == null) {
+                producer = new DefaultMQProducer();
+                RocketMQConfig.buildProducerConfigs(properties, (DefaultMQProducer)producer);
+
+                try {
+                    producer.start();
+                } catch (MQClientException e) {
+                    throw new RuntimeException(e);
+                }
+            }
         }
+
         this.collector = collector;
 
         Validate.notNull(selector, "TopicSelector can not be null");
@@ -143,6 +150,11 @@ public class RocketMQBolt implements IRichBolt {
 
     @Override
     public void cleanup() {
-        producer.shutdown();
+        synchronized (RocketMQBolt.class) {
+            if (producer != null) {
+                producer.shutdown();
+                producer = null;
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9bacd977/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
index c8a0802..74c8264 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
@@ -40,6 +40,7 @@ import org.apache.storm.topology.IRichSpout;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ObjectReader;
 
 import java.util.List;
 import java.util.Map;
@@ -58,7 +59,7 @@ import static org.apache.storm.rocketmq.RocketMQUtils.getInteger;
 public class RocketMQSpout implements IRichSpout {
     // TODO add metrics
 
-    private MQPushConsumer consumer;
+    private static MQPushConsumer consumer;
     private SpoutOutputCollector collector;
     private BlockingQueue<MessageSet> queue;
 
@@ -74,42 +75,48 @@ public class RocketMQSpout implements IRichSpout {
         Validate.notEmpty(properties, "Consumer properties can not be empty");
         boolean ordered = getBoolean(properties, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false);
 
-        int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, Integer.parseInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING).toString()));
+        int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)));
         queue = new LinkedBlockingQueue<>(queueSize);
 
-        consumer = new DefaultMQPushConsumer();
-        RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer, context);
-
-        if (ordered) {
-            consumer.registerMessageListener(new MessageListenerOrderly() {
-                @Override
-                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
-                                                           ConsumeOrderlyContext context) {
-                    if (process(msgs)) {
-                        return ConsumeOrderlyStatus.SUCCESS;
-                    } else {
-                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
-                    }
+        // Since RocketMQ Consumer is thread-safe, RocketMQSpout uses a single
+        // consumer instance across threads to improve the performance.
+        synchronized (RocketMQSpout.class) {
+            if (consumer == null) {
+                consumer = new DefaultMQPushConsumer();
+                RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer);
+
+                if (ordered) {
+                    consumer.registerMessageListener(new MessageListenerOrderly() {
+                        @Override
+                        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
+                                                                   ConsumeOrderlyContext context) {
+                            if (process(msgs)) {
+                                return ConsumeOrderlyStatus.SUCCESS;
+                            } else {
+                                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                            }
+                        }
+                    });
+                } else {
+                    consumer.registerMessageListener(new MessageListenerConcurrently() {
+                        @Override
+                        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                                        ConsumeConcurrentlyContext context) {
+                            if (process(msgs)) {
+                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                            } else {
+                                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                            }
+                        }
+                    });
                 }
-            });
-        } else {
-            consumer.registerMessageListener(new MessageListenerConcurrently() {
-                @Override
-                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
-                                                                ConsumeConcurrentlyContext context) {
-                    if (process(msgs)) {
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                    } else {
-                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
-                    }
-                }
-            });
-        }
 
-        try {
-            consumer.start();
-        } catch (MQClientException e) {
-            throw new RuntimeException(e);
+                try {
+                    consumer.start();
+                } catch (MQClientException e) {
+                    throw new RuntimeException(e);
+                }
+            }
         }
 
         int maxRetry = getInteger(properties, SpoutConfig.MESSAGES_MAX_RETRY, SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY);
@@ -162,7 +169,12 @@ public class RocketMQSpout implements IRichSpout {
 
     @Override
     public void close() {
-        consumer.shutdown();
+        synchronized (RocketMQSpout.class) {
+            if (consumer != null) {
+                consumer.shutdown();
+                consumer = null;
+            }
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/9bacd977/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
index 7e5c078..b6413ad 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
@@ -73,7 +73,7 @@ public class RocketMQState implements State {
         Validate.notEmpty(options.properties, "Producer properties can not be empty");
 
         producer = new DefaultMQProducer();
-        RocketMQConfig.buildProducerConfigs(options.properties, (DefaultMQProducer)producer, null);
+        RocketMQConfig.buildProducerConfigs(options.properties, (DefaultMQProducer)producer);
 
         try {
             producer.start();