You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2017/04/24 04:05:23 UTC
[2/5] storm git commit: STORM-2349: Add one RocketMQ plugin for the
Apache Storm
STORM-2349: Add one RocketMQ plugin for the Apache Storm
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4a4a8d2e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4a4a8d2e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4a4a8d2e
Branch: refs/heads/master
Commit: 4a4a8d2e1ec89d572116865e28df7241178bc624
Parents: 1850dd5
Author: vesense <be...@163.com>
Authored: Mon Mar 20 16:50:34 2017 +0800
Committer: vesense <be...@163.com>
Committed: Tue Apr 11 10:41:32 2017 +0800
----------------------------------------------------------------------
examples/storm-rocketmq-examples/pom.xml | 89 ++++++++++
.../rocketmq/topology/WordCountTopology.java | 95 ++++++++++
.../storm/rocketmq/topology/WordCounter.java | 72 ++++++++
.../rocketmq/trident/WordCountTrident.java | 94 ++++++++++
external/storm-mongodb/pom.xml | 2 +-
external/storm-rocketmq/README.md | 118 +++++++++++++
external/storm-rocketmq/pom.xml | 71 ++++++++
.../rocketmq/DefaultMessageBodySerializer.java | 37 ++++
.../rocketmq/DefaultMessageRetryManager.java | 86 +++++++++
.../storm/rocketmq/MessageBodySerializer.java | 27 +++
.../storm/rocketmq/MessageRetryManager.java | 50 ++++++
.../org/apache/storm/rocketmq/MessageSet.java | 66 +++++++
.../apache/storm/rocketmq/RocketMQConfig.java | 172 ++++++++++++++++++
.../apache/storm/rocketmq/RocketMQUtils.java | 41 +++++
.../org/apache/storm/rocketmq/SpoutConfig.java | 32 ++++
.../storm/rocketmq/bolt/RocketMQBolt.java | 148 ++++++++++++++++
.../FieldNameBasedTupleToMessageMapper.java | 66 +++++++
.../common/mapper/TupleToMessageMapper.java | 30 ++++
.../common/selector/DefaultTopicSelector.java | 45 +++++
.../selector/FieldNameBasedTopicSelector.java | 63 +++++++
.../rocketmq/common/selector/TopicSelector.java | 27 +++
.../storm/rocketmq/spout/RocketMQSpout.java | 177 +++++++++++++++++++
.../rocketmq/trident/state/RocketMQState.java | 117 ++++++++++++
.../trident/state/RocketMQStateFactory.java | 42 +++++
.../trident/state/RocketMQStateUpdater.java | 34 ++++
.../storm/rocketmq/TestMessageRetryManager.java | 106 +++++++++++
pom.xml | 2 +
.../final-package/src/main/assembly/binary.xml | 7 +
28 files changed, 1915 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/examples/storm-rocketmq-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-rocketmq-examples/pom.xml b/examples/storm-rocketmq-examples/pom.xml
new file mode 100644
index 0000000..d66177e
--- /dev/null
+++ b/examples/storm-rocketmq-examples/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-rocketmq-examples</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-server</artifactId>
+ <version>${project.version}</version>
+ <scope>${provided.scope}</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-rocketmq</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.sf</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.dsa</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>META-INF/*.rsa</exclude>
+ <exclude>META-INF/*.EC</exclude>
+ <exclude>META-INF/*.ec</exclude>
+ <exclude>META-INF/MSFTSIG.SF</exclude>
+ <exclude>META-INF/MSFTSIG.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java
new file mode 100644
index 0000000..0ce844e
--- /dev/null
+++ b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCountTopology.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.topology;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.SpoutConfig;
+import org.apache.storm.rocketmq.bolt.RocketMQBolt;
+import org.apache.storm.rocketmq.common.mapper.FieldNameBasedTupleToMessageMapper;
+import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
+import org.apache.storm.rocketmq.common.selector.DefaultTopicSelector;
+import org.apache.storm.rocketmq.common.selector.TopicSelector;
+import org.apache.storm.rocketmq.spout.RocketMQSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+import java.util.Properties;
+
+public class WordCountTopology {
+ private static final String WORD_SPOUT = "WORD_SPOUT";
+ private static final String COUNT_BOLT = "COUNT_BOLT";
+ private static final String INSERT_BOLT = "INSERT_BOLT";
+
+ private static final String CONSUMER_GROUP = "wordcount";
+ private static final String CONSUMER_TOPIC = "wordcountsource";
+
+ public static StormTopology buildTopology(String nameserverAddr, String topic){
+ Properties properties = new Properties();
+ properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, nameserverAddr);
+ properties.setProperty(SpoutConfig.CONSUMER_GROUP, CONSUMER_GROUP);
+ properties.setProperty(SpoutConfig.CONSUMER_TOPIC, CONSUMER_TOPIC);
+
+ RocketMQSpout spout = new RocketMQSpout(properties);
+
+ WordCounter bolt = new WordCounter();
+
+ TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
+ TopicSelector selector = new DefaultTopicSelector(topic);
+
+ properties = new Properties();
+ properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+
+ RocketMQBolt insertBolt = new RocketMQBolt()
+ .withMapper(mapper)
+ .withSelector(selector)
+ .withProperties(properties);
+
+ // wordSpout ==> countBolt ==> insertBolt
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout(WORD_SPOUT, spout, 1);
+ builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
+ builder.setBolt(INSERT_BOLT, insertBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+
+ return builder.createTopology();
+ }
+
+ public static void main(String[] args) throws Exception {
+ Config conf = new Config();
+ conf.setMaxSpoutPending(5);
+ if (args.length == 2) {
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(args[0], args[1]));) {
+ Thread.sleep(120 * 1000);
+ }
+ System.exit(0);
+ }
+ else if(args.length == 3) {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0], args[1]));
+ } else{
+ System.out.println("Usage: WordCountTopology <nameserver addr> <topic> [topology name]");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java
----------------------------------------------------------------------
diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java
new file mode 100644
index 0000000..8bc62b9
--- /dev/null
+++ b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/topology/WordCounter.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.topology;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.storm.rocketmq.RocketMQUtils;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class WordCounter implements IBasicBolt {
+ private Map<String, Integer> wordCounter = new HashMap<>();
+
+ public void prepare(Map stormConf, TopologyContext context) {
+
+ }
+
+ public void execute(Tuple input, BasicOutputCollector collector) {
+ List<MessageExt> list = (List<MessageExt>)input.getValueByField("msgs");
+ for (MessageExt messageExt : list) {
+ String word = RocketMQUtils.getUtf8StringBody(messageExt);
+
+ int count;
+ if (wordCounter.containsKey(word)) {
+ count = wordCounter.get(word) + 1;
+ wordCounter.put(word, wordCounter.get(word) + 1);
+ } else {
+ count = 1;
+ }
+
+ wordCounter.put(word, count);
+ collector.emit(new Values(word, count));
+ }
+ }
+
+ public void cleanup() {
+
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word", "count"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java
----------------------------------------------------------------------
diff --git a/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java
new file mode 100644
index 0000000..1817d2e
--- /dev/null
+++ b/examples/storm-rocketmq-examples/src/main/java/org/apache/storm/rocketmq/trident/WordCountTrident.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.common.mapper.FieldNameBasedTupleToMessageMapper;
+import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
+import org.apache.storm.rocketmq.common.selector.DefaultTopicSelector;
+import org.apache.storm.rocketmq.common.selector.TopicSelector;
+import org.apache.storm.rocketmq.trident.state.RocketMQState;
+import org.apache.storm.rocketmq.trident.state.RocketMQStateFactory;
+import org.apache.storm.rocketmq.trident.state.RocketMQStateUpdater;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.Properties;
+
+public class WordCountTrident {
+
+ public static StormTopology buildTopology(String nameserverAddr, String topic){
+ Fields fields = new Fields("word", "count");
+ FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+ new Values("storm", 1),
+ new Values("trident", 1),
+ new Values("needs", 1),
+ new Values("javadoc", 1)
+ );
+ spout.setCycle(true);
+
+ TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
+ TopicSelector selector = new DefaultTopicSelector(topic);
+
+ Properties properties = new Properties();
+ properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+
+ RocketMQState.Options options = new RocketMQState.Options()
+ .withMapper(mapper)
+ .withSelector(selector)
+ .withProperties(properties);
+
+ StateFactory factory = new RocketMQStateFactory(options);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ stream.partitionPersist(factory, fields,
+ new RocketMQStateUpdater(), new Fields());
+
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ Config conf = new Config();
+ conf.setMaxSpoutPending(5);
+ if (args.length == 2) {
+ try (LocalCluster cluster = new LocalCluster();
+ LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(args[0], args[1]));) {
+ Thread.sleep(60 * 1000);
+ }
+ System.exit(0);
+ }
+ else if(args.length == 3) {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0], args[1]));
+ } else{
+ System.out.println("Usage: WordCountTrident <nameserver addr> <topic> [topology name]");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/pom.xml b/external/storm-mongodb/pom.xml
index 6aaeab6..52d1cdd 100644
--- a/external/storm-mongodb/pom.xml
+++ b/external/storm-mongodb/pom.xml
@@ -44,7 +44,7 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm-client</artifactId>
<version>${project.version}</version>
- <scope>provided</scope>
+ <scope>${provided.scope}</scope>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/README.md
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/README.md b/external/storm-rocketmq/README.md
new file mode 100644
index 0000000..160118a
--- /dev/null
+++ b/external/storm-rocketmq/README.md
@@ -0,0 +1,118 @@
+#Storm RocketMQ
+
+Storm/Trident integration for [RocketMQ](https://rocketmq.incubator.apache.org/). This package includes the core spout, bolt and trident states that allows a storm topology to either write storm tuples into a topic or read from topics in a storm topology.
+
+
+## Read from Topic
+The spout included in this package for reading data from a topic.
+
+### RocketMQSpout
+To use the `RocketMQSpout`, you construct an instance of it by specifying a Properties instance which including rocketmq configs.
+RocketMQSpout uses RocketMQ MQPushConsumer as the default implementation. PushConsumer is a high level consumer API, wrapping the pulling details. Looks like broker push messages to consumer.
+RocketMQSpout will retry 3(use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to change the value) times when messages are failed.
+
+ ```java
+ Properties properties = new Properties();
+ properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, nameserverAddr);
+ properties.setProperty(SpoutConfig.CONSUMER_GROUP, group);
+ properties.setProperty(SpoutConfig.CONSUMER_TOPIC, topic);
+
+ RocketMQSpout spout = new RocketMQSpout(properties);
+ ```
+
+
+## Write into Topic
+The bolt and trident state included in this package for write data into a topic.
+
+### TupleToMessageMapper
+The main API for mapping Storm tuple to a RocketMQ Message is the `org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper` interface:
+
+```java
+public interface TupleToMessageMapper extends Serializable {
+ String getKeyFromTuple(ITuple tuple);
+ byte[] getValueFromTuple(ITuple tuple);
+}
+```
+
+### FieldNameBasedTupleToMessageMapper
+`storm-rocketmq` includes a general purpose `TupleToMessageMapper` implementation called `FieldNameBasedTupleToMessageMapper`.
+
+### TopicSelector
+The main API for selecting topic and tags is the `org.apache.storm.rocketmq.common.selector.TopicSelector` interface:
+
+```java
+public interface TopicSelector extends Serializable {
+ String getTopic(ITuple tuple);
+ String getTag(ITuple tuple);
+}
+```
+
+### DefaultTopicSelector/FieldNameBasedTopicSelector
+`storm-rocketmq` includes general purpose `TopicSelector` implementations called `DefaultTopicSelector` and `FieldNameBasedTopicSelector`.
+
+
+### RocketMQBolt
+To use the `RocketMQBolt`, you construct an instance of it by specifying TupleToMessageMapper, TopicSelector and Properties instances.
+RocketMQBolt send messages async by default. You can change this by invoking `withAsync(false)`.
+
+ ```java
+ TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
+ TopicSelector selector = new DefaultTopicSelector(topic);
+
+ properties = new Properties();
+ properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+
+ RocketMQBolt insertBolt = new RocketMQBolt()
+ .withMapper(mapper)
+ .withSelector(selector)
+ .withProperties(properties);
+ ```
+
+### Trident State
+We support trident persistent state that can be used with trident topologies. To create a RocketMQ persistent trident state you need to initialize it with the TupleToMessageMapper, TopicSelector, Properties instances. See the example below:
+
+ ```java
+ TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
+ TopicSelector selector = new DefaultTopicSelector(topic);
+
+ Properties properties = new Properties();
+ properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameserverAddr);
+
+ RocketMQState.Options options = new RocketMQState.Options()
+ .withMapper(mapper)
+ .withSelector(selector)
+ .withProperties(properties);
+
+ StateFactory factory = new RocketMQStateFactory(options);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ stream.partitionPersist(factory, fields,
+ new RocketMQStateUpdater(), new Fields());
+ ```
+
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer Sponsors
+
+ * Xin Wang ([xinwang@apache.org](mailto:xinwang@apache.org))
+
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/pom.xml b/external/storm-rocketmq/pom.xml
new file mode 100644
index 0000000..6068988
--- /dev/null
+++ b/external/storm-rocketmq/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-rocketmq</artifactId>
+ <name>storm-rocketmq</name>
+
+ <packaging>jar</packaging>
+
+ <developers>
+ <developer>
+ <id>vesense</id>
+ <name>Xin Wang</name>
+ <email>xinwang@apache.org</email>
+ </developer>
+ </developers>
+
+ <properties>
+ <rocketmq.version>4.0.0-incubating</rocketmq.version>
+ </properties>
+
+ <dependencies>
+ <!--parent module dependency-->
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-client</artifactId>
+ <version>${project.version}</version>
+ <scope>${provided.scope}</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+ <!--test dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java
new file mode 100644
index 0000000..5e7e314
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageBodySerializer.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+import java.nio.charset.StandardCharsets;
+
+public class DefaultMessageBodySerializer implements MessageBodySerializer {
+
+ /**
+ * Currently, we just convert string to bytes using UTF-8 charset.
+ * Note: in this way, object.toString() method is invoked.
+ * @param body RocketMQ Message body
+ * @return
+ */
+ @Override
+ public byte[] serialize(Object body) {
+ if (body == null) {
+ return null;
+ }
+ return body.toString().getBytes(StandardCharsets.UTF_8);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
new file mode 100644
index 0000000..9d540d9
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/DefaultMessageRetryManager.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * An implementation of MessageRetryManager
+ */
+public class DefaultMessageRetryManager implements MessageRetryManager{
+ private Map<String,MessageSet> cache = new ConcurrentHashMap<>(500);
+ private BlockingQueue<MessageSet> queue;
+ private int maxRetry;
+ private int ttl;
+
+ public DefaultMessageRetryManager(BlockingQueue<MessageSet> queue, int maxRetry, int ttl) {
+ this.queue = queue;
+ this.maxRetry = maxRetry;
+ this.ttl = ttl;
+
+ long period = 5000;
+ new Timer().scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ long now = System.currentTimeMillis();
+ for (Map.Entry<String, MessageSet> entry : cache.entrySet()) {
+ String id = entry.getKey();
+ MessageSet messageSet = entry.getValue();
+ if (now - messageSet.getTimestamp() >= ttl) { // no ack/fail received in ttl
+ fail(id);
+ }
+ }
+ }
+ }, period, period);
+ }
+
+ public void ack(String id) {
+ cache.remove(id);
+ }
+
+ public void fail(String id) {
+ MessageSet messageSet = cache.remove(id);
+ if (messageSet == null) {
+ return;
+ }
+
+ if (needRetry(messageSet)) {
+ messageSet.setRetries(messageSet.getRetries() + 1);
+ messageSet.setTimestamp(0);
+ queue.offer(messageSet);
+ }
+ }
+
+ public void mark(MessageSet messageSet) {
+ messageSet.setTimestamp(System.currentTimeMillis());
+ cache.put(messageSet.getId(), messageSet);
+ }
+
+ public boolean needRetry(MessageSet messageSet) {
+ return messageSet.getRetries() < maxRetry;
+ }
+
+ // just for testing
+ public void setCache(Map<String,MessageSet> cache) {
+ this.cache = cache;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java
new file mode 100644
index 0000000..f86ea26
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageBodySerializer.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+import java.io.Serializable;
+
+/**
+ * RocketMQ message body serializer
+ */
+public interface MessageBodySerializer extends Serializable{
+ byte[] serialize(Object body);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
new file mode 100644
index 0000000..18fd903
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageRetryManager.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+/**
+ * Interface for messages retry manager
+ */
+public interface MessageRetryManager {
+ /**
+ * Remove from the cache. Message with the id is successful.
+ * @param id
+ */
+ void ack(String id);
+
+ /**
+ * Remove from the cache. Message with the id is failed.
+ * Invoke retry logics if necessary.
+ * @param id
+ */
+ void fail(String id);
+
+ /**
+ * Mark messageSet in the cache.
+ * @param messageSet
+ */
+ void mark(MessageSet messageSet);
+
+ /**
+ * Whether the messageSet need retry.
+ * @param messageSet
+ * @return
+ */
+ boolean needRetry(MessageSet messageSet);
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageSet.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageSet.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageSet.java
new file mode 100644
index 0000000..7307271
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/MessageSet.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * A message collection.
+ */
+public class MessageSet {
+ private String id;
+ private List<MessageExt> data;
+ private long timestamp;
+ private int retries;
+
+ public MessageSet(String id, List<MessageExt> data) {
+ this.id = id;
+ this.data = data;
+ }
+
+ public MessageSet(List<MessageExt> data) {
+ this(UUID.randomUUID().toString(), data);
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public List<MessageExt> getData() {
+ return data;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public int getRetries() {
+ return retries;
+ }
+
+ public void setRetries(int retries) {
+ this.retries = retries;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
new file mode 100644
index 0000000..082822c
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQConfig.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.storm.task.TopologyContext;
+
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.storm.rocketmq.RocketMQUtils.getInteger;
+
+/**
+ * RocketMQConfig for Consumer/Producer
+ */
+public class RocketMQConfig {
+ // common
+ public static final String NAME_SERVER_ADDR = "nameserver.addr"; // Required
+
+ public static final String CLIENT_NAME = "client.name";
+
+ public static final String CLIENT_IP = "client.ip";
+ public static final String DEFAULT_CLIENT_IP = RemotingUtil.getLocalAddress();
+
+ public static final String CLIENT_CALLBACK_EXECUTOR_THREADS = "client.callback.executor.threads";
+ public static final int DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS = Runtime.getRuntime().availableProcessors();;
+
+ public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval";
+ public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 seconds
+
+ public static final String BROKER_HEART_BEAT_INTERVAL = "brokerserver.heartbeat.interval";
+ public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds
+
+
+ // producer
+ public static final String PRODUCER_GROUP = "producer.group";
+
+ public static final String PRODUCER_RETRY_TIMES = "producer.retry.times";
+ public static final int DEFAULT_PRODUCER_RETRY_TIMES = 2;
+
+ public static final String PRODUCER_TIMEOUT = "producer.timeout";
+ public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds
+
+
+ // consumer
+ public static final String CONSUMER_GROUP = "consumer.group"; // Required
+
+ public static final String CONSUMER_TOPIC = "consumer.topic"; // Required
+
+ public static final String CONSUMER_TAG = "consumer.tag";
+ public static final String DEFAULT_TAG = "*";
+
+ public static final String CONSUMER_OFFSET_RESET_TO = "consumer.offset.reset.to";
+ public static final String CONSUMER_OFFSET_LATEST = "latest";
+ public static final String CONSUMER_OFFSET_EARLIEST = "earliest";
+ public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp";
+
+ public static final String CONSUMER_MESSAGES_ORDERLY = "consumer.messages.orderly";
+
+ public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval";
+ public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds
+
+ public static final String CONSUMER_MIN_THREADS = "consumer.min.threads";
+ public static final int DEFAULT_CONSUMER_MIN_THREADS = 20;
+
+ public static final String CONSUMER_MAX_THREADS = "consumer.max.threads";
+ public static final int DEFAULT_CONSUMER_MAX_THREADS = 64;
+
+
+ public static void buildProducerConfigs(Properties props, DefaultMQProducer producer, TopologyContext context) {
+ buildCommonConfigs(props, producer, context);
+
+ // According to the RocketMQ official docs, "only one instance is allowed per producer group"
+ // So, we use taskID/UUID as the producer group by default
+ String defaultGroup;
+ if (context != null) {
+ defaultGroup = String.valueOf(context.getThisTaskId());
+ } else {
+ defaultGroup = UUID.randomUUID().toString();
+ }
+ producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, defaultGroup));
+
+ producer.setRetryTimesWhenSendFailed(getInteger(props,
+ PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
+ producer.setRetryTimesWhenSendAsyncFailed(getInteger(props,
+ PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
+ producer.setSendMsgTimeout(getInteger(props,
+ PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
+ }
+
+ public static void buildConsumerConfigs(Properties props, DefaultMQPushConsumer consumer, TopologyContext context) {
+ buildCommonConfigs(props, consumer, context);
+
+ String group = props.getProperty(CONSUMER_GROUP);
+ Validate.notEmpty(group);
+ consumer.setConsumerGroup(group);
+
+ consumer.setPersistConsumerOffsetInterval(getInteger(props,
+ CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
+ consumer.setConsumeThreadMin(getInteger(props,
+ CONSUMER_MIN_THREADS, DEFAULT_CONSUMER_MIN_THREADS));
+ consumer.setConsumeThreadMax(getInteger(props,
+ CONSUMER_MAX_THREADS, DEFAULT_CONSUMER_MAX_THREADS));
+
+ String initOffset = props.getProperty(CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
+ switch (initOffset) {
+ case CONSUMER_OFFSET_EARLIEST:
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ break;
+ case CONSUMER_OFFSET_LATEST:
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+ break;
+ case CONSUMER_OFFSET_TIMESTAMP:
+ consumer.setConsumeTimestamp(initOffset);
+ break;
+ default:
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+ }
+
+ String topic = props.getProperty(CONSUMER_TOPIC);
+ Validate.notEmpty(topic);
+ try {
+ consumer.subscribe(topic, props.getProperty(CONSUMER_TAG, DEFAULT_TAG));
+ } catch (MQClientException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ public static void buildCommonConfigs(Properties props, ClientConfig client, TopologyContext context) {
+ String namesvr = props.getProperty(NAME_SERVER_ADDR);
+ Validate.notEmpty(namesvr);
+ client.setNamesrvAddr(namesvr);
+
+ client.setClientIP(props.getProperty(CLIENT_IP, DEFAULT_CLIENT_IP));
+ // use taskID/UUID for client name by default
+ String defaultClientName;
+ if (context != null) {
+ defaultClientName = String.valueOf(context.getThisTaskId());
+ } else {
+ defaultClientName = UUID.randomUUID().toString();
+ }
+ client.setInstanceName(props.getProperty(CLIENT_NAME, defaultClientName));
+
+ client.setClientCallbackExecutorThreads(getInteger(props,
+ CLIENT_CALLBACK_EXECUTOR_THREADS, DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS));
+ client.setPollNameServerInteval(getInteger(props,
+ NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL));
+ client.setHeartbeatBrokerInterval(getInteger(props,
+ BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
new file mode 100644
index 0000000..dbe6b12
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/RocketMQUtils.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+import org.apache.rocketmq.common.message.Message;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+public final class RocketMQUtils {
+
+ public static int getInteger(Properties props, String key, int defaultValue) {
+ return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)));
+ }
+
+ public static boolean getBoolean(Properties props, String key, boolean defaultValue) {
+ return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)));
+ }
+
+ public static String getUtf8StringBody(Message message) {
+ if (message == null) {
+ return null;
+ }
+ return new String(message.getBody(), StandardCharsets.UTF_8);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
new file mode 100644
index 0000000..53ce152
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/SpoutConfig.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq;
+
+public class SpoutConfig extends RocketMQConfig {
+ public static final String QUEUE_SIZE = "spout.queue.size";
+
+ public static final String DECLARE_FIELDS = "spout.declare.fields";
+ public static final String DEFAULT_DECLARE_FIELDS = "msgs";
+
+ public static final String MESSAGES_MAX_RETRY = "spout.messages.max.retry";
+ public static final int DEFAULT_MESSAGES_MAX_RETRY = 3;
+
+ public static final String MESSAGES_TTL = "spout.messages.ttl";
+ public static final int DEFAULT_MESSAGES_TTL = 300000; // 5min
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
new file mode 100644
index 0000000..d55babe
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/bolt/RocketMQBolt.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.bolt;
+
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
+import org.apache.storm.rocketmq.common.selector.TopicSelector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class RocketMQBolt implements IRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(RocketMQBolt.class);
+
+ private MQProducer producer;
+ private OutputCollector collector;
+ private boolean async = true;
+ private TopicSelector selector;
+ private TupleToMessageMapper mapper;
+ private Properties properties;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ Validate.notEmpty(properties, "Producer properties can not be empty");
+
+ producer = new DefaultMQProducer();
+ RocketMQConfig.buildProducerConfigs(properties, (DefaultMQProducer)producer, context);
+
+ try {
+ producer.start();
+ } catch (MQClientException e) {
+ throw new RuntimeException(e);
+ }
+ this.collector = collector;
+
+ Validate.notNull(selector, "TopicSelector can not be null");
+ Validate.notNull(mapper, "TupleToMessageMapper can not be null");
+ }
+
+ public RocketMQBolt withSelector(TopicSelector selector) {
+ this.selector = selector;
+ return this;
+ }
+
+ public RocketMQBolt withMapper(TupleToMessageMapper mapper) {
+ this.mapper = mapper;
+ return this;
+ }
+
+ public RocketMQBolt withAsync(boolean async) {
+ this.async = async;
+ return this;
+ }
+
+ public RocketMQBolt withProperties(Properties properties) {
+ this.properties = properties;
+ return this;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ // Mapping: from storm tuple -> rocketmq Message
+ String topic = selector.getTopic(input);
+ String tag = selector.getTag(input);
+ String key = mapper.getKeyFromTuple(input);
+ byte[] value = mapper.getValueFromTuple(input);
+
+ if (topic == null) {
+ LOG.warn("skipping Message with Key = " + key + ", topic selector returned null.");
+ collector.ack(input);
+ return;
+ }
+
+ Message msg = new Message(topic,tag, key, value);
+
+ try {
+ if (async) {
+ // async sending
+ producer.send(msg, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ collector.ack(input);
+ }
+
+ @Override
+ public void onException(Throwable throwable) {
+ if (throwable != null) {
+ collector.reportError(throwable);
+ collector.fail(input);
+ }
+ }
+ });
+ } else {
+ // sync sending, will return a SendResult
+ producer.send(msg);
+ collector.ack(input);
+ }
+ } catch (Exception e) {
+ collector.reportError(e);
+ collector.fail(input);
+ }
+
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void cleanup() {
+ producer.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java
new file mode 100644
index 0000000..622cbdb
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/FieldNameBasedTupleToMessageMapper.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.common.mapper;
+
+import org.apache.storm.rocketmq.DefaultMessageBodySerializer;
+import org.apache.storm.rocketmq.MessageBodySerializer;
+import org.apache.storm.tuple.ITuple;
+
+import java.nio.charset.StandardCharsets;
+
+public class FieldNameBasedTupleToMessageMapper implements TupleToMessageMapper {
+ public static final String BOLT_KEY = "key";
+ public static final String BOLT_MESSAGE = "message";
+ public String boltKeyField;
+ public String boltMessageField;
+ private MessageBodySerializer messageBodySerializer;
+
+ public FieldNameBasedTupleToMessageMapper() {
+ this(BOLT_KEY, BOLT_MESSAGE);
+ }
+
+ public FieldNameBasedTupleToMessageMapper(String boltKeyField, String boltMessageField) {
+ this.boltKeyField = boltKeyField;
+ this.boltMessageField = boltMessageField;
+ this.messageBodySerializer = new DefaultMessageBodySerializer();
+ }
+
+ @Override
+ public String getKeyFromTuple(ITuple tuple) {
+ return tuple.getStringByField(boltKeyField);
+ }
+
+ @Override
+ public byte[] getValueFromTuple(ITuple tuple) {
+ Object obj = tuple.getValueByField(boltMessageField);
+ if (obj == null) {
+ return null;
+ }
+ return messageBodySerializer.serialize(obj);
+ }
+
+ /**
+ * using this method can override the default MessageBodySerializer
+ * @param serializer
+ * @return
+ */
+ public FieldNameBasedTupleToMessageMapper withMessageBodySerializer(MessageBodySerializer serializer) {
+ this.messageBodySerializer = serializer;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java
new file mode 100644
index 0000000..84ff4a2
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/mapper/TupleToMessageMapper.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.common.mapper;
+
+import org.apache.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+/**
+ * Interface defining a mapping from storm tuple to rocketmq key and message.
+ */
+public interface TupleToMessageMapper extends Serializable {
+ String getKeyFromTuple(ITuple tuple);
+ byte[] getValueFromTuple(ITuple tuple);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java
new file mode 100644
index 0000000..5332036
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/DefaultTopicSelector.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.common.selector;
+
+import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.tuple.ITuple;
+
+public class DefaultTopicSelector implements TopicSelector {
+ private final String topicName;
+ private final String tagName;
+
+ public DefaultTopicSelector(final String topicName, final String tagName) {
+ this.topicName = topicName;
+ this.tagName = tagName;
+ }
+
+ public DefaultTopicSelector(final String topicName) {
+ this(topicName, RocketMQConfig.DEFAULT_TAG);
+ }
+
+ @Override
+ public String getTopic(ITuple tuple) {
+ return topicName;
+ }
+
+ @Override
+ public String getTag(ITuple tuple) {
+ return tagName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java
new file mode 100644
index 0000000..60865c1
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/FieldNameBasedTopicSelector.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.common.selector;
+
+import org.apache.storm.tuple.ITuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses field name to select topic and tag name from tuple.
+ */
+public class FieldNameBasedTopicSelector implements TopicSelector {
+ private static final Logger LOG = LoggerFactory.getLogger(FieldNameBasedTopicSelector.class);
+
+ private final String topicFieldName;
+ private final String defaultTopicName;
+
+ private final String tagFieldName;
+ private final String defaultTagName;
+
+
+ public FieldNameBasedTopicSelector(String topicFieldName, String defaultTopicName, String tagFieldName, String defaultTagName) {
+ this.topicFieldName = topicFieldName;
+ this.defaultTopicName = defaultTopicName;
+ this.tagFieldName = tagFieldName;
+ this.defaultTagName = defaultTagName;
+ }
+
+ @Override
+ public String getTopic(ITuple tuple) {
+ if (tuple.contains(topicFieldName)) {
+ return tuple.getStringByField(topicFieldName);
+ } else {
+ LOG.warn("Field {} Not Found. Returning default topic {}", topicFieldName, defaultTopicName);
+ return defaultTopicName;
+ }
+ }
+
+ @Override
+ public String getTag(ITuple tuple) {
+ if (tuple.contains(tagFieldName)) {
+ return tuple.getStringByField(tagFieldName);
+ } else {
+ LOG.warn("Field {} Not Found. Returning default tag {}", tagFieldName, defaultTagName);
+ return defaultTagName;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java
new file mode 100644
index 0000000..f33d4a6
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/common/selector/TopicSelector.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.common.selector;
+
+import org.apache.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+public interface TopicSelector extends Serializable {
+ String getTopic(ITuple tuple);
+ String getTag(ITuple tuple);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
new file mode 100644
index 0000000..c8a0802
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.spout;
+
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.storm.Config;
+import org.apache.storm.rocketmq.DefaultMessageRetryManager;
+import org.apache.storm.rocketmq.MessageRetryManager;
+import org.apache.storm.rocketmq.MessageSet;
+import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.SpoutConfig;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.storm.rocketmq.RocketMQUtils.getBoolean;
+import static org.apache.storm.rocketmq.RocketMQUtils.getInteger;
+
+/**
+ * RocketMQSpout uses MQPushConsumer as the default implementation.
+ * PushConsumer is a high level consumer API, wrapping the pulling details
+ * Looks like broker push messages to consumer
+ */
+public class RocketMQSpout implements IRichSpout {
+ // TODO add metrics
+
+ private MQPushConsumer consumer;
+ private SpoutOutputCollector collector;
+ private BlockingQueue<MessageSet> queue;
+
+ private Properties properties;
+ private MessageRetryManager messageRetryManager;
+
+ public RocketMQSpout(Properties properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ Validate.notEmpty(properties, "Consumer properties can not be empty");
+ boolean ordered = getBoolean(properties, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false);
+
+ int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, Integer.parseInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING).toString()));
+ queue = new LinkedBlockingQueue<>(queueSize);
+
+ consumer = new DefaultMQPushConsumer();
+ RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer, context);
+
+ if (ordered) {
+ consumer.registerMessageListener(new MessageListenerOrderly() {
+ @Override
+ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
+ ConsumeOrderlyContext context) {
+ if (process(msgs)) {
+ return ConsumeOrderlyStatus.SUCCESS;
+ } else {
+ return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+ }
+ }
+ });
+ } else {
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+ ConsumeConcurrentlyContext context) {
+ if (process(msgs)) {
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ } else {
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ }
+ }
+ });
+ }
+
+ try {
+ consumer.start();
+ } catch (MQClientException e) {
+ throw new RuntimeException(e);
+ }
+
+ int maxRetry = getInteger(properties, SpoutConfig.MESSAGES_MAX_RETRY, SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY);
+ int ttl = getInteger(properties, SpoutConfig.MESSAGES_TTL, SpoutConfig.DEFAULT_MESSAGES_TTL);
+ this.messageRetryManager = new DefaultMessageRetryManager(queue, maxRetry, ttl);
+ this.collector = collector;
+ }
+
+ public boolean process(List<MessageExt> msgs) {
+ if (msgs.isEmpty()) {
+ return true;
+ }
+ MessageSet messageSet = new MessageSet(msgs);
+ // returning true upon success and false if this queue is full.
+ return queue.offer(messageSet);
+ }
+
+ @Override
+ public void nextTuple() {
+ MessageSet messageSet = queue.poll();
+ if (messageSet == null) {
+ return;
+ }
+
+ messageRetryManager.mark(messageSet);
+ collector.emit(new Values(messageSet.getData()), messageSet.getId());
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ String id = msgId.toString();
+ messageRetryManager.ack(id);
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ String id = msgId.toString();
+ messageRetryManager.fail(id);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(properties.getProperty(SpoutConfig.DECLARE_FIELDS, SpoutConfig.DEFAULT_DECLARE_FIELDS)));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ consumer.shutdown();
+ }
+
+ @Override
+ public void activate() {
+ consumer.resume();
+ }
+
+ @Override
+ public void deactivate() {
+ consumer.suspend();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
new file mode 100644
index 0000000..7e5c078
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQState.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.trident.state;
+
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
+import org.apache.storm.rocketmq.common.selector.TopicSelector;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class RocketMQState implements State {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RocketMQState.class);
+
+ private Options options;
+ private MQProducer producer;
+
+ protected RocketMQState(Map map, Options options) {
+ this.options = options;
+ }
+
+ public static class Options implements Serializable {
+ private TopicSelector selector;
+ private TupleToMessageMapper mapper;
+ private Properties properties;
+
+ public Options withSelector(TopicSelector selector) {
+ this.selector = selector;
+ return this;
+ }
+
+ public Options withMapper(TupleToMessageMapper mapper) {
+ this.mapper = mapper;
+ return this;
+ }
+
+ public Options withProperties(Properties properties) {
+ this.properties = properties;
+ return this;
+ }
+ }
+
+ protected void prepare() {
+ Validate.notEmpty(options.properties, "Producer properties can not be empty");
+
+ producer = new DefaultMQProducer();
+ RocketMQConfig.buildProducerConfigs(options.properties, (DefaultMQProducer)producer, null);
+
+ try {
+ producer.start();
+ } catch (MQClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void beginCommit(Long txid) {
+ LOG.debug("beginCommit is noop.");
+ }
+
+ @Override
+ public void commit(Long txid) {
+ LOG.debug("commit is noop.");
+ }
+
+ public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+ try {
+ for (TridentTuple tuple : tuples) {
+ String topic = options.selector.getTopic(tuple);
+ String tag = options.selector.getTag(tuple);
+ String key = options.mapper.getKeyFromTuple(tuple);
+ byte[] value = options.mapper.getValueFromTuple(tuple);
+
+ if (topic == null) {
+ LOG.warn("skipping Message with Key = " + key + ", topic selector returned null.");
+ continue;
+ }
+
+ Message msg = new Message(topic,tag, key, value);
+ this.producer.send(msg);
+ }
+ } catch (Exception e) {
+ LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
+ throw new FailedException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateFactory.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateFactory.java
new file mode 100644
index 0000000..82eb013
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.trident.state;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+public class RocketMQStateFactory implements StateFactory {
+
+ private RocketMQState.Options options;
+
+ public RocketMQStateFactory(RocketMQState.Options options) {
+ this.options = options;
+ }
+
+ @Override
+ public State makeState(Map conf, IMetricsContext metrics,
+ int partitionIndex, int numPartitions) {
+ RocketMQState state = new RocketMQState(conf, options);
+ state.prepare();
+ return state;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4a4a8d2e/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateUpdater.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateUpdater.java
new file mode 100644
index 0000000..a548ce8
--- /dev/null
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMQStateUpdater.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.trident.state;
+
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public class RocketMQStateUpdater extends BaseStateUpdater<RocketMQState> {
+
+ @Override
+ public void updateState(RocketMQState state, List<TridentTuple> tuples,
+ TridentCollector collector) {
+ state.updateState(tuples, collector);
+ }
+
+}