You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2017/04/24 04:05:25 UTC

[4/5] storm git commit: STORM-2349: Add Scheme for RocketMQSpout to deserialize data

STORM-2349: Add Scheme for RocketMQSpout to deserialize data


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ded7a1e4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ded7a1e4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ded7a1e4

Branch: refs/heads/master
Commit: ded7a1e4bda4cf8f16faaa507246636d49bcdc92
Parents: 9bacd97
Author: vesense <be...@163.com>
Authored: Tue Apr 18 12:01:43 2017 +0800
Committer: vesense <be...@163.com>
Committed: Tue Apr 18 12:01:43 2017 +0800

----------------------------------------------------------------------
 .../storm/rocketmq/topology/WordCounter.java    |  26 ++--
 .../apache/storm/rocketmq/ConsumerMessage.java  |  60 +++++++++
 .../rocketmq/DefaultMessageRetryManager.java    |  42 +++---
 .../storm/rocketmq/MessageRetryManager.java     |  12 +-
 .../org/apache/storm/rocketmq/MessageSet.java   |  66 ---------
 .../apache/storm/rocketmq/RocketMQUtils.java    |  31 ++++-
 .../org/apache/storm/rocketmq/SpoutConfig.java  |   8 +-
 .../storm/rocketmq/spout/RocketMQSpout.java     | 135 +++++++++++--------
 .../rocketmq/spout/scheme/KeyValueScheme.java   |  27 ++++
 .../spout/scheme/StringKeyValueScheme.java      |  38 ++++++
 .../rocketmq/spout/scheme/StringScheme.java     |  48 +++++++
 .../storm/rocketmq/TestMessageRetryManager.java |  68 +++++-----
 12 files changed, 361 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java
----------------------------------------------------------------------
diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java
index 8bc62b9..327c11a 100644
--- a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java
+++ b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java
@@ -17,8 +17,6 @@
  */
 package org.apache.storm.rocketmq.topology;
 
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.storm.rocketmq.RocketMQUtils;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.BasicOutputCollector;
 import org.apache.storm.topology.IBasicBolt;
