You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ar...@apache.org on 2016/06/07 06:36:58 UTC

[2/3] storm git commit: STORM-1878: Added a Flux example using a stateful bolt

STORM-1878: Added a Flux example using a stateful bolt

The example resumes word counting across topology restarts, provided the state is persisted in Redis.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f909ce91
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f909ce91
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f909ce91

Branch: refs/heads/1.x-branch
Commit: f909ce91520448a399e70c4ba2fba18d5276ef2e
Parents: 3722b93
Author: Daniel Klessing <da...@iqser.com>
Authored: Fri Jun 3 13:17:29 2016 +0200
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Tue Jun 7 12:04:54 2016 +0530

----------------------------------------------------------------------
 external/flux/flux-examples/README.md           | 18 ++++++
 .../flux/examples/StatefulWordCounter.java      | 64 ++++++++++++++++++++
 .../resources/simple_stateful_wordcount.yaml    | 60 ++++++++++++++++++
 3 files changed, 142 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f909ce91/external/flux/flux-examples/README.md
----------------------------------------------------------------------
diff --git a/external/flux/flux-examples/README.md b/external/flux/flux-examples/README.md
index a6afec2..3d610b4 100644
--- a/external/flux/flux-examples/README.md
+++ b/external/flux/flux-examples/README.md
@@ -39,6 +39,7 @@ Another wordcount example that uses a spout written in JavaScript (node.js), a b
 written in java.
 
 ### [kafka_spout.yaml](src/main/resources/kafka_spout.yaml)
+
 This example illustrates how to configure Storm's `storm-kafka` spout using Flux YAML DSL `components`, `references`,
 and `constructor arguments` constructs.
 
@@ -64,6 +65,7 @@ To run the `simple_hbase.yaml` example, copy the `hbase_bolt.properties` file to
 ```bash
 storm jar ./target/flux-examples-*.jar org.apache.storm.flux.Flux --local ./src/main/resources/simple_hbase.yaml --filter my_hbase_bolt.properties
 ```
+
 ### [simple_windowing.yaml](src/main/resources/simple_windowing.yaml)
 
 This example illustrates how to use Flux to set up a storm topology that contains windowing operations.
@@ -73,3 +75,19 @@ To run,
 ```bash
 storm jar ./target/flux-examples-*.jar org.apache.storm.flux.Flux --local ./src/main/resources/simple_windowing.yaml
 ```
+
+### [simple_stateful_wordcount.yaml](src/main/resources/simple_stateful_wordcount.yaml)
+
+Flux also supports stateful bolts, as this example illustrates. It is essentially an extension of the basic wordcount example.
+The state is periodically saved (checkpointed) and restored when the topology is restarted.
+
+```bash
+storm jar ./target/flux-examples-*.jar org.apache.storm.flux.Flux --local ./src/main/resources/simple_stateful_wordcount.yaml
+```
+
+By default, the state is stored in memory only, so you won't see a resumed state unless you configure Redis as the state backend.
+Ensure that you have Redis running at `localhost:6379` and that `storm-redis-*.jar` is in the classpath.
+
+```bash
+STORM_EXT_CLASSPATH=../../storm-redis/target storm jar ./target/flux-examples-*.jar -c topology.state.provider=org.apache.storm.redis.state.RedisKeyValueStateProvider org.apache.storm.flux.Flux --local ./src/main/resources/simple_stateful_wordcount.yaml
+```

http://git-wip-us.apache.org/repos/asf/storm/blob/f909ce91/external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/StatefulWordCounter.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/StatefulWordCounter.java b/external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/StatefulWordCounter.java
new file mode 100644
index 0000000..5534888
--- /dev/null
+++ b/external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/StatefulWordCounter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.flux.examples;
+
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.base.BaseStatefulBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+
+/** Flux example bolt that keeps a running count per word in a {@link KeyValueState}. */
+public class StatefulWordCounter extends BaseStatefulBolt<KeyValueState<String, Long>> {
+    private KeyValueState<String, Long> wordCounts; // assigned in initState(): word -> running count
+    private OutputCollector collector;              // assigned in prepare(): used to emit and ack
+
+    /** Remembers the collector so that {@link #execute(Tuple)} can emit and ack through it. */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    /** Adopts the supplied state object as the backing store for the word counts. */
+    @Override
+    public void initState(KeyValueState<String, Long> state) {
+        this.wordCounts = state;
+    }
+
+    /** Bumps the count of the word in field 0, emits (word, count) anchored to the input, then acks it. */
+    @Override
+    public void execute(Tuple tuple) {
+        final String word = tuple.getString(0);
+        final long updated = wordCounts.get(word, 0L) + 1L; // 0L is the default handed to get()
+        wordCounts.put(word, updated);
+        collector.emit(tuple, new Values(word, updated));
+        collector.ack(tuple);
+    }
+
+    /** Declares the two output fields, "word" and "count", matching what execute() emits. */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word", "count"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f909ce91/external/flux/flux-examples/src/main/resources/simple_stateful_wordcount.yaml
----------------------------------------------------------------------
diff --git a/external/flux/flux-examples/src/main/resources/simple_stateful_wordcount.yaml b/external/flux/flux-examples/src/main/resources/simple_stateful_wordcount.yaml
new file mode 100644
index 0000000..14b9b3a
--- /dev/null
+++ b/external/flux/flux-examples/src/main/resources/simple_stateful_wordcount.yaml
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+
+# topology definition
+# name to be used when submitting
+name: "stateful-wordcount-topology"
+
+# topology configuration
+# this will be passed to the submitter as a map of config options
+#
+config:
+  topology.workers: 1
+
+# spout definitions
+spouts:
+  - id: "spout-1"
+    className: "org.apache.storm.testing.TestWordSpout"
+    parallelism: 1
+
+# bolt definitions
+bolts:
+  - id: "bolt-1"
+    className: "org.apache.storm.flux.examples.StatefulWordCounter"
+    parallelism: 1
+
+  - id: "bolt-2"
+    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
+    parallelism: 1
+
+# stream definitions
+# stream definitions define connections between spouts and bolts.
+# note that such connections can be cyclical
+streams:
+  - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
+    from: "spout-1"
+    to: "bolt-1"
+    grouping:
+      type: FIELDS
+      args: ["word"]
+
+  - name: "bolt-1 --> bolt-2"
+    from: "bolt-1"
+    to: "bolt-2"
+    grouping:
+      type: SHUFFLE