You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/10 23:23:10 UTC

[1/4] storm git commit: STORM-2594: Apply new code style to storm-rocketmq

Repository: storm
Updated Branches:
  refs/heads/master 29a8a42c8 -> 64fab1e58


http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java
index 47ef996..21a762e 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringKeyValueScheme.java
@@ -15,19 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.rocketmq.spout.scheme;
 
 import com.google.common.collect.ImmutableMap;
-import org.apache.storm.tuple.Values;
 
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.storm.tuple.Values;
+
 public class StringKeyValueScheme extends StringScheme implements KeyValueScheme {
 
     @Override
     public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) {
-        if ( key == null ) {
+        if (key == null) {
             return deserialize(value);
         }
         String keyString = StringScheme.deserializeString(key);

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java
index 64c8241..810cb98 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/StringScheme.java
@@ -15,17 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.rocketmq.spout.scheme;
 
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
 import org.apache.storm.spout.Scheme;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
 
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
 public class StringScheme implements Scheme {
     public static final String STRING_SCHEME_KEY = "str";
 
@@ -33,12 +33,17 @@ public class StringScheme implements Scheme {
         return new Values(deserializeString(bytes));
     }
 
-    public static String deserializeString(ByteBuffer string) {
-        if (string.hasArray()) {
-            int base = string.arrayOffset();
-            return new String(string.array(), base + string.position(), string.remaining());
+    /**
+     * Deserialize ByteBuffer to String.
+     * @param byteBuffer input ByteBuffer
+     * @return deserialized string
+     */
+    public static String deserializeString(ByteBuffer byteBuffer) {
+        if (byteBuffer.hasArray()) {
+            int base = byteBuffer.arrayOffset();
+            return new String(byteBuffer.array(), base + byteBuffer.position(), byteBuffer.remaining());
         } else {
-            return new String(Utils.toByteArray(string), StandardCharsets.UTF_8);
+            return new String(Utils.toByteArray(byteBuffer), StandardCharsets.UTF_8);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
deleted file mode 100644
index b6413ad..0000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.rocketmq.trident.state;
-
-import org.apache.commons.lang.Validate;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.MQProducer;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.storm.rocketmq.RocketMQConfig;
-import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
-import org.apache.storm.rocketmq.common.selector.TopicSelector;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public class RocketMQState implements State {
-
-    private static final Logger LOG = LoggerFactory.getLogger(RocketMQState.class);
-
-    private Options options;
-    private MQProducer producer;
-
-    protected RocketMQState(Map map, Options options) {
-        this.options = options;
-    }
-
-    public static class Options implements Serializable {
-        private TopicSelector selector;
-        private TupleToMessageMapper mapper;
-        private Properties properties;
-
-        public Options withSelector(TopicSelector selector) {
-            this.selector = selector;
-            return this;
-        }
-
-        public Options withMapper(TupleToMessageMapper mapper) {
-            this.mapper = mapper;
-            return this;
-        }
-
-        public Options withProperties(Properties properties) {
-            this.properties = properties;
-            return this;
-        }
-    }
-
-    protected void prepare() {
-        Validate.notEmpty(options.properties, "Producer properties can not be empty");
-
-        producer = new DefaultMQProducer();
-        RocketMQConfig.buildProducerConfigs(options.properties, (DefaultMQProducer)producer);
-
-        try {
-            producer.start();
-        } catch (MQClientException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void beginCommit(Long txid) {
-        LOG.debug("beginCommit is noop.");
-    }
-
-    @Override
-    public void commit(Long txid) {
-        LOG.debug("commit is noop.");
-    }
-
-    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
-        try {
-            for (TridentTuple tuple : tuples) {
-                String topic = options.selector.getTopic(tuple);
-                String tag = options.selector.getTag(tuple);
-                String key = options.mapper.getKeyFromTuple(tuple);
-                byte[] value = options.mapper.getValueFromTuple(tuple);
-
-                if (topic == null) {
-                    LOG.warn("skipping Message with Key = " + key + ", topic selector returned null.");
-                    continue;
-                }
-
-                Message msg = new Message(topic,tag, key, value);
-                this.producer.send(msg);
-            }
-        } catch (Exception e) {
-            LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
-            throw new FailedException(e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateFactory.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateFactory.java
deleted file mode 100644
index 2b4cc37..0000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.rocketmq.trident.state;
-
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
-import java.util.Map;
-
-public class RocketMQStateFactory implements StateFactory {
-
-    private RocketMQState.Options options;
-
-    public RocketMQStateFactory(RocketMQState.Options options) {
-        this.options = options;
-    }
-
-    @Override
-    public State makeState(Map<String, Object> conf, IMetricsContext metrics,
-            int partitionIndex, int numPartitions) {
-        RocketMQState state = new RocketMQState(conf, options);
-        state.prepare();
-        return state;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateUpdater.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateUpdater.java
deleted file mode 100644
index a548ce8..0000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateUpdater.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.rocketmq.trident.state;
-
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseStateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-import java.util.List;
-
-public class RocketMQStateUpdater extends BaseStateUpdater<RocketMQState>  {
-
-    @Override
-    public void updateState(RocketMQState state, List<TridentTuple> tuples,
-                            TridentCollector collector) {
-        state.updateState(tuples, collector);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java
new file mode 100644
index 0000000..9a8a46e
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.rocketmq.trident.state;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.storm.rocketmq.RocketMqConfig;
+import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
+import org.apache.storm.rocketmq.common.selector.TopicSelector;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMqState implements State {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMqState.class);
+
+    private Options options;
+    private MQProducer producer;
+
+    protected RocketMqState(Map map, Options options) {
+        this.options = options;
+    }
+
+    public static class Options implements Serializable {
+        private TopicSelector selector;
+        private TupleToMessageMapper mapper;
+        private Properties properties;
+
+        public Options withSelector(TopicSelector selector) {
+            this.selector = selector;
+            return this;
+        }
+
+        public Options withMapper(TupleToMessageMapper mapper) {
+            this.mapper = mapper;
+            return this;
+        }
+
+        public Options withProperties(Properties properties) {
+            this.properties = properties;
+            return this;
+        }
+    }
+
+    protected void prepare() {
+        Validate.notEmpty(options.properties, "Producer properties can not be empty");
+
+        producer = new DefaultMQProducer();
+        RocketMqConfig.buildProducerConfigs(options.properties, (DefaultMQProducer)producer);
+
+        try {
+            producer.start();
+        } catch (MQClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+        LOG.debug("beginCommit is noop.");
+    }
+
+    @Override
+    public void commit(Long txid) {
+        LOG.debug("commit is noop.");
+    }
+
+    /**
+     * Update the RocketMQ state.
+     * @param tuples trident tuples
+     * @param collector trident collector
+     */
+    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+        try {
+            for (TridentTuple tuple : tuples) {
+                String topic = options.selector.getTopic(tuple);
+                String tag = options.selector.getTag(tuple);
+                String key = options.mapper.getKeyFromTuple(tuple);
+                byte[] value = options.mapper.getValueFromTuple(tuple);
+
+                if (topic == null) {
+                    LOG.warn("skipping Message with Key = " + key + ", topic selector returned null.");
+                    continue;
+                }
+
+                Message msg = new Message(topic,tag, key, value);
+                this.producer.send(msg);
+            }
+        } catch (Exception e) {
+            LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
+            throw new FailedException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateFactory.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateFactory.java
new file mode 100644
index 0000000..64b8277
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateFactory.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.rocketmq.trident.state;
+
+import java.util.Map;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+public class RocketMqStateFactory implements StateFactory {
+
+    private RocketMqState.Options options;
+
+    public RocketMqStateFactory(RocketMqState.Options options) {
+        this.options = options;
+    }
+
+    @Override
+    public State makeState(Map<String, Object> conf, IMetricsContext metrics,
+            int partitionIndex, int numPartitions) {
+        RocketMqState state = new RocketMqState(conf, options);
+        state.prepare();
+        return state;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateUpdater.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateUpdater.java
new file mode 100644
index 0000000..589ac5e
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqStateUpdater.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.rocketmq.trident.state;
+
+import java.util.List;
+
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class RocketMqStateUpdater extends BaseStateUpdater<RocketMqState>  {
+
+    @Override
+    public void updateState(RocketMqState state, List<TridentTuple> tuples,
+                            TridentCollector collector) {
+        state.updateState(tuples, collector);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
index 47de20d..50aaf74 100644
--- a/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
+++ b/external/storm-rocketmq/src/test/java/org/apache/storm/rocketmq/TestMessageRetryManager.java
@@ -15,21 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.rocketmq;
 
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.storm.utils.Utils;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.storm.utils.Utils;
+import org.junit.Before;
+import org.junit.Test;
 
 public class TestMessageRetryManager {
     MessageRetryManager messageRetryManager;


[3/4] storm git commit: Merge branch 'STORM-2594' of https://github.com/vesense/storm into STORM-2594-merge

Posted by ka...@apache.org.
Merge branch 'STORM-2594' of https://github.com/vesense/storm into STORM-2594-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/88e691aa
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/88e691aa
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/88e691aa

Branch: refs/heads/master
Commit: 88e691aa3009cf14f433333775d37b964fcd6c90
Parents: 29a8a42 1d7be76
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jul 11 08:22:26 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jul 11 08:22:26 2017 +0900

----------------------------------------------------------------------
 docs/storm-rocketmq.md                          |  28 +--
 .../rocketmq/topology/WordCountTopology.java    |  12 +-
 .../rocketmq/trident/WordCountTrident.java      |  16 +-
 external/storm-rocketmq/README.md               |  28 +--
 external/storm-rocketmq/pom.xml                 |   3 -
 .../apache/storm/rocketmq/ConsumerMessage.java  |   1 +
 .../rocketmq/DefaultMessageBodySerializer.java  |   3 +-
 .../rocketmq/DefaultMessageRetryManager.java    |  20 +-
 .../storm/rocketmq/MessageBodySerializer.java   |   5 +-
 .../storm/rocketmq/MessageRetryManager.java     |  13 +-
 .../apache/storm/rocketmq/RocketMQConfig.java   | 162 --------------
 .../apache/storm/rocketmq/RocketMQUtils.java    |  64 ------
 .../apache/storm/rocketmq/RocketMqConfig.java   | 178 +++++++++++++++
 .../apache/storm/rocketmq/RocketMqUtils.java    |  76 +++++++
 .../org/apache/storm/rocketmq/SpoutConfig.java  |   3 +-
 .../storm/rocketmq/bolt/RocketMQBolt.java       | 160 -------------
 .../storm/rocketmq/bolt/RocketMqBolt.java       | 160 +++++++++++++
 .../FieldNameBasedTupleToMessageMapper.java     |  14 +-
 .../common/mapper/TupleToMessageMapper.java     |   6 +-
 .../common/selector/DefaultTopicSelector.java   |   5 +-
 .../selector/FieldNameBasedTopicSelector.java   |   9 +-
 .../rocketmq/common/selector/TopicSelector.java |   6 +-
 .../storm/rocketmq/spout/RocketMQSpout.java     | 218 ------------------
 .../storm/rocketmq/spout/RocketMqSpout.java     | 223 +++++++++++++++++++
 .../rocketmq/spout/scheme/KeyValueScheme.java   |   4 +-
 .../spout/scheme/StringKeyValueScheme.java      |   6 +-
 .../rocketmq/spout/scheme/StringScheme.java     |  23 +-
 .../rocketmq/trident/state/RocketMQState.java   | 117 ----------
 .../trident/state/RocketMQStateFactory.java     |  42 ----
 .../trident/state/RocketMQStateUpdater.java     |  34 ---
 .../rocketmq/trident/state/RocketMqState.java   | 123 ++++++++++
 .../trident/state/RocketMqStateFactory.java     |  43 ++++
 .../trident/state/RocketMqStateUpdater.java     |  35 +++
 .../storm/rocketmq/TestMessageRetryManager.java |  15 +-
 34 files changed, 964 insertions(+), 891 deletions(-)
----------------------------------------------------------------------



[4/4] storm git commit: STORM-2594: CHANGELOG

Posted by ka...@apache.org.
STORM-2594: CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/64fab1e5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/64fab1e5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/64fab1e5

Branch: refs/heads/master
Commit: 64fab1e58349f5057d9bfd95be018ed8224a88c4
Parents: 88e691a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jul 11 08:22:46 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jul 11 08:22:46 2017 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/64fab1e5/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4a5f7b1..5a58d75 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-2594: Apply new code style to storm-rocketmq
  * STORM-2615: Add topology readonly user configuration
  * STORM-2093: Fix permissions in multi-tenant, secure mode
  * STORM-2610: Fixed throttle metrics


[2/4] storm git commit: STORM-2594: Apply new code style to storm-rocketmq

Posted by ka...@apache.org.
STORM-2594: Apply new code style to storm-rocketmq


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1d7be760
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1d7be760
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1d7be760

Branch: refs/heads/master
Commit: 1d7be760198189711165abac48acfd3bc332083a
Parents: d7c7818
Author: Xin Wang <be...@163.com>
Authored: Sun Jul 9 21:02:30 2017 +0800
Committer: Xin Wang <be...@163.com>
Committed: Sun Jul 9 21:02:30 2017 +0800

----------------------------------------------------------------------
 docs/storm-rocketmq.md                          |  28 +--
 .../rocketmq/topology/WordCountTopology.java    |  12 +-
 .../rocketmq/trident/WordCountTrident.java      |  16 +-
 external/storm-rocketmq/README.md               |  28 +--
 external/storm-rocketmq/pom.xml                 |   3 -
 .../apache/storm/rocketmq/ConsumerMessage.java  |   1 +
 .../rocketmq/DefaultMessageBodySerializer.java  |   3 +-
 .../rocketmq/DefaultMessageRetryManager.java    |  20 +-
 .../storm/rocketmq/MessageBodySerializer.java   |   5 +-
 .../storm/rocketmq/MessageRetryManager.java     |  13 +-
 .../apache/storm/rocketmq/RocketMQConfig.java   | 162 --------------
 .../apache/storm/rocketmq/RocketMQUtils.java    |  64 ------
 .../apache/storm/rocketmq/RocketMqConfig.java   | 178 +++++++++++++++
 .../apache/storm/rocketmq/RocketMqUtils.java    |  76 +++++++
 .../org/apache/storm/rocketmq/SpoutConfig.java  |   3 +-
 .../storm/rocketmq/bolt/RocketMQBolt.java       | 160 -------------
 .../storm/rocketmq/bolt/RocketMqBolt.java       | 160 +++++++++++++
 .../FieldNameBasedTupleToMessageMapper.java     |  14 +-
 .../common/mapper/TupleToMessageMapper.java     |   6 +-
 .../common/selector/DefaultTopicSelector.java   |   5 +-
 .../selector/FieldNameBasedTopicSelector.java   |   9 +-
 .../rocketmq/common/selector/TopicSelector.java |   6 +-
 .../storm/rocketmq/spout/RocketMQSpout.java     | 218 ------------------
 .../storm/rocketmq/spout/RocketMqSpout.java     | 223 +++++++++++++++++++
 .../rocketmq/spout/scheme/KeyValueScheme.java   |   4 +-
 .../spout/scheme/StringKeyValueScheme.java      |   6 +-
 .../rocketmq/spout/scheme/StringScheme.java     |  23 +-
 .../rocketmq/trident/state/RocketMQState.java   | 117 ----------
 .../trident/state/RocketMQStateFactory.java     |  42 ----
 .../trident/state/RocketMQStateUpdater.java     |  34 ---
 .../rocketmq/trident/state/RocketMqState.java   | 123 ++++++++++
 .../trident/state/RocketMqStateFactory.java     |  43 ++++
 .../trident/state/RocketMqStateUpdater.java     |  35 +++
 .../storm/rocketmq/TestMessageRetryManager.java |  15 +-
 34 files changed, 964 insertions(+), 891 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/docs/storm-rocketmq.md
----------------------------------------------------------------------
diff --git a/docs/storm-rocketmq.md b/docs/storm-rocketmq.md
index 17daf8c..2661ddf 100644
--- a/docs/storm-rocketmq.md
+++ b/docs/storm-rocketmq.md
@@ -6,10 +6,10 @@ Storm/Trident integration for [RocketMQ](https://rocketmq.incubator.apache.org/)
 ## Read from Topic
 The spout included in this package for reading data from a topic.
 
-### RocketMQSpout
-To use the `RocketMQSpout`,  you construct an instance of it by specifying a Properties instance which including rocketmq configs.
-RocketMQSpout uses RocketMQ MQPushConsumer as the default implementation. PushConsumer is a high level consumer API, wrapping the pulling details. Looks like broker push messages to consumer.
-RocketMQSpout will retry 3(use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to change the value) times when messages are failed.
+### RocketMqSpout
+To use the `RocketMqSpout`,  you construct an instance of it by specifying a Properties instance which including rocketmq configs.
+RocketMqSpout uses RocketMQ MQPushConsumer as the default implementation. PushConsumer is a high level consumer API, wrapping the pulling details. Looks like broker push messages to consumer.
+RocketMqSpout will retry 3(use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to change the value) times when messages are failed.
 
  ```java
         Properties properties = new Properties();
@@ -17,7 +17,7 @@ RocketMQSpout will retry 3(use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to chang
         properties.setProperty(SpoutConfig.CONSUMER_GROUP, group);
         properties.setProperty(SpoutConfig.CONSUMER_TOPIC, topic);
 
-        RocketMQSpout spout = new RocketMQSpout(properties);
+        RocketMqSpout spout = new RocketMqSpout(properties);
  ```
 
 
@@ -51,18 +51,18 @@ public interface TopicSelector extends Serializable {
 `storm-rocketmq` includes general purpose `TopicSelector` implementations called `DefaultTopicSelector` and `FieldNameBasedTopicSelector`.
 
 
-### RocketMQBolt
-To use the `RocketMQBolt`, you construct an instance of it by specifying TupleToMessageMapper, TopicSelector and Properties instances.
-RocketMQBolt send messages async by default. You can change this by invoking `withAsync(false)`.
+### RocketMqBolt
+To use the `RocketMqBolt`, you construct an instance of it by specifying TupleToMessageMapper, TopicSelector and Properties instances.
+RocketMqBolt send messages async by default. You can change this by invoking `withAsync(false)`.
 
  ```java
         TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
         TopicSelector selector = new DefaultTopicSelector(topic);
 
         properties = new Properties();
-        properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+        properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);
 
-        RocketMQBolt insertBolt = new RocketMQBolt()
+        RocketMqBolt insertBolt = new RocketMqBolt()
                 .withMapper(mapper)
                 .withSelector(selector)
                 .withProperties(properties);
@@ -76,19 +76,19 @@ We support trident persistent state that can be used with trident topologies. To
         TopicSelector selector = new DefaultTopicSelector(topic);
 
         Properties properties = new Properties();
-        properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+        properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);
 
-        RocketMQState.Options options = new RocketMQState.Options()
+        RocketMqState.Options options = new RocketMqState.Options()
                 .withMapper(mapper)
                 .withSelector(selector)
                 .withProperties(properties);
 
-        StateFactory factory = new RocketMQStateFactory(options);
+        StateFactory factory = new RocketMqStateFactory(options);
 
         TridentTopology topology = new TridentTopology();
         Stream stream = topology.newStream("spout1", spout);
 
         stream.partitionPersist(factory, fields,
-                new RocketMQStateUpdater(), new Fields());
+                new RocketMqStateUpdater(), new Fields());
  ```
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java
index 0ce844e..5ae4bca 100644
--- a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java
+++ b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java
@@ -22,14 +22,14 @@ import org.apache.storm.LocalCluster;
 import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.RocketMqConfig;
 import org.apache.storm.rocketmq.SpoutConfig;
-import org.apache.storm.rocketmq.bolt.RocketMQBolt;
+import org.apache.storm.rocketmq.bolt.RocketMqBolt;
 import org.apache.storm.rocketmq.common.mapper.FieldNameBasedTupleToMessageMapper;
 import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
 import org.apache.storm.rocketmq.common.selector.DefaultTopicSelector;
 import org.apache.storm.rocketmq.common.selector.TopicSelector;
-import org.apache.storm.rocketmq.spout.RocketMQSpout;
+import org.apache.storm.rocketmq.spout.RocketMqSpout;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
 
@@ -49,7 +49,7 @@ public class WordCountTopology {
         properties.setProperty(SpoutConfig.CONSUMER_GROUP, CONSUMER_GROUP);
         properties.setProperty(SpoutConfig.CONSUMER_TOPIC, CONSUMER_TOPIC);
 
-        RocketMQSpout spout = new RocketMQSpout(properties);
+        RocketMqSpout spout = new RocketMqSpout(properties);
 
         WordCounter bolt = new WordCounter();
 
@@ -57,9 +57,9 @@ public class WordCountTopology {
         TopicSelector selector = new DefaultTopicSelector(topic);
 
         properties = new Properties();
-        properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+        properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);
 
-        RocketMQBolt insertBolt = new RocketMQBolt()
+        RocketMqBolt insertBolt = new RocketMqBolt()
                 .withMapper(mapper)
                 .withSelector(selector)
                 .withProperties(properties);

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java
----------------------------------------------------------------------
diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java
index 1817d2e..3d1f37c 100644
--- a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java
+++ b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java
@@ -22,14 +22,14 @@ import org.apache.storm.LocalCluster;
 import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.RocketMqConfig;
 import org.apache.storm.rocketmq.common.mapper.FieldNameBasedTupleToMessageMapper;
 import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
 import org.apache.storm.rocketmq.common.selector.DefaultTopicSelector;
 import org.apache.storm.rocketmq.common.selector.TopicSelector;
-import org.apache.storm.rocketmq.trident.state.RocketMQState;
-import org.apache.storm.rocketmq.trident.state.RocketMQStateFactory;
-import org.apache.storm.rocketmq.trident.state.RocketMQStateUpdater;
+import org.apache.storm.rocketmq.trident.state.RocketMqState;
+import org.apache.storm.rocketmq.trident.state.RocketMqStateFactory;
+import org.apache.storm.rocketmq.trident.state.RocketMqStateUpdater;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.state.StateFactory;
@@ -55,20 +55,20 @@ public class WordCountTrident {
         TopicSelector selector = new DefaultTopicSelector(topic);
 
         Properties properties = new Properties();
-        properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+        properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);
 
-        RocketMQState.Options options = new RocketMQState.Options()
+        RocketMqState.Options options = new RocketMqState.Options()
                 .withMapper(mapper)
                 .withSelector(selector)
                 .withProperties(properties);
 
-        StateFactory factory = new RocketMQStateFactory(options);
+        StateFactory factory = new RocketMqStateFactory(options);
 
         TridentTopology topology = new TridentTopology();
         Stream stream = topology.newStream("spout1", spout);
 
         stream.partitionPersist(factory, fields,
-                new RocketMQStateUpdater(), new Fields());
+                new RocketMqStateUpdater(), new Fields());
 
         return topology.build();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/README.md
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/README.md b/external/storm-rocketmq/README.md
index 341bb07..84c2ae2 100644
--- a/external/storm-rocketmq/README.md
+++ b/external/storm-rocketmq/README.md
@@ -6,10 +6,10 @@ Storm/Trident integration for [RocketMQ](https://rocketmq.incubator.apache.org/)
 ## Read from Topic
 The spout included in this package for reading data from a topic.
 
-### RocketMQSpout
-To use the `RocketMQSpout`,  you construct an instance of it by specifying a Properties instance which including rocketmq configs.
-RocketMQSpout uses RocketMQ MQPushConsumer as the default implementation. PushConsumer is a high level consumer API, wrapping the pulling details. Looks like broker push messages to consumer.
-RocketMQSpout will retry 3(use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to change the value) times when messages are failed.
+### RocketMqSpout
+To use the `RocketMqSpout`,  you construct an instance of it by specifying a Properties instance which including rocketmq configs.
+RocketMqSpout uses RocketMQ MQPushConsumer as the default implementation. PushConsumer is a high level consumer API, wrapping the pulling details. Looks like broker push messages to consumer.
+RocketMqSpout will retry 3(use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to change the value) times when messages are failed.
 
  ```java
         Properties properties = new Properties();
@@ -17,7 +17,7 @@ RocketMQSpout will retry 3(use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to chang
         properties.setProperty(SpoutConfig.CONSUMER_GROUP, group);
         properties.setProperty(SpoutConfig.CONSUMER_TOPIC, topic);
 
-        RocketMQSpout spout = new RocketMQSpout(properties);
+        RocketMqSpout spout = new RocketMqSpout(properties);
  ```
 
 
@@ -51,18 +51,18 @@ public interface TopicSelector extends Serializable {
 `storm-rocketmq` includes general purpose `TopicSelector` implementations called `DefaultTopicSelector` and `FieldNameBasedTopicSelector`.
 
 
-### RocketMQBolt
-To use the `RocketMQBolt`, you construct an instance of it by specifying TupleToMessageMapper, TopicSelector and Properties instances.
-RocketMQBolt send messages async by default. You can change this by invoking `withAsync(false)`.
+### RocketMqBolt
+To use the `RocketMqBolt`, you construct an instance of it by specifying TupleToMessageMapper, TopicSelector and Properties instances.
+RocketMqBolt send messages async by default. You can change this by invoking `withAsync(false)`.
 
  ```java
         TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
         TopicSelector selector = new DefaultTopicSelector(topic);
 
         properties = new Properties();
-        properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+        properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);
 
-        RocketMQBolt insertBolt = new RocketMQBolt()
+        RocketMqBolt insertBolt = new RocketMqBolt()
                 .withMapper(mapper)
                 .withSelector(selector)
                 .withProperties(properties);
@@ -76,20 +76,20 @@ We support trident persistent state that can be used with trident topologies. To
         TopicSelector selector = new DefaultTopicSelector(topic);
 
         Properties properties = new Properties();
-        properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+        properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);
 
-        RocketMQState.Options options = new RocketMQState.Options()
+        RocketMqState.Options options = new RocketMqState.Options()
                 .withMapper(mapper)
                 .withSelector(selector)
                 .withProperties(properties);
 
-        StateFactory factory = new RocketMQStateFactory(options);
+        StateFactory factory = new RocketMqStateFactory(options);
 
         TridentTopology topology = new TridentTopology();
         Stream stream = topology.newStream("spout1", spout);
 
         stream.partitionPersist(factory, fields,
-                new RocketMQStateUpdater(), new Fields());
+                new RocketMqStateUpdater(), new Fields());
  ```
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/pom.xml b/external/storm-rocketmq/pom.xml
index 4e3d729..efdbeab 100644
--- a/external/storm-rocketmq/pom.xml
+++ b/external/storm-rocketmq/pom.xml
@@ -74,9 +74,6 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
-                <configuration>
-                    <maxAllowedViolations>100</maxAllowedViolations>
-                </configuration>
             </plugin>
         </plugins>
     </build>

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java
index 97afae1..a3fd70e 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/ConsumerMessage.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.rocketmq;
 
 import org.apache.rocketmq.common.message.MessageExt;

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java
index 5e7e314..0394721 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.rocketmq;
 
 import java.nio.charset.StandardCharsets;
@@ -25,7 +26,7 @@ public class DefaultMessageBodySerializer implements MessageBodySerializer {
      * Currently, we just convert string to bytes using UTF-8 charset.
      * Note: in this way, object.toString() method is invoked.
      * @param body RocketMQ Message body
-     * @return
+     * @return serialized byte[]
      */
     @Override
     public byte[] serialize(Object body) {

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
index bcc7e99..528d8ac 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.rocketmq;
 
 import java.util.Map;
@@ -24,14 +25,20 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * An implementation of MessageRetryManager
+ * An implementation of MessageRetryManager.
  */
-public class DefaultMessageRetryManager implements MessageRetryManager{
+public class DefaultMessageRetryManager implements MessageRetryManager {
     private Map<String,ConsumerMessage> cache = new ConcurrentHashMap<>(500);
     private BlockingQueue<ConsumerMessage> queue;
     private int maxRetry;
     private int ttl;
 
+    /**
+     * DefaultMessageRetryManager Constructor.
+     * @param queue messages BlockingQueue
+     * @param maxRetry max retry times
+     * @param ttl TTL
+     */
     public DefaultMessageRetryManager(BlockingQueue<ConsumerMessage> queue, int maxRetry, int ttl) {
         this.queue = queue;
         this.maxRetry = maxRetry;
@@ -53,10 +60,12 @@ public class DefaultMessageRetryManager implements MessageRetryManager{
         }, period, period);
     }
 
+    @Override
     public void ack(String id) {
         cache.remove(id);
     }
 
+    @Override
     public void fail(String id) {
         ConsumerMessage message = cache.remove(id);
         if (message == null) {
@@ -70,16 +79,13 @@ public class DefaultMessageRetryManager implements MessageRetryManager{
         }
     }
 
+    @Override
     public void mark(ConsumerMessage message) {
         message.setTimestamp(System.currentTimeMillis());
         cache.put(message.getId(), message);
     }
 
-    /**
-     * Whether the message need retry.
-     * @param message
-     * @return
-     */
+    @Override
     public boolean needRetry(ConsumerMessage message) {
         return message.getRetries() < maxRetry;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java
index f86ea26..fbf16cd 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java
@@ -15,13 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.rocketmq;
 
 import java.io.Serializable;
 
 /**
- * RocketMQ message body serializer
+ * RocketMQ message body serializer.
  */
-public interface MessageBodySerializer extends Serializable{
+public interface MessageBodySerializer extends Serializable {
     byte[] serialize(Object body);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
index 5e59d9c..1b24817 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
@@ -15,35 +15,36 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.rocketmq;
 
 /**
- * Interface for messages retry manager
+ * Interface for messages retry manager.
  */
 public interface MessageRetryManager {
     /**
      * Remove from the cache. Message with the id is successful.
-     * @param id
+     * @param id message id
      */
     void ack(String id);
 
     /**
      * Remove from the cache. Message with the id is failed.
      * Invoke retry logics if necessary.
-     * @param id
+     * @param id message id
      */
     void fail(String id);
 
     /**
      * Mark message in the cache.
-     * @param message
+     * @param message message
      */
     void mark(ConsumerMessage message);
 
     /**
      * Whether the message need retry.
-     * @param message
-     * @return
+     * @param message ConsumerMessage
+     * @return true if need retry, otherwise false
      */
     boolean needRetry(ConsumerMessage message);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
deleted file mode 100644
index fcf6ff4..0000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.rocketmq;
-
-import org.apache.commons.lang.Validate;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
-
-import java.util.Properties;
-import java.util.UUID;
-
-import static org.apache.storm.rocketmq.RocketMQUtils.getInteger;
-
-/**
- * RocketMQConfig for Consumer/Producer
- */
-public class RocketMQConfig {
-    // common
-    public static final String NAME_SERVER_ADDR = "nameserver.addr"; // Required
-
-    public static final String CLIENT_NAME = "client.name";
-
-    public static final String CLIENT_IP = "client.ip";
-    public static final String DEFAULT_CLIENT_IP = RemotingUtil.getLocalAddress();
-
-    public static final String CLIENT_CALLBACK_EXECUTOR_THREADS = "client.callback.executor.threads";
-    public static final int DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS = Runtime.getRuntime().availableProcessors();;
-
-    public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval";
-    public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 seconds
-
-    public static final String BROKER_HEART_BEAT_INTERVAL = "brokerserver.heartbeat.interval";
-    public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds
-
-
-    // producer
-    public static final String PRODUCER_GROUP = "producer.group";
-
-    public static final String PRODUCER_RETRY_TIMES = "producer.retry.times";
-    public static final int DEFAULT_PRODUCER_RETRY_TIMES = 2;
-
-    public static final String PRODUCER_TIMEOUT = "producer.timeout";
-    public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds
-
-
-    // consumer
-    public static final String CONSUMER_GROUP = "consumer.group"; // Required
-
-    public static final String CONSUMER_TOPIC = "consumer.topic"; // Required
-
-    public static final String CONSUMER_TAG = "consumer.tag";
-    public static final String DEFAULT_TAG = "*";
-
-    public static final String CONSUMER_OFFSET_RESET_TO = "consumer.offset.reset.to";
-    public static final String CONSUMER_OFFSET_LATEST = "latest";
-    public static final String CONSUMER_OFFSET_EARLIEST = "earliest";
-    public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp";
-
-    public static final String CONSUMER_MESSAGES_ORDERLY = "consumer.messages.orderly";
-
-    public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval";
-    public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds
-
-    public static final String CONSUMER_MIN_THREADS = "consumer.min.threads";
-    public static final int DEFAULT_CONSUMER_MIN_THREADS = 20;
-
-    public static final String CONSUMER_MAX_THREADS = "consumer.max.threads";
-    public static final int DEFAULT_CONSUMER_MAX_THREADS = 64;
-
-
-    public static void buildProducerConfigs(Properties props, DefaultMQProducer producer) {
-        buildCommonConfigs(props, producer);
-
-        // According to the RocketMQ official docs, "only one instance is allowed per producer group"
-        // So, we use UUID as the producer group by default, to allow many producer instances for one topic
-        String defaultGroup = UUID.randomUUID().toString();
-        producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, defaultGroup));
-
-        producer.setRetryTimesWhenSendFailed(getInteger(props,
-                PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
-        producer.setRetryTimesWhenSendAsyncFailed(getInteger(props,
-                PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
-        producer.setSendMsgTimeout(getInteger(props,
-                PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
-    }
-
-    public static void buildConsumerConfigs(Properties props, DefaultMQPushConsumer consumer) {
-        buildCommonConfigs(props, consumer);
-
-        String group = props.getProperty(CONSUMER_GROUP);
-        Validate.notEmpty(group);
-        consumer.setConsumerGroup(group);
-
-        consumer.setPersistConsumerOffsetInterval(getInteger(props,
-                CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
-        consumer.setConsumeThreadMin(getInteger(props,
-                CONSUMER_MIN_THREADS, DEFAULT_CONSUMER_MIN_THREADS));
-        consumer.setConsumeThreadMax(getInteger(props,
-                CONSUMER_MAX_THREADS, DEFAULT_CONSUMER_MAX_THREADS));
-
-        String initOffset = props.getProperty(CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
-        switch (initOffset) {
-            case CONSUMER_OFFSET_EARLIEST:
-                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-                break;
-            case CONSUMER_OFFSET_LATEST:
-                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
-                break;
-            case CONSUMER_OFFSET_TIMESTAMP:
-                consumer.setConsumeTimestamp(initOffset);
-                break;
-            default:
-                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
-        }
-
-        String topic = props.getProperty(CONSUMER_TOPIC);
-        Validate.notEmpty(topic);
-        try {
-            consumer.subscribe(topic, props.getProperty(CONSUMER_TAG, DEFAULT_TAG));
-        } catch (MQClientException e) {
-            throw new IllegalArgumentException(e);
-        }
-    }
-
-    public static void buildCommonConfigs(Properties props, ClientConfig client) {
-        String namesvr = props.getProperty(NAME_SERVER_ADDR);
-        Validate.notEmpty(namesvr);
-        client.setNamesrvAddr(namesvr);
-
-        client.setClientIP(props.getProperty(CLIENT_IP, DEFAULT_CLIENT_IP));
-        // According to the RocketMQ official docs, "only one instance is allowed per machine"
-        // So, we use UUID as the client name by default, to allow RocketMQ spout/bolt instances in one machine.
-        String defaultClientName = UUID.randomUUID().toString();
-        client.setInstanceName(props.getProperty(CLIENT_NAME, defaultClientName));
-
-        client.setClientCallbackExecutorThreads(getInteger(props,
-                CLIENT_CALLBACK_EXECUTOR_THREADS, DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS));
-        client.setPollNameServerInteval(getInteger(props,
-                NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL));
-        client.setHeartbeatBrokerInterval(getInteger(props,
-                BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
deleted file mode 100644
index 7cbf749..0000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.rocketmq;
-
-import org.apache.rocketmq.common.message.Message;
-import org.apache.storm.rocketmq.spout.scheme.KeyValueScheme;
-import org.apache.storm.spout.Scheme;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Properties;
-
-public final class RocketMQUtils {
-
-    public static int getInteger(Properties props, String key, int defaultValue) {
-        return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)));
-    }
-
-    public static boolean getBoolean(Properties props, String key, boolean defaultValue) {
-        return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)));
-    }
-
-    public static Scheme createScheme(Properties props) {
-        String schemeString = props.getProperty(SpoutConfig.SCHEME, SpoutConfig.DEFAULT_SCHEME);
-        Scheme scheme;
-        try {
-            Class clazz = Class.forName(schemeString);
-            scheme = (Scheme)clazz.newInstance();
-        } catch (Exception e) {
-            throw new IllegalArgumentException("Cannot create Scheme for " + schemeString
-                    + " due to " + e.getMessage());
-        }
-        return scheme;
-    }
-
-    public static List<Object> generateTuples(Message msg, Scheme scheme) {
-        List<Object> tup;
-        String rawKey = msg.getKeys();
-        ByteBuffer body = ByteBuffer.wrap(msg.getBody());
-        if (rawKey != null && scheme instanceof KeyValueScheme) {
-            ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes(StandardCharsets.UTF_8));
-            tup = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, body);
-        } else {
-            tup = scheme.deserialize(body);
-        }
-        return tup;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqConfig.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqConfig.java
new file mode 100644
index 0000000..a69aaeb
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqConfig.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.rocketmq;
+
+import static org.apache.storm.rocketmq.RocketMqUtils.getInteger;
+
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+
+/**
+ * RocketMqConfig for Consumer/Producer.
+ */
+public class RocketMqConfig {
+    // common
+    public static final String NAME_SERVER_ADDR = "nameserver.addr"; // Required
+
+    public static final String CLIENT_NAME = "client.name";
+
+    public static final String CLIENT_IP = "client.ip";
+    public static final String DEFAULT_CLIENT_IP = RemotingUtil.getLocalAddress();
+
+    public static final String CLIENT_CALLBACK_EXECUTOR_THREADS = "client.callback.executor.threads";
+    public static final int DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS = Runtime.getRuntime().availableProcessors();
+
+    public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval";
+    public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 seconds
+
+    public static final String BROKER_HEART_BEAT_INTERVAL = "brokerserver.heartbeat.interval";
+    public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds
+
+
+    // producer
+    public static final String PRODUCER_GROUP = "producer.group";
+
+    public static final String PRODUCER_RETRY_TIMES = "producer.retry.times";
+    public static final int DEFAULT_PRODUCER_RETRY_TIMES = 2;
+
+    public static final String PRODUCER_TIMEOUT = "producer.timeout";
+    public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds
+
+
+    // consumer
+    public static final String CONSUMER_GROUP = "consumer.group"; // Required
+
+    public static final String CONSUMER_TOPIC = "consumer.topic"; // Required
+
+    public static final String CONSUMER_TAG = "consumer.tag";
+    public static final String DEFAULT_TAG = "*";
+
+    public static final String CONSUMER_OFFSET_RESET_TO = "consumer.offset.reset.to";
+    public static final String CONSUMER_OFFSET_LATEST = "latest";
+    public static final String CONSUMER_OFFSET_EARLIEST = "earliest";
+    public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp";
+
+    public static final String CONSUMER_MESSAGES_ORDERLY = "consumer.messages.orderly";
+
+    public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval";
+    public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds
+
+    public static final String CONSUMER_MIN_THREADS = "consumer.min.threads";
+    public static final int DEFAULT_CONSUMER_MIN_THREADS = 20;
+
+    public static final String CONSUMER_MAX_THREADS = "consumer.max.threads";
+    public static final int DEFAULT_CONSUMER_MAX_THREADS = 64;
+
+
+    /**
+     * Build Producer Configs.
+     * @param props Properties
+     * @param producer DefaultMQProducer
+     */
+    public static void buildProducerConfigs(Properties props, DefaultMQProducer producer) {
+        buildCommonConfigs(props, producer);
+
+        // According to the RocketMQ official docs, "only one instance is allowed per producer group"
+        // So, we use UUID as the producer group by default, to allow many producer instances for one topic
+        String defaultGroup = UUID.randomUUID().toString();
+        producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, defaultGroup));
+
+        producer.setRetryTimesWhenSendFailed(getInteger(props,
+                PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
+        producer.setRetryTimesWhenSendAsyncFailed(getInteger(props,
+                PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
+        producer.setSendMsgTimeout(getInteger(props,
+                PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
+    }
+
+    /**
+     * Build Consumer Configs.
+     * @param props Properties
+     * @param consumer DefaultMQPushConsumer
+     */
+    public static void buildConsumerConfigs(Properties props, DefaultMQPushConsumer consumer) {
+        buildCommonConfigs(props, consumer);
+
+        String group = props.getProperty(CONSUMER_GROUP);
+        Validate.notEmpty(group);
+        consumer.setConsumerGroup(group);
+
+        consumer.setPersistConsumerOffsetInterval(getInteger(props,
+                CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
+        consumer.setConsumeThreadMin(getInteger(props,
+                CONSUMER_MIN_THREADS, DEFAULT_CONSUMER_MIN_THREADS));
+        consumer.setConsumeThreadMax(getInteger(props,
+                CONSUMER_MAX_THREADS, DEFAULT_CONSUMER_MAX_THREADS));
+
+        String initOffset = props.getProperty(CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
+        switch (initOffset) {
+            case CONSUMER_OFFSET_EARLIEST:
+                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+                break;
+            case CONSUMER_OFFSET_LATEST:
+                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+                break;
+            case CONSUMER_OFFSET_TIMESTAMP:
+                consumer.setConsumeTimestamp(initOffset);
+                break;
+            default:
+                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+        }
+
+        String topic = props.getProperty(CONSUMER_TOPIC);
+        Validate.notEmpty(topic);
+        try {
+            consumer.subscribe(topic, props.getProperty(CONSUMER_TAG, DEFAULT_TAG));
+        } catch (MQClientException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    /**
+     * Build Common Configs.
+     * @param props Properties
+     * @param client ClientConfig
+     */
+    public static void buildCommonConfigs(Properties props, ClientConfig client) {
+        String namesvr = props.getProperty(NAME_SERVER_ADDR);
+        Validate.notEmpty(namesvr);
+        client.setNamesrvAddr(namesvr);
+
+        client.setClientIP(props.getProperty(CLIENT_IP, DEFAULT_CLIENT_IP));
+        // According to the RocketMQ official docs, "only one instance is allowed per machine"
+        // So, we use UUID as the client name by default, to allow RocketMQ spout/bolt instances in one machine.
+        String defaultClientName = UUID.randomUUID().toString();
+        client.setInstanceName(props.getProperty(CLIENT_NAME, defaultClientName));
+
+        client.setClientCallbackExecutorThreads(getInteger(props,
+                CLIENT_CALLBACK_EXECUTOR_THREADS, DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS));
+        client.setPollNameServerInteval(getInteger(props,
+                NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL));
+        client.setHeartbeatBrokerInterval(getInteger(props,
+                BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqUtils.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqUtils.java
new file mode 100644
index 0000000..f971ae9
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMqUtils.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.rocketmq;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.rocketmq.common.message.Message;
+import org.apache.storm.rocketmq.spout.scheme.KeyValueScheme;
+import org.apache.storm.spout.Scheme;
+
+public final class RocketMqUtils {
+
+    public static int getInteger(Properties props, String key, int defaultValue) {
+        return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)));
+    }
+
+    public static boolean getBoolean(Properties props, String key, boolean defaultValue) {
+        return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)));
+    }
+
+    /**
+     * Create Scheme by Properties.
+     * @param props Properties
+     * @return Scheme
+     */
+    public static Scheme createScheme(Properties props) {
+        String schemeString = props.getProperty(SpoutConfig.SCHEME, SpoutConfig.DEFAULT_SCHEME);
+        Scheme scheme;
+        try {
+            Class clazz = Class.forName(schemeString);
+            scheme = (Scheme)clazz.newInstance();
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Cannot create Scheme for " + schemeString
+                    + " due to " + e.getMessage());
+        }
+        return scheme;
+    }
+
+    /**
+     * Generate Storm tuple values by Message and Scheme.
+     * @param msg RocketMQ Message
+     * @param scheme Scheme for deserializing
+     * @return tuple values
+     */
+    public static List<Object> generateTuples(Message msg, Scheme scheme) {
+        List<Object> tup;
+        String rawKey = msg.getKeys();
+        ByteBuffer body = ByteBuffer.wrap(msg.getBody());
+        if (rawKey != null && scheme instanceof KeyValueScheme) {
+            ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes(StandardCharsets.UTF_8));
+            tup = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, body);
+        } else {
+            tup = scheme.deserialize(body);
+        }
+        return tup;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
index 05d7f76..5990274 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
@@ -15,11 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.rocketmq;
 
 import org.apache.storm.rocketmq.spout.scheme.StringScheme;
 
-public class SpoutConfig extends RocketMQConfig {
+public class SpoutConfig extends RocketMqConfig {
     public static final String QUEUE_SIZE = "spout.queue.size";
 
     public static final String MESSAGES_MAX_RETRY = "spout.messages.max.retry";

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
deleted file mode 100644
index 3932381..0000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.rocketmq.bolt;
-
-import org.apache.commons.lang.Validate;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.MQProducer;
-import org.apache.rocketmq.client.producer.SendCallback;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.storm.rocketmq.RocketMQConfig;
-import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
-import org.apache.storm.rocketmq.common.selector.TopicSelector;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Properties;
-
-public class RocketMQBolt implements IRichBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(RocketMQBolt.class);
-
-    private static MQProducer producer;
-    private OutputCollector collector;
-    private boolean async = true;
-    private TopicSelector selector;
-    private TupleToMessageMapper mapper;
-    private Properties properties;
-
-    @Override
-    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
-        Validate.notEmpty(properties, "Producer properties can not be empty");
-
-        // Since RocketMQ Producer is thread-safe, RocketMQBolt uses a single
-        // producer instance across threads to improve the performance.
-        synchronized (RocketMQBolt.class) {
-            if (producer == null) {
-                producer = new DefaultMQProducer();
-                RocketMQConfig.buildProducerConfigs(properties, (DefaultMQProducer)producer);
-
-                try {
-                    producer.start();
-                } catch (MQClientException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }
-
-        this.collector = collector;
-
-        Validate.notNull(selector, "TopicSelector can not be null");
-        Validate.notNull(mapper, "TupleToMessageMapper can not be null");
-    }
-
-    public RocketMQBolt withSelector(TopicSelector selector) {
-        this.selector = selector;
-        return this;
-    }
-
-    public RocketMQBolt withMapper(TupleToMessageMapper mapper) {
-        this.mapper = mapper;
-        return this;
-    }
-
-    public RocketMQBolt withAsync(boolean async) {
-        this.async = async;
-        return this;
-    }
-
-    public RocketMQBolt withProperties(Properties properties) {
-        this.properties = properties;
-        return this;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        // Mapping: from storm tuple -> rocketmq Message
-        String topic = selector.getTopic(input);
-        String tag = selector.getTag(input);
-        String key = mapper.getKeyFromTuple(input);
-        byte[] value = mapper.getValueFromTuple(input);
-
-        if (topic == null) {
-            LOG.warn("skipping Message with Key = " + key + ", topic selector returned null.");
-            collector.ack(input);
-            return;
-        }
-
-        Message msg = new Message(topic,tag, key, value);
-
-        try {
-            if (async) {
-                // async sending
-                producer.send(msg, new SendCallback() {
-                    @Override
-                    public void onSuccess(SendResult sendResult) {
-                        collector.ack(input);
-                    }
-
-                    @Override
-                    public void onException(Throwable throwable) {
-                        if (throwable != null) {
-                            collector.reportError(throwable);
-                            collector.fail(input);
-                        }
-                    }
-                });
-            } else {
-                // sync sending, will return a SendResult
-                producer.send(msg);
-                collector.ack(input);
-            }
-        } catch (Exception e) {
-            collector.reportError(e);
-            collector.fail(input);
-        }
-
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-
-    @Override
-    public void cleanup() {
-        synchronized (RocketMQBolt.class) {
-            if (producer != null) {
-                producer.shutdown();
-                producer = null;
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMqBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMqBolt.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMqBolt.java
new file mode 100644
index 0000000..e945334
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMqBolt.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.rocketmq.bolt;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.storm.rocketmq.RocketMqConfig;
+import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
+import org.apache.storm.rocketmq.common.selector.TopicSelector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMqBolt implements IRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMqBolt.class);
+
+    private static MQProducer producer;
+    private OutputCollector collector;
+    private boolean async = true;
+    private TopicSelector selector;
+    private TupleToMessageMapper mapper;
+    private Properties properties;
+
+    @Override
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+        Validate.notEmpty(properties, "Producer properties can not be empty");
+
+        // Since RocketMQ Producer is thread-safe, RocketMQBolt uses a single
+        // producer instance across threads to improve the performance.
+        synchronized (RocketMqBolt.class) {
+            if (producer == null) {
+                producer = new DefaultMQProducer();
+                RocketMqConfig.buildProducerConfigs(properties, (DefaultMQProducer)producer);
+
+                try {
+                    producer.start();
+                } catch (MQClientException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        this.collector = collector;
+
+        Validate.notNull(selector, "TopicSelector can not be null");
+        Validate.notNull(mapper, "TupleToMessageMapper can not be null");
+    }
+
+    public RocketMqBolt withSelector(TopicSelector selector) {
+        this.selector = selector;
+        return this;
+    }
+
+    public RocketMqBolt withMapper(TupleToMessageMapper mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public RocketMqBolt withAsync(boolean async) {
+        this.async = async;
+        return this;
+    }
+
+    public RocketMqBolt withProperties(Properties properties) {
+        this.properties = properties;
+        return this;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        // Mapping: from storm tuple -> rocketmq Message
+        String topic = selector.getTopic(input);
+        String tag = selector.getTag(input);
+        String key = mapper.getKeyFromTuple(input);
+        byte[] value = mapper.getValueFromTuple(input);
+
+        if (topic == null) {
+            LOG.warn("skipping Message with Key = " + key + ", topic selector returned null.");
+            collector.ack(input);
+            return;
+        }
+
+        Message msg = new Message(topic,tag, key, value);
+
+        try {
+            if (async) {
+                // async sending
+                producer.send(msg, new SendCallback() {
+                    @Override
+                    public void onSuccess(SendResult sendResult) {
+                        collector.ack(input);
+                    }
+
+                    @Override
+                    public void onException(Throwable throwable) {
+                        if (throwable != null) {
+                            collector.reportError(throwable);
+                            collector.fail(input);
+                        }
+                    }
+                });
+            } else {
+                // sync sending, will return a SendResult
+                producer.send(msg);
+                collector.ack(input);
+            }
+        } catch (Exception e) {
+            collector.reportError(e);
+            collector.fail(input);
+        }
+
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+    @Override
+    public void cleanup() {
+        synchronized (RocketMqBolt.class) {
+            if (producer != null) {
+                producer.shutdown();
+                producer = null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java
index 622cbdb..a38a365 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java
@@ -15,14 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.rocketmq.common.mapper;
 
 import org.apache.storm.rocketmq.DefaultMessageBodySerializer;
 import org.apache.storm.rocketmq.MessageBodySerializer;
 import org.apache.storm.tuple.ITuple;
 
-import java.nio.charset.StandardCharsets;
-
 public class FieldNameBasedTupleToMessageMapper implements TupleToMessageMapper {
     public static final String BOLT_KEY = "key";
     public static final String BOLT_MESSAGE = "message";
@@ -34,6 +33,11 @@ public class FieldNameBasedTupleToMessageMapper implements TupleToMessageMapper
         this(BOLT_KEY, BOLT_MESSAGE);
     }
 
+    /**
+     * FieldNameBasedTupleToMessageMapper Constructor.
+     * @param boltKeyField tuple field for selecting the key
+     * @param boltMessageField  tuple field for selecting the value
+     */
     public FieldNameBasedTupleToMessageMapper(String boltKeyField, String boltMessageField) {
         this.boltKeyField = boltKeyField;
         this.boltMessageField = boltMessageField;
@@ -55,9 +59,9 @@ public class FieldNameBasedTupleToMessageMapper implements TupleToMessageMapper
     }
 
     /**
-     * using this method can override the default  MessageBodySerializer
-     * @param serializer
-     * @return
+     * using this method can override the default  MessageBodySerializer.
+     * @param serializer MessageBodySerializer
+     * @return this object
      */
     public FieldNameBasedTupleToMessageMapper withMessageBodySerializer(MessageBodySerializer serializer) {
         this.messageBodySerializer = serializer;

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java
index 84ff4a2..ef716a4 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java
@@ -15,16 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.rocketmq.common.mapper;
 
-import org.apache.storm.tuple.ITuple;
+package org.apache.storm.rocketmq.common.mapper;
 
 import java.io.Serializable;
+import org.apache.storm.tuple.ITuple;
 
 /**
  * Interface defining a mapping from storm tuple to rocketmq key and message.
  */
 public interface TupleToMessageMapper extends Serializable {
+
     String getKeyFromTuple(ITuple tuple);
+
     byte[] getValueFromTuple(ITuple tuple);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java
index 5332036..f45bdf0 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java
@@ -15,9 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.rocketmq.common.selector;
 
-import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.RocketMqConfig;
 import org.apache.storm.tuple.ITuple;
 
 public class DefaultTopicSelector implements TopicSelector {
@@ -30,7 +31,7 @@ public class DefaultTopicSelector implements TopicSelector {
     }
 
     public DefaultTopicSelector(final String topicName) {
-        this(topicName, RocketMQConfig.DEFAULT_TAG);
+        this(topicName, RocketMqConfig.DEFAULT_TAG);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java
index 60865c1..253221e 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.rocketmq.common.selector;
 
 import org.apache.storm.tuple.ITuple;
@@ -33,7 +34,13 @@ public class FieldNameBasedTopicSelector implements TopicSelector {
     private final String tagFieldName;
     private final String defaultTagName;
 
-
+    /**
+     * FieldNameBasedTopicSelector Constructor.
+     * @param topicFieldName field name used for selecting topic
+     * @param defaultTopicName default field name used for selecting topic
+     * @param tagFieldName field name used for selecting tag
+     * @param defaultTagName default field name used for selecting tag
+     */
     public FieldNameBasedTopicSelector(String topicFieldName, String defaultTopicName, String tagFieldName, String defaultTagName) {
         this.topicFieldName = topicFieldName;
         this.defaultTopicName = defaultTopicName;

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java
index f33d4a6..2940920 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java
@@ -15,13 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.rocketmq.common.selector;
 
-import org.apache.storm.tuple.ITuple;
+package org.apache.storm.rocketmq.common.selector;
 
 import java.io.Serializable;
+import org.apache.storm.tuple.ITuple;
 
 public interface TopicSelector extends Serializable {
+
     String getTopic(ITuple tuple);
+
     String getTag(ITuple tuple);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
deleted file mode 100644
index d4dfc61..0000000
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.rocketmq.spout;
-
-import org.apache.commons.lang.Validate;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.MQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.storm.Config;
-import org.apache.storm.rocketmq.ConsumerMessage;
-import org.apache.storm.rocketmq.DefaultMessageRetryManager;
-import org.apache.storm.rocketmq.MessageRetryManager;
-import org.apache.storm.rocketmq.RocketMQConfig;
-import org.apache.storm.rocketmq.RocketMQUtils;
-import org.apache.storm.rocketmq.SpoutConfig;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.utils.ObjectReader;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.apache.storm.rocketmq.RocketMQUtils.getBoolean;
-import static org.apache.storm.rocketmq.RocketMQUtils.getInteger;
-
-/**
- * RocketMQSpout uses MQPushConsumer as the default implementation.
- * PushConsumer is a high level consumer API, wrapping the pulling details
- * Looks like broker push messages to consumer
- */
-public class RocketMQSpout implements IRichSpout {
-    // TODO add metrics
-
-    private static MQPushConsumer consumer;
-    private SpoutOutputCollector collector;
-    private BlockingQueue<ConsumerMessage> queue;
-    private BlockingQueue<ConsumerMessage> pending;
-
-    private Properties properties;
-    private MessageRetryManager messageRetryManager;
-    private Scheme scheme;
-
-    public RocketMQSpout(Properties properties) {
-        Validate.notEmpty(properties, "Consumer properties can not be empty");
-        this.properties = properties;
-        scheme = RocketMQUtils.createScheme(properties);
-    }
-
-    @Override
-    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
-        // Since RocketMQ Consumer is thread-safe, RocketMQSpout uses a single
-        // consumer instance across threads to improve the performance.
-        synchronized (RocketMQSpout.class) {
-            if (consumer == null) {
-                buildAndStartConsumer();
-            }
-        }
-
-        int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)));
-        queue = new LinkedBlockingQueue<>(queueSize);
-        pending = new LinkedBlockingQueue<>(queueSize);
-        int maxRetry = getInteger(properties, SpoutConfig.MESSAGES_MAX_RETRY, SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY);
-        int ttl = getInteger(properties, SpoutConfig.MESSAGES_TTL, SpoutConfig.DEFAULT_MESSAGES_TTL);
-
-        this.messageRetryManager = new DefaultMessageRetryManager(queue, maxRetry, ttl);
-        this.collector = collector;
-    }
-
-    protected void buildAndStartConsumer() {
-        consumer = new DefaultMQPushConsumer();
-        RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer);
-
-        boolean ordered = getBoolean(properties, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false);
-        if (ordered) {
-            consumer.registerMessageListener(new MessageListenerOrderly() {
-                @Override
-                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
-                                                           ConsumeOrderlyContext context) {
-                    if (process(msgs)) {
-                        return ConsumeOrderlyStatus.SUCCESS;
-                    } else {
-                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
-                    }
-                }
-            });
-        } else {
-            consumer.registerMessageListener(new MessageListenerConcurrently() {
-                @Override
-                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
-                                                                ConsumeConcurrentlyContext context) {
-                    if (process(msgs)) {
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                    } else {
-                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
-                    }
-                }
-            });
-        }
-
-        try {
-            consumer.start();
-        } catch (MQClientException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * process pushed messages
-     * @param msgs
-     * @return
-     */
-    protected boolean process(List<MessageExt> msgs) {
-        if (msgs.isEmpty()) {
-            return true;
-        }
-
-        boolean notFull = true;
-        for (MessageExt msg : msgs) {
-            ConsumerMessage message = new ConsumerMessage(msg);
-            // returning true upon success and false if this queue is full.
-            if(!queue.offer(message)){
-                notFull = false;
-                pending.offer(message);
-            }
-        }
-        return notFull;
-    }
-
-    @Override
-    public void nextTuple() {
-        ConsumerMessage message;
-        if (!pending.isEmpty()) {
-            message = pending.poll();
-        } else {
-            message = queue.poll();
-        }
-
-        if (message == null) {
-            return;
-        }
-
-        messageRetryManager.mark(message);
-        List<Object> tup = RocketMQUtils.generateTuples(message.getData(), scheme);
-        if (tup != null) {
-            collector.emit(tup, message.getId());
-        }
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        String id = msgId.toString();
-        messageRetryManager.ack(id);
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        String id = msgId.toString();
-        messageRetryManager.fail(id);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(scheme.getOutputFields());
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-
-    @Override
-    public void close() {
-        synchronized (RocketMQSpout.class) {
-            if (consumer != null) {
-                consumer.shutdown();
-                consumer = null;
-            }
-        }
-    }
-
-    @Override
-    public void activate() {
-        consumer.resume();
-    }
-
-    @Override
-    public void deactivate() {
-        consumer.suspend();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMqSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMqSpout.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMqSpout.java
new file mode 100644
index 0000000..cebc230
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMqSpout.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.rocketmq.spout;
+
+import static org.apache.storm.rocketmq.RocketMqUtils.getBoolean;
+import static org.apache.storm.rocketmq.RocketMqUtils.getInteger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.storm.Config;
+import org.apache.storm.rocketmq.ConsumerMessage;
+import org.apache.storm.rocketmq.DefaultMessageRetryManager;
+import org.apache.storm.rocketmq.MessageRetryManager;
+import org.apache.storm.rocketmq.RocketMqConfig;
+import org.apache.storm.rocketmq.RocketMqUtils;
+import org.apache.storm.rocketmq.SpoutConfig;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.utils.ObjectReader;
+
+/**
+ * RocketMqSpout uses MQPushConsumer as the default implementation.
+ * PushConsumer is a high level consumer API, wrapping the pulling details
+ * Looks like broker push messages to consumer
+ */
+public class RocketMqSpout implements IRichSpout {
+    // TODO add metrics
+
+    private static MQPushConsumer consumer;
+    private SpoutOutputCollector collector;
+    private BlockingQueue<ConsumerMessage> queue;
+    private BlockingQueue<ConsumerMessage> pending;
+
+    private Properties properties;
+    private MessageRetryManager messageRetryManager;
+    private Scheme scheme;
+
+    /**
+     * RocketMqSpout Constructor.
+     * @param properties Properties Config
+     */
+    public RocketMqSpout(Properties properties) {
+        Validate.notEmpty(properties, "Consumer properties can not be empty");
+        this.properties = properties;
+        scheme = RocketMqUtils.createScheme(properties);
+    }
+
+    @Override
+    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+        // Since RocketMQ Consumer is thread-safe, RocketMQSpout uses a single
+        // consumer instance across threads to improve the performance.
+        synchronized (RocketMqSpout.class) {
+            if (consumer == null) {
+                buildAndStartConsumer();
+            }
+        }
+
+        int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)));
+        queue = new LinkedBlockingQueue<>(queueSize);
+        pending = new LinkedBlockingQueue<>(queueSize);
+        int maxRetry = getInteger(properties, SpoutConfig.MESSAGES_MAX_RETRY, SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY);
+        int ttl = getInteger(properties, SpoutConfig.MESSAGES_TTL, SpoutConfig.DEFAULT_MESSAGES_TTL);
+
+        this.messageRetryManager = new DefaultMessageRetryManager(queue, maxRetry, ttl);
+        this.collector = collector;
+    }
+
+    protected void buildAndStartConsumer() {
+        consumer = new DefaultMQPushConsumer();
+        RocketMqConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer);
+
+        boolean ordered = getBoolean(properties, RocketMqConfig.CONSUMER_MESSAGES_ORDERLY, false);
+        if (ordered) {
+            consumer.registerMessageListener(new MessageListenerOrderly() {
+                @Override
+                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
+                                                           ConsumeOrderlyContext context) {
+                    if (process(msgs)) {
+                        return ConsumeOrderlyStatus.SUCCESS;
+                    } else {
+                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                    }
+                }
+            });
+        } else {
+            consumer.registerMessageListener(new MessageListenerConcurrently() {
+                @Override
+                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                                ConsumeConcurrentlyContext context) {
+                    if (process(msgs)) {
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    } else {
+                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                    }
+                }
+            });
+        }
+
+        try {
+            consumer.start();
+        } catch (MQClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Process pushed messages.
+     * @param msgs messages
+     * @return the boolean flag processed result
+     */
+    protected boolean process(List<MessageExt> msgs) {
+        if (msgs.isEmpty()) {
+            return true;
+        }
+
+        boolean notFull = true;
+        for (MessageExt msg : msgs) {
+            ConsumerMessage message = new ConsumerMessage(msg);
+            // returning true upon success and false if this queue is full.
+            if (!queue.offer(message)) {
+                notFull = false;
+                pending.offer(message);
+            }
+        }
+        return notFull;
+    }
+
+    @Override
+    public void nextTuple() {
+        ConsumerMessage message;
+        if (!pending.isEmpty()) {
+            message = pending.poll();
+        } else {
+            message = queue.poll();
+        }
+
+        if (message == null) {
+            return;
+        }
+
+        messageRetryManager.mark(message);
+        List<Object> tup = RocketMqUtils.generateTuples(message.getData(), scheme);
+        if (tup != null) {
+            collector.emit(tup, message.getId());
+        }
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        String id = msgId.toString();
+        messageRetryManager.ack(id);
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        String id = msgId.toString();
+        messageRetryManager.fail(id);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(scheme.getOutputFields());
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+    @Override
+    public void close() {
+        synchronized (RocketMqSpout.class) {
+            if (consumer != null) {
+                consumer.shutdown();
+                consumer = null;
+            }
+        }
+    }
+
+    @Override
+    public void activate() {
+        consumer.resume();
+    }
+
+    @Override
+    public void deactivate() {
+        consumer.suspend();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1d7be760/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java
index e066784..f4cf803 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/scheme/KeyValueScheme.java
@@ -15,12 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.rocketmq.spout.scheme;
 
-import org.apache.storm.spout.Scheme;
+package org.apache.storm.rocketmq.spout.scheme;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import org.apache.storm.spout.Scheme;
 
 public interface KeyValueScheme extends Scheme {
     List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value);