@@ -28,7 +26,6 @@ import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 public class WordCounter implements IBasicBolt {
@@ -39,21 +36,18 @@ public class WordCounter implements IBasicBolt {
     }
 
     public void execute(Tuple input, BasicOutputCollector collector) {
-        List<MessageExt> list = (List<MessageExt>)input.getValueByField("msgs");
-        for (MessageExt messageExt : list) {
-            String word = RocketMQUtils.getUtf8StringBody(messageExt);
+        String word = input.getStringByField("str");
 
-            int count;
-            if (wordCounter.containsKey(word)) {
-                count = wordCounter.get(word) + 1;
-                wordCounter.put(word, wordCounter.get(word) + 1);
-            } else {
-                count = 1;
-            }
-
-            wordCounter.put(word, count);
-            collector.emit(new Values(word, count));
+        int count;
+        if (wordCounter.containsKey(word)) {
+            count = wordCounter.get(word) + 1;
+            wordCounter.put(word, wordCounter.get(word) + 1);
+        } else {
+            count = 1;
         }
+
+        wordCounter.put(word, count);
+        collector.emit(new Values(word, count));
     }
 
     public void cleanup() {

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java
new file mode 100644
index 0000000..97afae1
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+public class ConsumerMessage {
+    private String id;
+    private MessageExt data;
+    private long timestamp;
+    private int retries;
+
+    public ConsumerMessage(String id, MessageExt data) {
+        this.id = id;
+        this.data = data;
+    }
+
+    public ConsumerMessage(MessageExt data) {
+        this(data.getMsgId(), data);
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public MessageExt getData() {
+        return data;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public int getRetries() {
+        return retries;
+    }
+
+    public void setRetries(int retries) {
+        this.retries = retries;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
index 9d540d9..bcc7e99 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
@@ -27,12 +27,12 @@ import java.util.concurrent.ConcurrentHashMap;
  * An implementation of MessageRetryManager
  */
 public class DefaultMessageRetryManager implements MessageRetryManager{
-    private Map<String,MessageSet> cache = new ConcurrentHashMap<>(500);
-    private BlockingQueue<MessageSet> queue;
+    private Map<String,ConsumerMessage> cache = new ConcurrentHashMap<>(500);
+    private BlockingQueue<ConsumerMessage> queue;
     private int maxRetry;
     private int ttl;
 
-    public DefaultMessageRetryManager(BlockingQueue<MessageSet> queue, int maxRetry, int ttl) {
+    public DefaultMessageRetryManager(BlockingQueue<ConsumerMessage> queue, int maxRetry, int ttl) {
         this.queue = queue;
         this.maxRetry = maxRetry;
         this.ttl = ttl;
@@ -42,10 +42,10 @@ public class DefaultMessageRetryManager implements MessageRetryManager{
             @Override
             public void run() {
                 long now = System.currentTimeMillis();
-                for (Map.Entry<String, MessageSet> entry : cache.entrySet()) {
+                for (Map.Entry<String, ConsumerMessage> entry : cache.entrySet()) {
                     String id = entry.getKey();
-                    MessageSet messageSet = entry.getValue();
-                    if (now - messageSet.getTimestamp() >= ttl) { // no ack/fail received in ttl
+                    ConsumerMessage message = entry.getValue();
+                    if (now - message.getTimestamp() >= ttl) { // no ack/fail received in ttl
                         fail(id);
                     }
                 }
@@ -58,29 +58,35 @@ public class DefaultMessageRetryManager implements MessageRetryManager{
     }
 
     public void fail(String id) {
-        MessageSet messageSet = cache.remove(id);
-        if (messageSet == null) {
+        ConsumerMessage message = cache.remove(id);
+        if (message == null) {
             return;
         }
 
-        if (needRetry(messageSet)) {
-            messageSet.setRetries(messageSet.getRetries() + 1);
-            messageSet.setTimestamp(0);
-            queue.offer(messageSet);
+        if (needRetry(message)) {
+            message.setRetries(message.getRetries() + 1);
+            message.setTimestamp(0);
+            queue.offer(message);
         }
     }
 
-    public void mark(MessageSet messageSet) {
-        messageSet.setTimestamp(System.currentTimeMillis());
-        cache.put(messageSet.getId(), messageSet);
+    public void mark(ConsumerMessage message) {
+        message.setTimestamp(System.currentTimeMillis());
+        cache.put(message.getId(), message);
     }
 
-    public boolean needRetry(MessageSet messageSet) {
-        return messageSet.getRetries() < maxRetry;
+    /**
+     * Whether the message needs to be retried.
+     * @param message
+     * @return
+     */
+    public boolean needRetry(ConsumerMessage message) {
+        return message.getRetries() < maxRetry;
     }
 
     // just for testing
-    public void setCache(Map<String,MessageSet> cache) {
+    public void setCache(Map<String,ConsumerMessage> cache) {
         this.cache = cache;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
index 18fd903..5e59d9c 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
@@ -35,16 +35,16 @@ public interface MessageRetryManager {
     void fail(String id);
 
     /**
-     * Mark messageSet in the cache.
-     * @param messageSet
+     * Mark message in the cache.
+     * @param message
      */
-    void mark(MessageSet messageSet);
+    void mark(ConsumerMessage message);
 
     /**
-     * Whether the messageSet need retry.
-     * @param messageSet
+     * Whether the message needs to be retried.
+     * @param message
      * @return
      */
-    boolean needRetry(MessageSet messageSet);
+    boolean needRetry(ConsumerMessage message);
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageSet.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageSet.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageSet.java
deleted file mode 100644
index 7307271..0000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageSet.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.rocketmq;
-
-import org.apache.rocketmq.common.message.MessageExt;
-
-import java.util.List;
-import java.util.UUID;
-
-/**
- * A message collection.
- */
-public class MessageSet {
-    private String id;
-    private List<MessageExt> data;
-    private long timestamp;
-    private int retries;
-
-    public MessageSet(String id, List<MessageExt> data) {
-        this.id = id;
-        this.data = data;
-    }
-
-    public MessageSet(List<MessageExt> data) {
-        this(UUID.randomUUID().toString(), data);
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public List<MessageExt> getData() {
-        return data;
-    }
-
-    public long getTimestamp() {
-        return timestamp;
-    }
-
-    public void setTimestamp(long timestamp) {
-        this.timestamp = timestamp;
-    }
-
-    public int getRetries() {
-        return retries;
-    }
-
-    public void setRetries(int retries) {
-        this.retries = retries;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
index dbe6b12..7cbf749 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
@@ -18,8 +18,12 @@
 package org.apache.storm.rocketmq;
 
 import org.apache.rocketmq.common.message.Message;
+import org.apache.storm.rocketmq.spout.scheme.KeyValueScheme;
+import org.apache.storm.spout.Scheme;
 
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Properties;
 
 public final class RocketMQUtils {
@@ -32,10 +36,29 @@ public final class RocketMQUtils {
         return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)));
     }
 
-    public static String getUtf8StringBody(Message message) {
-        if (message == null) {
-            return null;
+    public static Scheme createScheme(Properties props) {
+        String schemeString = props.getProperty(SpoutConfig.SCHEME, SpoutConfig.DEFAULT_SCHEME);
+        Scheme scheme;
+        try {
+            Class clazz = Class.forName(schemeString);
+            scheme = (Scheme)clazz.newInstance();
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Cannot create Scheme for " + schemeString
+                    + " due to " + e.getMessage());
         }
-        return new String(message.getBody(), StandardCharsets.UTF_8);
+        return scheme;
+    }
+
+    public static List<Object> generateTuples(Message msg, Scheme scheme) {
+        List<Object> tup;
+        String rawKey = msg.getKeys();
+        ByteBuffer body = ByteBuffer.wrap(msg.getBody());
+        if (rawKey != null && scheme instanceof KeyValueScheme) {
+            ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes(StandardCharsets.UTF_8));
+            tup = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, body);
+        } else {
+            tup = scheme.deserialize(body);
+        }
+        return tup;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
index 53ce152..05d7f76 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
@@ -17,16 +17,18 @@
  */
 package org.apache.storm.rocketmq;
 
+import org.apache.storm.rocketmq.spout.scheme.StringScheme;
+
 public class SpoutConfig extends RocketMQConfig {
     public static final String QUEUE_SIZE = "spout.queue.size";
 
-    public static final String DECLARE_FIELDS = "spout.declare.fields";
-    public static final String DEFAULT_DECLARE_FIELDS = "msgs";
-
     public static final String MESSAGES_MAX_RETRY = "spout.messages.max.retry";
     public static final int DEFAULT_MESSAGES_MAX_RETRY = 3;
 
     public static final String MESSAGES_TTL = "spout.messages.ttl";
     public static final int DEFAULT_MESSAGES_TTL = 300000;  // 5min
 
+    public static final String SCHEME = "spout.scheme";
+    public static final String DEFAULT_SCHEME = StringScheme.class.getName();
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
index 74c8264..58dc2bf 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
@@ -29,17 +29,17 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.storm.Config;
+import org.apache.storm.rocketmq.ConsumerMessage;
 import org.apache.storm.rocketmq.DefaultMessageRetryManager;
 import org.apache.storm.rocketmq.MessageRetryManager;
-import org.apache.storm.rocketmq.MessageSet;
 import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.RocketMQUtils;
 import org.apache.storm.rocketmq.SpoutConfig;
+import org.apache.storm.spout.Scheme;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.IRichSpout;
 import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.ObjectReader;
 
 import java.util.List;
@@ -61,88 +61,117 @@ public class RocketMQSpout implements IRichSpout {
 
     private static MQPushConsumer consumer;
     private SpoutOutputCollector collector;
-    private BlockingQueue<MessageSet> queue;
+    private BlockingQueue<ConsumerMessage> queue;
+    private BlockingQueue<ConsumerMessage> pending;
 
     private Properties properties;
     private MessageRetryManager messageRetryManager;
+    private Scheme scheme;
 
     public RocketMQSpout(Properties properties) {
+        Validate.notEmpty(properties, "Consumer properties can not be empty");
         this.properties = properties;
+        scheme = RocketMQUtils.createScheme(properties);
     }
 
     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        Validate.notEmpty(properties, "Consumer properties can not be empty");
-        boolean ordered = getBoolean(properties, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false);
-
-        int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)));
-        queue = new LinkedBlockingQueue<>(queueSize);
-
         // Since RocketMQ Consumer is thread-safe, RocketMQSpout uses a single
         // consumer instance across threads to improve the performance.
         synchronized (RocketMQSpout.class) {
             if (consumer == null) {
-                consumer = new DefaultMQPushConsumer();
-                RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer);
-
-                if (ordered) {
-                    consumer.registerMessageListener(new MessageListenerOrderly() {
-                        @Override
-                        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
-                                                                   ConsumeOrderlyContext context) {
-                            if (process(msgs)) {
-                                return ConsumeOrderlyStatus.SUCCESS;
-                            } else {
-                                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
-                            }
-                        }
-                    });
-                } else {
-                    consumer.registerMessageListener(new MessageListenerConcurrently() {
-                        @Override
-                        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
-                                                                        ConsumeConcurrentlyContext context) {
-                            if (process(msgs)) {
-                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                            } else {
-                                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
-                            }
-                        }
-                    });
-                }
-
-                try {
-                    consumer.start();
-                } catch (MQClientException e) {
-                    throw new RuntimeException(e);
-                }
+                buildAndStartConsumer();
             }
         }
 
+        int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)));
+        queue = new LinkedBlockingQueue<>(queueSize);
+        pending = new LinkedBlockingQueue<>(queueSize);
         int maxRetry = getInteger(properties, SpoutConfig.MESSAGES_MAX_RETRY, SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY);
         int ttl = getInteger(properties, SpoutConfig.MESSAGES_TTL, SpoutConfig.DEFAULT_MESSAGES_TTL);
+
         this.messageRetryManager = new DefaultMessageRetryManager(queue, maxRetry, ttl);
         this.collector = collector;
     }
 
-    public boolean process(List<MessageExt> msgs) {
+    protected void buildAndStartConsumer() {
+        consumer = new DefaultMQPushConsumer();
+        RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer);
+
+        boolean ordered = getBoolean(properties, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false);
+        if (ordered) {
+            consumer.registerMessageListener(new MessageListenerOrderly() {
+                @Override
+                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
+                                                           ConsumeOrderlyContext context) {
+                    if (process(msgs)) {
+                        return ConsumeOrderlyStatus.SUCCESS;
+                    } else {
+                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                    }
+                }
+            });
+        } else {
+            consumer.registerMessageListener(new MessageListenerConcurrently() {
+                @Override
+                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                                ConsumeConcurrentlyContext context) {
+                    if (process(msgs)) {
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    } else {
+                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                    }
+                }
+            });
+        }
+
+        try {
+            consumer.start();
+        } catch (MQClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Process pushed messages.
+     * @param msgs
+     * @return
+     */
+    protected boolean process(List<MessageExt> msgs) {
         if (msgs.isEmpty()) {
             return true;
         }
-        MessageSet messageSet = new MessageSet(msgs);
-        // returning true upon success and false if this queue is full.
-        return queue.offer(messageSet);
+
+        boolean notFull = true;
+        for (MessageExt msg : msgs) {
+            ConsumerMessage message = new ConsumerMessage(msg);
+            // returning true upon success and false if this queue is full.
+            if(!queue.offer(message)){
+                notFull = false;
+                pending.offer(message);
+            }
+        }
+        return notFull;
     }
 
     @Override
     public void nextTuple() {
-        MessageSet messageSet = queue.poll();
-        if (messageSet == null) {
+        ConsumerMessage message;
+        if (!pending.isEmpty()) {
+            message = pending.poll();
+        } else {
+            message = queue.poll();
+        }
+
+        if (message == null) {
             return;
         }
 
-        messageRetryManager.mark(messageSet);
-        collector.emit(new Values(messageSet.getData()), messageSet.getId());
+        messageRetryManager.mark(message);
+        List<Object> tup = RocketMQUtils.generateTuples(message.getData(), scheme);
+        if (tup != null) {
+            collector.emit(tup, message.getId());
+        }
     }
 
     @Override
@@ -159,7 +188,7 @@ public class RocketMQSpout implements IRichSpout {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(properties.getProperty(SpoutConfig.DECLARE_FIELDS, SpoutConfig.DEFAULT_DECLARE_FIELDS)));
+        declarer.declare(scheme.getOutputFields());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java
new file mode 100644
index 0000000..e066784
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.spout.scheme;
+
+import org.apache.storm.spout.Scheme;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public interface KeyValueScheme extends Scheme {
+    List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java
new file mode 100644
index 0000000..47ef996
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.spout.scheme;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.tuple.Values;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class StringKeyValueScheme extends StringScheme implements KeyValueScheme {
+
+    @Override
+    public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) {
+        if ( key == null ) {
+            return deserialize(value);
+        }
+        String keyString = StringScheme.deserializeString(key);
+        String valueString = StringScheme.deserializeString(value);
+        return new Values(ImmutableMap.of(keyString, valueString));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java
new file mode 100644
index 0000000..64c8241
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.spout.scheme;
+
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * Scheme that decodes a message payload as a single UTF-8 string, emitted
+ * under the output field {@link #STRING_SCHEME_KEY}.
+ */
+public class StringScheme implements Scheme {
+    /** Name of the single output field declared by {@link #getOutputFields()}. */
+    public static final String STRING_SCHEME_KEY = "str";
+
+    /**
+     * Decodes the payload into a one-element tuple.
+     *
+     * @param bytes payload to decode; must not be null
+     * @return a {@code Values} holding the decoded string
+     */
+    @Override
+    public List<Object> deserialize(ByteBuffer bytes) {
+        return new Values(deserializeString(bytes));
+    }
+
+    /**
+     * Decodes the remaining bytes of a buffer as UTF-8.
+     *
+     * <p>Array-backed buffers are decoded straight from the backing array without
+     * copying, which leaves the buffer's position untouched; other buffers go
+     * through {@code Utils.toByteArray}.
+     *
+     * @param string buffer whose bytes from position to limit are decoded; must not be null
+     * @return the decoded string
+     */
+    public static String deserializeString(ByteBuffer string) {
+        if (!string.hasArray()) {
+            return new String(Utils.toByteArray(string), StandardCharsets.UTF_8);
+        }
+        // The charset has to be explicit on this path as well: without it the bytes
+        // are decoded with the JVM's platform default, so heap and direct buffers
+        // could yield different strings for the same payload.
+        int start = string.arrayOffset() + string.position();
+        return new String(string.array(), start, string.remaining(), StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Declares the single output field.
+     *
+     * @return fields containing only {@link #STRING_SCHEME_KEY}
+     */
+    @Override
+    public Fields getOutputFields() {
+        return new Fields(STRING_SCHEME_KEY);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ded7a1e4/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
index ecb86d1..47de20d 100644
--- a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
+++ b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
@@ -17,11 +17,11 @@
  */
 package org.apache.storm.rocketmq;
 
+import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.storm.utils.Utils;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -33,8 +33,8 @@ import static org.junit.Assert.assertTrue;
 
 public class TestMessageRetryManager {
     MessageRetryManager messageRetryManager;
-    Map<String,MessageSet> cache;
-    BlockingQueue<MessageSet> queue;
+    Map<String, ConsumerMessage> cache;
+    BlockingQueue<ConsumerMessage> queue;
 
     @Before
     public void prepare() {
@@ -49,58 +49,58 @@ public class TestMessageRetryManager {
     @Test
     public void testRetryLogics() {
         //ack
-        MessageSet messageSet = new MessageSet(new ArrayList<>());
-        messageRetryManager.mark(messageSet);
+        ConsumerMessage message = new ConsumerMessage("id", new MessageExt());
+        messageRetryManager.mark(message);
         assertEquals(1, cache.size());
-        assertTrue(cache.containsKey(messageSet.getId()));
+        assertTrue(cache.containsKey(message.getId()));
 
-        messageRetryManager.ack(messageSet.getId());
+        messageRetryManager.ack(message.getId());
         assertEquals(0, cache.size());
-        assertFalse(cache.containsKey(messageSet.getId()));
+        assertFalse(cache.containsKey(message.getId()));
 
 
         //fail need retry: retries < maxRetry
-        messageSet = new MessageSet(new ArrayList<>());
-        messageRetryManager.mark(messageSet);
+        message = new ConsumerMessage("id", new MessageExt());
+        messageRetryManager.mark(message);
         assertEquals(1, cache.size());
-        assertTrue(cache.containsKey(messageSet.getId()));
+        assertTrue(cache.containsKey(message.getId()));
 
-        messageRetryManager.fail(messageSet.getId());
+        messageRetryManager.fail(message.getId());
         assertEquals(0, cache.size());
-        assertFalse(cache.containsKey(messageSet.getId()));
-        assertEquals(1, messageSet.getRetries());
+        assertFalse(cache.containsKey(message.getId()));
+        assertEquals(1, message.getRetries());
         assertEquals(1, queue.size());
-        assertEquals(messageSet, queue.poll());
+        assertEquals(message, queue.poll());
 
 
         //fail need not retry: retries >= maxRetry
-        messageSet = new MessageSet(new ArrayList<>());
-        messageRetryManager.mark(messageSet);
-        messageRetryManager.fail(messageSet.getId());
+        message = new ConsumerMessage("id", new MessageExt());
+        messageRetryManager.mark(message);
+        messageRetryManager.fail(message.getId());
         assertEquals(0, cache.size());
-        assertFalse(cache.containsKey(messageSet.getId()));
-
-        messageRetryManager.mark(messageSet);
-        messageRetryManager.fail(messageSet.getId());
-        assertEquals(2, messageSet.getRetries());
-        messageRetryManager.mark(messageSet);
-        messageRetryManager.fail(messageSet.getId());
-        assertEquals(3, messageSet.getRetries());
-
-        assertFalse(messageRetryManager.needRetry(messageSet));
-        messageRetryManager.mark(messageSet);
-        messageRetryManager.fail(messageSet.getId());
+        assertFalse(cache.containsKey(message.getId()));
+
+        messageRetryManager.mark(message);
+        messageRetryManager.fail(message.getId());
+        assertEquals(2, message.getRetries());
+        messageRetryManager.mark(message);
+        messageRetryManager.fail(message.getId());
+        assertEquals(3, message.getRetries());
+
+        assertFalse(messageRetryManager.needRetry(message));
+        messageRetryManager.mark(message);
+        messageRetryManager.fail(message.getId());
         assertEquals(0, cache.size());
         assertEquals(3, queue.size());
-        assertEquals(messageSet, queue.poll());
+        assertEquals(message, queue.poll());
 
 
         //fail: no ack/fail received in ttl
-        messageSet = new MessageSet(new ArrayList<>());
-        messageRetryManager.mark(messageSet);
+        message = new ConsumerMessage("id", new MessageExt());
+        messageRetryManager.mark(message);
         Utils.sleep(10000);
         assertEquals(0, cache.size());
-        assertFalse(cache.containsKey(messageSet.getId()));
+        assertFalse(cache.containsKey(message.getId()));
 
     }
 }