You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2016/04/04 18:18:38 UTC
[1/6] storm git commit: Fix indent on clojure testing function
Repository: storm
Updated Branches:
refs/heads/master 69ec8ffa1 -> f63151679
Fix indent on clojure testing function
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/843d6904
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/843d6904
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/843d6904
Branch: refs/heads/master
Commit: 843d6904369ab7db584bcef6e9a62e6be2845b7a
Parents: 8c31e4d
Author: Bryant <br...@gmail.com>
Authored: Thu Mar 24 13:37:20 2016 -0700
Committer: Bryant <br...@gmail.com>
Committed: Fri Mar 25 16:03:13 2016 -0700
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/testing.clj | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/843d6904/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 0c13a98..b7bdad9 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -90,13 +90,13 @@
[]
(Time/stopSimulating))
- (defmacro with-simulated-time
- [& body]
- `(try
- (start-simulating-time!)
- ~@body
- (finally
- (stop-simulating-time!))))
+(defmacro with-simulated-time
+ [& body]
+ `(try
+ (start-simulating-time!)
+ ~@body
+ (finally
+ (stop-simulating-time!))))
(defn advance-time-ms! [ms]
(Time/advanceTime ms))
[5/6] storm git commit: Merge branch 'STORM-515' of https://github.com/brymaven/storm
Posted by kn...@apache.org.
Merge branch 'STORM-515' of https://github.com/brymaven/storm
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6e7892ca
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6e7892ca
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6e7892ca
Branch: refs/heads/master
Commit: 6e7892cad2c8fe651eb7e2b2650b4403e3a8874e
Parents: 69ec8ff 1ed04f1
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Mon Apr 4 11:17:50 2016 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Mon Apr 4 11:17:50 2016 -0500
----------------------------------------------------------------------
examples/storm-starter/pom.xml | 10 ++
.../clj/org/apache/storm/starter/clj/bolts.clj | 79 +++++++++++++
.../apache/storm/starter/clj/exclamation.clj | 52 +++++++++
.../storm/starter/clj/rolling_top_words.clj | 58 ++++++++++
.../storm/starter/FastWordCountTopology.java | 2 +-
.../org/apache/storm/starter/clj/bolts_test.clj | 114 +++++++++++++++++++
storm-core/src/clj/org/apache/storm/testing.clj | 14 +--
7 files changed, 321 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6e7892ca/examples/storm-starter/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6e7892ca/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
[4/6] storm git commit: Clean up macros in storm starter
Posted by kn...@apache.org.
Clean up macros in storm starter
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1ed04f14
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1ed04f14
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1ed04f14
Branch: refs/heads/master
Commit: 1ed04f14e54efd4b9fbaac4f9e4b73869961ed20
Parents: 3b7c4a8
Author: Bryant <br...@gmail.com>
Authored: Thu Mar 31 18:24:48 2016 -0700
Committer: Bryant <br...@gmail.com>
Committed: Thu Mar 31 20:40:08 2016 -0700
----------------------------------------------------------------------
.../clj/org/apache/storm/starter/clj/bolts.clj | 20 ++++---
.../org/apache/storm/starter/clj/bolts_test.clj | 59 ++++++++++----------
2 files changed, 41 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1ed04f14/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj
index 95aa8bf..270952f 100644
--- a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj
+++ b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj
@@ -44,12 +44,12 @@
(.incrementCount counter word)
(ack! collector tuple)))))))
-(defmacro update-rankings [& body]
- `(if (TupleUtils/isTick ~'tuple)
+(defmacro update-rankings [tuple collector rankings & body]
+ `(if (TupleUtils/isTick ~tuple)
(do
(log-debug "Received tick tuple, triggering emit of current rankings")
- (emit-bolt! ~'collector [(.copy ~'rankings)])
- (log-debug "Rankings: " ~'rankings))
+ (emit-bolt! ~collector [(.copy ~rankings)])
+ (log-debug "Rankings: " ~rankings))
~@body))
(defbolt intermediate-rankings-bolt ["rankings"]
@@ -60,7 +60,9 @@
(let [rankings (Rankings. top-n)]
(bolt
(execute [tuple]
- (update-rankings (.updateWith rankings (RankableObjectWithFields/from tuple)))))))
+ (update-rankings
+ tuple collector rankings
+ (.updateWith rankings (RankableObjectWithFields/from tuple)))))))
(defbolt total-rankings-bolt ["rankings"]
{:prepare true
@@ -70,6 +72,8 @@
(let [rankings (Rankings. top-n)]
(bolt
(execute [{rankings-to-merge "rankings" :as tuple}]
- (update-rankings (doto rankings
- (.updateWith rankings-to-merge)
- (.pruneZeroCounts)))))))
+ (update-rankings
+ tuple collector rankings
+ (doto rankings
+ (.updateWith rankings-to-merge)
+ (.pruneZeroCounts)))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/1ed04f14/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj b/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
index d18f19b..164e7ac 100644
--- a/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
+++ b/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
@@ -25,20 +25,18 @@
[org.apache.storm.starter.tools Rankable]
[org.apache.storm.tuple Tuple]))
-(defmacro test-with-tuple [[bolt tuples] & body]
- `(let [bolt# ~bolt
- tuples# (atom [])]
- (.prepare bolt# {} nil (OutputCollector.
- (reify IOutputCollector
- (emit [_ _ _ tuple#]
- (swap! tuples# conj tuple#))
- (ack [_ input]))))
- (if (vector? ~tuples)
- (doseq [t# ~tuples]
- (.execute bolt# t#))
- (.execute bolt# ~tuples))
- (let [~'tuples @tuples#]
- ~@body)))
+(defn execute-tuples [bolt tuples]
+ (let [out (atom [])]
+ (.prepare bolt {} nil (OutputCollector.
+ (reify IOutputCollector
+ (emit [_ _ _ tuple]
+ (swap! out conj tuple))
+ (ack [_ input]))))
+ (if (vector? tuples)
+ (doseq [t tuples]
+ (.execute bolt t))
+ (.execute bolt tuples))
+ @out))
(defn- mock-tuple [m & {component :component stream-id :stream-id
:or {component "1" stream-id "1"}}]
@@ -65,38 +63,39 @@
(deftest test-split-sentence
(testing "Bolt emits word per sentence"
- (test-with-tuple [split-sentence (mock-tuple
- {"sentence" "the cat jumped over the door"})]
+ (let [tuples (execute-tuples
+ split-sentence
+ (mock-tuple {"sentence" "the cat jumped over the door"}))]
(is (= [["the"] ["cat"] ["jumped"] ["over"] ["the"] ["door"]] tuples)))))
(deftest test-word-count
(testing "Bolt emits new count"
- (test-with-tuple [word-count [(mock-tuple {"word" "the"})
- (mock-tuple {"word" "the"})
- (mock-tuple {"word" "cat"})]]
+ (let [tuples (execute-tuples word-count [(mock-tuple {"word" "the"})
+ (mock-tuple {"word" "the"})
+ (mock-tuple {"word" "cat"})])]
(is (ms= [["the" 1] ["the" 2] ["cat" 1]] tuples)))))
(deftest test-exclamation-bolt
(testing "Bolt emits word with exclamation marks"
- (test-with-tuple [exclamation-bolt (mock-tuple {"word" "nathan"})]
+ (let [tuples (execute-tuples exclamation-bolt (mock-tuple {"word" "nathan"}))]
(is (= [["nathan!!!"]] tuples)))))
(deftest test-rolling-bolt
(testing "Emits nothing if no object has been counted"
- (test-with-tuple [(rolling-count-bolt 9 3) tick-tuple]
+ (let [tuples (execute-tuples (rolling-count-bolt 9 3) tick-tuple)]
(is (empty? tuples))))
(testing "Emits something if object was counted"
- (test-with-tuple [(rolling-count-bolt 9 3)
- [(mock-tuple {"word" "nathan"}) tick-tuple]]
+ (let [tuples (execute-tuples (rolling-count-bolt 9 3)
+ [(mock-tuple {"word" "nathan"}) tick-tuple])]
(is (= [["nathan" 1 0]] tuples)))))
(deftest test-intermediate-rankings-bolt
(testing "Emits rankings for tick tuple"
- (test-with-tuple [(intermediate-rankings-bolt 5 2) tick-tuple]
+ (let [tuples (execute-tuples (intermediate-rankings-bolt 5 2) tick-tuple)]
(is (seq tuples))))
(testing "Emits nothing for normal tuple"
- (test-with-tuple [(intermediate-rankings-bolt 5 2)
- (mock-tuple {"obj" "nathan" "count" 1})]
+ (let [tuples (execute-tuples (intermediate-rankings-bolt 5 2)
+ (mock-tuple {"obj" "nathan" "count" 1}))]
(is (empty? tuples)))))
(defn- mock-rankable [object count]
@@ -107,9 +106,9 @@
(deftest test-total-rankings-bolt
(testing "Emits rankings for tick tuple"
- (test-with-tuple [(total-rankings-bolt 5 2) tick-tuple]
+ (let [tuples (execute-tuples (total-rankings-bolt 5 2) tick-tuple)]
(is (seq tuples))))
(testing "Emits nothing for normal tuple"
- (test-with-tuple [(total-rankings-bolt 5 2)
- (mock-tuple {"rankings" (mock-rankable "nathan" 2)})]
- (is (empty? tuples)))))
+ (let [tuples (execute-tuples (total-rankings-bolt 5 2)
+ (mock-tuple {"rankings" (mock-rankable "nathan" 2)}))]
+ (is (empty? tuples)))))
[6/6] storm git commit: Adding STORM-515 to the changelog.
Posted by kn...@apache.org.
Adding STORM-515 to the changelog.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f6315167
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f6315167
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f6315167
Branch: refs/heads/master
Commit: f631516793cd84fc90ad40567479e31f681436a5
Parents: 6e7892c
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Mon Apr 4 11:18:23 2016 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Mon Apr 4 11:18:23 2016 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f6315167/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 50129b3..4b7a3d0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-515: Clojure documentation and examples
* STORM-1279: port backtype.storm.daemon.supervisor to java
* STORM-1668: Flux silently fails while setting a non-existent property.
* STORM-1271: Port backtype.storm.daemon.task to java
[3/6] storm git commit: Add Clojure examples for rolling top words and exclamation
Posted by kn...@apache.org.
Add Clojure examples for rolling top words and exclamation
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3b7c4a81
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3b7c4a81
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3b7c4a81
Branch: refs/heads/master
Commit: 3b7c4a81d7c5111eb6d506bef7cedf9254bf91d0
Parents: 843d690
Author: Bryant <br...@gmail.com>
Authored: Thu Mar 24 13:42:45 2016 -0700
Committer: Bryant <br...@gmail.com>
Committed: Fri Mar 25 17:28:26 2016 -0700
----------------------------------------------------------------------
examples/storm-starter/pom.xml | 10 ++
.../clj/org/apache/storm/starter/clj/bolts.clj | 75 ++++++++++++
.../apache/storm/starter/clj/exclamation.clj | 52 +++++++++
.../storm/starter/clj/rolling_top_words.clj | 58 ++++++++++
.../org/apache/storm/starter/clj/bolts_test.clj | 115 +++++++++++++++++++
5 files changed, 310 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3b7c4a81/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 929c8ea..f7fce6f 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -222,6 +222,9 @@
<sourceDirectories>
<sourceDirectory>src/clj</sourceDirectory>
</sourceDirectories>
+ <testSourceDirectories>
+ <testSourceDirectory>test/clj</testSourceDirectory>
+ </testSourceDirectories>
</configuration>
<executions>
<execution>
@@ -231,6 +234,13 @@
<goal>compile</goal>
</goals>
</execution>
+ <execution>
+ <id>test-clojure</id>
+ <phase>test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
</executions>
</plugin>
http://git-wip-us.apache.org/repos/asf/storm/blob/3b7c4a81/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj
new file mode 100644
index 0000000..95aa8bf
--- /dev/null
+++ b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj
@@ -0,0 +1,75 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.starter.clj.bolts
+ (:require [org.apache.storm
+ [clojure :refer :all]
+ [config :refer :all]
+ [log :refer :all]])
+ (:import [org.apache.storm.starter.tools
+ NthLastModifiedTimeTracker SlidingWindowCounter
+ Rankings RankableObjectWithFields]
+ [org.apache.storm.utils TupleUtils]))
+
+(defbolt rolling-count-bolt ["obj" "count" "actualWindowLengthInSeconds"]
+ {:prepare true
+ :params [window-length emit-frequency]
+ :conf {TOPOLOGY-TICK-TUPLE-FREQ-SECS emit-frequency}}
+ [conf context collector]
+ (let [num-windows (/ window-length emit-frequency)
+ counter (SlidingWindowCounter. num-windows)
+ tracker (NthLastModifiedTimeTracker. num-windows)]
+ (bolt
+ (execute [{word "word" :as tuple}]
+ (if (TupleUtils/isTick tuple)
+ (let [counts (.getCountsThenAdvanceWindow counter)
+ actual-window-length (.secondsSinceOldestModification tracker)]
+ (log-debug "Received tick tuple, triggering emit of current window counts")
+ (.markAsModified tracker)
+ (doseq [[obj count] counts]
+ (emit-bolt! collector [obj count actual-window-length])))
+ (do
+ (.incrementCount counter word)
+ (ack! collector tuple)))))))
+
+(defmacro update-rankings [& body]
+ `(if (TupleUtils/isTick ~'tuple)
+ (do
+ (log-debug "Received tick tuple, triggering emit of current rankings")
+ (emit-bolt! ~'collector [(.copy ~'rankings)])
+ (log-debug "Rankings: " ~'rankings))
+ ~@body))
+
+(defbolt intermediate-rankings-bolt ["rankings"]
+ {:prepare true
+ :params [top-n emit-frequency]
+ :conf {TOPOLOGY-TICK-TUPLE-FREQ-SECS emit-frequency}}
+ [conf context collector]
+ (let [rankings (Rankings. top-n)]
+ (bolt
+ (execute [tuple]
+ (update-rankings (.updateWith rankings (RankableObjectWithFields/from tuple)))))))
+
+(defbolt total-rankings-bolt ["rankings"]
+ {:prepare true
+ :params [top-n emit-frequency]
+ :conf {TOPOLOGY-TICK-TUPLE-FREQ-SECS emit-frequency}}
+ [conf context collector]
+ (let [rankings (Rankings. top-n)]
+ (bolt
+ (execute [{rankings-to-merge "rankings" :as tuple}]
+ (update-rankings (doto rankings
+ (.updateWith rankings-to-merge)
+ (.pruneZeroCounts)))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/3b7c4a81/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj
new file mode 100644
index 0000000..0f51951
--- /dev/null
+++ b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj
@@ -0,0 +1,52 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.starter.clj.exclamation
+ (:import [org.apache.storm StormSubmitter LocalCluster]
+ [org.apache.storm.utils Utils]
+ [org.apache.storm.testing TestWordSpout])
+ (:use [org.apache.storm clojure config])
+ (:gen-class))
+
+(defbolt exclamation-bolt ["word"]
+ [{word "word" :as tuple} collector]
+ (emit-bolt! collector [(str word "!!!")] :anchor tuple)
+ (ack! collector tuple))
+
+(defn mk-topology []
+ (topology
+ {"word" (spout-spec (TestWordSpout.) :p 10)}
+ {"exclaim1" (bolt-spec {"word" :shuffle} exclamation-bolt :p 3)
+ "exclaim2" (bolt-spec {"exclaim1" :shuffle} exclamation-bolt :p 2)}))
+
+(defn run-local! []
+ (let [cluster (LocalCluster.)]
+ (.submitTopology cluster "exclamation" {TOPOLOGY-DEBUG true} (mk-topology))
+ (Utils/sleep 10000)
+ (.killTopology cluster "exclamation")
+ (.shutdown cluster)))
+
+(defn submit-topology! [name]
+ (StormSubmitter/submitTopologyWithProgressBar
+ name
+ {TOPOLOGY-DEBUG true
+ TOPOLOGY-WORKERS 3}
+ (mk-topology)))
+
+(defn -main
+ ([]
+ (run-local!))
+ ([name]
+ (submit-topology! name)))
http://git-wip-us.apache.org/repos/asf/storm/blob/3b7c4a81/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj
new file mode 100644
index 0000000..53ba0a8
--- /dev/null
+++ b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj
@@ -0,0 +1,58 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.starter.clj.rolling-top-words
+ (:require [org.apache.storm [clojure :refer :all] [config :refer :all]]
+ [org.apache.storm.starter.clj.bolts :refer
+ [rolling-count-bolt intermediate-rankings-bolt total-rankings-bolt]])
+ (:import [org.apache.storm StormSubmitter LocalCluster]
+ [org.apache.storm.utils Utils]
+ [org.apache.storm.testing TestWordSpout])
+ (:gen-class))
+
+(defn mk-topology []
+ (let [spout-id "wordGenerator"
+ counter-id "counter"
+ ranker-id "intermediateRanker"
+ total-ranker-id "finalRanker"]
+ (topology
+ {spout-id (spout-spec (TestWordSpout.) :p 5)}
+ {counter-id (bolt-spec {spout-id ["word"]}
+ (rolling-count-bolt 9 3)
+ :p 4)
+ ranker-id (bolt-spec {counter-id ["obj"]}
+ (intermediate-rankings-bolt 5 2)
+ :p 4)
+ total-ranker-id (bolt-spec {ranker-id :global}
+ (total-rankings-bolt 5 2))})))
+
+(defn run-local! []
+ (let [cluster (LocalCluster.)]
+ (.submitTopology cluster "slidingWindowCounts" {TOPOLOGY-DEBUG true} (mk-topology))
+ (Utils/sleep 60000)
+ (.shutdown cluster)))
+
+(defn submit-topology! [name]
+ (StormSubmitter/submitTopology
+ name
+ {TOPOLOGY-DEBUG true
+ TOPOLOGY-WORKERS 3}
+ (mk-topology)))
+
+(defn -main
+ ([]
+ (run-local!))
+ ([name]
+ (submit-topology! name)))
http://git-wip-us.apache.org/repos/asf/storm/blob/3b7c4a81/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj b/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
new file mode 100644
index 0000000..d18f19b
--- /dev/null
+++ b/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
@@ -0,0 +1,115 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.starter.clj.bolts-test
+ (:require [clojure.test :refer :all]
+ [org.apache.storm.starter.clj.word-count :refer [word-count split-sentence]]
+ [org.apache.storm.starter.clj.exclamation :refer [exclamation-bolt]]
+ [org.apache.storm.starter.clj.bolts :refer
+ [rolling-count-bolt intermediate-rankings-bolt total-rankings-bolt]]
+ [org.apache.storm [testing :refer :all]])
+ (:import [org.apache.storm Constants]
+ [org.apache.storm.task OutputCollector IOutputCollector]
+ [org.apache.storm.starter.tools Rankable]
+ [org.apache.storm.tuple Tuple]))
+
+(defmacro test-with-tuple [[bolt tuples] & body]
+ `(let [bolt# ~bolt
+ tuples# (atom [])]
+ (.prepare bolt# {} nil (OutputCollector.
+ (reify IOutputCollector
+ (emit [_ _ _ tuple#]
+ (swap! tuples# conj tuple#))
+ (ack [_ input]))))
+ (if (vector? ~tuples)
+ (doseq [t# ~tuples]
+ (.execute bolt# t#))
+ (.execute bolt# ~tuples))
+ (let [~'tuples @tuples#]
+ ~@body)))
+
+(defn- mock-tuple [m & {component :component stream-id :stream-id
+ :or {component "1" stream-id "1"}}]
+ (reify
+ Tuple
+ (getSourceComponent [_]
+ component)
+ (getSourceStreamId [_]
+ stream-id)
+ (getString [this i]
+ (nth (vals m) 0))
+ (getValues [_]
+ (vals m))
+ clojure.lang.IPersistentMap
+ (valAt [_ key]
+ (get m key))
+ (seq [_]
+ (seq m))))
+
+(def ^{:private true} tick-tuple
+ (mock-tuple {}
+ :component Constants/SYSTEM_COMPONENT_ID
+ :stream-id Constants/SYSTEM_TICK_STREAM_ID))
+
+(deftest test-split-sentence
+ (testing "Bolt emits word per sentence"
+ (test-with-tuple [split-sentence (mock-tuple
+ {"sentence" "the cat jumped over the door"})]
+ (is (= [["the"] ["cat"] ["jumped"] ["over"] ["the"] ["door"]] tuples)))))
+
+(deftest test-word-count
+ (testing "Bolt emits new count"
+ (test-with-tuple [word-count [(mock-tuple {"word" "the"})
+ (mock-tuple {"word" "the"})
+ (mock-tuple {"word" "cat"})]]
+ (is (ms= [["the" 1] ["the" 2] ["cat" 1]] tuples)))))
+
+(deftest test-exclamation-bolt
+ (testing "Bolt emits word with exclamation marks"
+ (test-with-tuple [exclamation-bolt (mock-tuple {"word" "nathan"})]
+ (is (= [["nathan!!!"]] tuples)))))
+
+(deftest test-rolling-bolt
+ (testing "Emits nothing if no object has been counted"
+ (test-with-tuple [(rolling-count-bolt 9 3) tick-tuple]
+ (is (empty? tuples))))
+ (testing "Emits something if object was counted"
+ (test-with-tuple [(rolling-count-bolt 9 3)
+ [(mock-tuple {"word" "nathan"}) tick-tuple]]
+ (is (= [["nathan" 1 0]] tuples)))))
+
+(deftest test-intermediate-rankings-bolt
+ (testing "Emits rankings for tick tuple"
+ (test-with-tuple [(intermediate-rankings-bolt 5 2) tick-tuple]
+ (is (seq tuples))))
+ (testing "Emits nothing for normal tuple"
+ (test-with-tuple [(intermediate-rankings-bolt 5 2)
+ (mock-tuple {"obj" "nathan" "count" 1})]
+ (is (empty? tuples)))))
+
+(defn- mock-rankable [object count]
+ "Creates rankable with object and count"
+ (reify Rankable
+ (getCount [_] count)
+ (getObject [_] object)))
+
+(deftest test-total-rankings-bolt
+ (testing "Emits rankings for tick tuple"
+ (test-with-tuple [(total-rankings-bolt 5 2) tick-tuple]
+ (is (seq tuples))))
+ (testing "Emits nothing for normal tuple"
+ (test-with-tuple [(total-rankings-bolt 5 2)
+ (mock-tuple {"rankings" (mock-rankable "nathan" 2)})]
+ (is (empty? tuples)))))
[2/6] storm git commit: Correct typo on example javadoc
Posted by kn...@apache.org.
Correct typo on example javadoc
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8c31e4dd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8c31e4dd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8c31e4dd
Branch: refs/heads/master
Commit: 8c31e4ddf9012eaf13d76e6c16684037b31fe5ef
Parents: d22d845
Author: Bryant <br...@gmail.com>
Authored: Sun Mar 20 17:15:12 2016 -0700
Committer: Bryant <br...@gmail.com>
Committed: Fri Mar 25 16:03:13 2016 -0700
----------------------------------------------------------------------
.../src/jvm/org/apache/storm/starter/FastWordCountTopology.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8c31e4dd/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
index 51f6b11..5d86ff0 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
@@ -40,7 +40,7 @@ import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
/**
- * WordCount but teh spout does not stop, and the bolts are implemented in
+ * WordCount but the spout does not stop, and the bolts are implemented in
* java. This can show how fast the word count can run.
*/
public class FastWordCountTopology {