You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2016/04/04 18:18:38 UTC

[1/6] storm git commit: Fix indent on clojure testing function

Repository: storm
Updated Branches:
  refs/heads/master 69ec8ffa1 -> f63151679


Fix indent on clojure testing function


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/843d6904
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/843d6904
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/843d6904

Branch: refs/heads/master
Commit: 843d6904369ab7db584bcef6e9a62e6be2845b7a
Parents: 8c31e4d
Author: Bryant <br...@gmail.com>
Authored: Thu Mar 24 13:37:20 2016 -0700
Committer: Bryant <br...@gmail.com>
Committed: Fri Mar 25 16:03:13 2016 -0700

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/testing.clj | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/843d6904/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 0c13a98..b7bdad9 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -90,13 +90,13 @@
   []
   (Time/stopSimulating))
 
- (defmacro with-simulated-time
-   [& body]
-   `(try
-     (start-simulating-time!)
-     ~@body
-     (finally
-       (stop-simulating-time!))))
+(defmacro with-simulated-time
+  [& body]
+  `(try
+    (start-simulating-time!)
+    ~@body
+    (finally
+      (stop-simulating-time!))))
 
 (defn advance-time-ms! [ms]
   (Time/advanceTime ms))


[5/6] storm git commit: Merge branch 'STORM-515' of https://github.com/brymaven/storm

Posted by kn...@apache.org.
Merge branch 'STORM-515' of https://github.com/brymaven/storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6e7892ca
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6e7892ca
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6e7892ca

Branch: refs/heads/master
Commit: 6e7892cad2c8fe651eb7e2b2650b4403e3a8874e
Parents: 69ec8ff 1ed04f1
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Mon Apr 4 11:17:50 2016 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Mon Apr 4 11:17:50 2016 -0500

----------------------------------------------------------------------
 examples/storm-starter/pom.xml                  |  10 ++
 .../clj/org/apache/storm/starter/clj/bolts.clj  |  79 +++++++++++++
 .../apache/storm/starter/clj/exclamation.clj    |  52 +++++++++
 .../storm/starter/clj/rolling_top_words.clj     |  58 ++++++++++
 .../storm/starter/FastWordCountTopology.java    |   2 +-
 .../org/apache/storm/starter/clj/bolts_test.clj | 114 +++++++++++++++++++
 storm-core/src/clj/org/apache/storm/testing.clj |  14 +--
 7 files changed, 321 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6e7892ca/examples/storm-starter/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/6e7892ca/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------


[4/6] storm git commit: Clean up macros in storm starter

Posted by kn...@apache.org.
Clean up macros in storm starter


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1ed04f14
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1ed04f14
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1ed04f14

Branch: refs/heads/master
Commit: 1ed04f14e54efd4b9fbaac4f9e4b73869961ed20
Parents: 3b7c4a8
Author: Bryant <br...@gmail.com>
Authored: Thu Mar 31 18:24:48 2016 -0700
Committer: Bryant <br...@gmail.com>
Committed: Thu Mar 31 20:40:08 2016 -0700

----------------------------------------------------------------------
 .../clj/org/apache/storm/starter/clj/bolts.clj  | 20 ++++---
 .../org/apache/storm/starter/clj/bolts_test.clj | 59 ++++++++++----------
 2 files changed, 41 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1ed04f14/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj
index 95aa8bf..270952f 100644
--- a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj
+++ b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj
@@ -44,12 +44,12 @@
            (.incrementCount counter word)
            (ack! collector tuple)))))))
 
-(defmacro update-rankings [& body]
-  `(if (TupleUtils/isTick ~'tuple)
+(defmacro update-rankings [tuple collector rankings & body]
+  `(if (TupleUtils/isTick ~tuple)
      (do
        (log-debug "Received tick tuple, triggering emit of current rankings")
-       (emit-bolt! ~'collector [(.copy ~'rankings)])
-       (log-debug "Rankings: " ~'rankings))
+       (emit-bolt! ~collector [(.copy ~rankings)])
+       (log-debug "Rankings: " ~rankings))
      ~@body))
 
 (defbolt intermediate-rankings-bolt ["rankings"]
@@ -60,7 +60,9 @@
   (let [rankings (Rankings. top-n)]
     (bolt
      (execute [tuple]
-       (update-rankings (.updateWith rankings (RankableObjectWithFields/from tuple)))))))
+       (update-rankings
+        tuple collector rankings
+        (.updateWith rankings (RankableObjectWithFields/from tuple)))))))
 
 (defbolt total-rankings-bolt ["rankings"]
   {:prepare true
@@ -70,6 +72,8 @@
   (let [rankings (Rankings. top-n)]
     (bolt
      (execute [{rankings-to-merge "rankings" :as tuple}]
-       (update-rankings (doto rankings
-                          (.updateWith rankings-to-merge)
-                          (.pruneZeroCounts)))))))
+       (update-rankings
+        tuple collector rankings
+        (doto rankings
+          (.updateWith rankings-to-merge)
+          (.pruneZeroCounts)))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/1ed04f14/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj b/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
index d18f19b..164e7ac 100644
--- a/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
+++ b/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
@@ -25,20 +25,18 @@
            [org.apache.storm.starter.tools Rankable]
            [org.apache.storm.tuple Tuple]))
 
-(defmacro test-with-tuple [[bolt tuples] & body]
-  `(let [bolt# ~bolt
-         tuples# (atom [])]
-     (.prepare bolt# {} nil (OutputCollector.
-                       (reify IOutputCollector
-                         (emit [_ _ _ tuple#]
-                           (swap! tuples# conj tuple#))
-                         (ack [_ input]))))
-     (if (vector? ~tuples)
-       (doseq [t# ~tuples]
-         (.execute bolt# t#))
-       (.execute bolt# ~tuples))
-     (let [~'tuples @tuples#]
-       ~@body)))
+(defn execute-tuples [bolt tuples]
+  (let [out (atom [])]
+    (.prepare bolt {} nil (OutputCollector.
+                           (reify IOutputCollector
+                             (emit [_ _ _ tuple]
+                               (swap! out conj tuple))
+                             (ack [_ input]))))
+    (if (vector? tuples)
+       (doseq [t tuples]
+         (.execute bolt t))
+       (.execute bolt tuples))
+    @out))
 
 (defn- mock-tuple [m & {component :component stream-id :stream-id
                         :or {component "1" stream-id "1"}}]
@@ -65,38 +63,39 @@
 
 (deftest test-split-sentence
   (testing "Bolt emits word per sentence"
-    (test-with-tuple [split-sentence (mock-tuple
-                                        {"sentence" "the cat jumped over the door"})]
+    (let [tuples (execute-tuples
+                  split-sentence
+                  (mock-tuple {"sentence" "the cat jumped over the door"}))]
       (is (= [["the"] ["cat"] ["jumped"] ["over"] ["the"] ["door"]] tuples)))))
 
 (deftest test-word-count
   (testing "Bolt emits new count"
-    (test-with-tuple [word-count [(mock-tuple {"word" "the"})
-                                  (mock-tuple {"word" "the"})
-                                  (mock-tuple {"word" "cat"})]]
+    (let [tuples (execute-tuples word-count [(mock-tuple {"word" "the"})
+                                             (mock-tuple {"word" "the"})
+                                             (mock-tuple {"word" "cat"})])]
       (is (ms= [["the" 1] ["the" 2] ["cat" 1]] tuples)))))
 
 (deftest test-exclamation-bolt
   (testing "Bolt emits word with exclamation marks"
-    (test-with-tuple [exclamation-bolt (mock-tuple {"word" "nathan"})]
+    (let [tuples (execute-tuples exclamation-bolt (mock-tuple {"word" "nathan"}))]
       (is (= [["nathan!!!"]] tuples)))))
 
 (deftest test-rolling-bolt
   (testing "Emits nothing if no object has been counted"
-    (test-with-tuple [(rolling-count-bolt 9 3) tick-tuple]
+    (let [tuples (execute-tuples (rolling-count-bolt 9 3) tick-tuple)]
       (is (empty? tuples))))
   (testing "Emits something if object was counted"
-    (test-with-tuple [(rolling-count-bolt 9 3)
-                      [(mock-tuple {"word" "nathan"}) tick-tuple]]
+    (let [tuples (execute-tuples (rolling-count-bolt 9 3)
+                                 [(mock-tuple {"word" "nathan"}) tick-tuple])]
       (is (= [["nathan" 1 0]] tuples)))))
 
 (deftest test-intermediate-rankings-bolt
   (testing "Emits rankings for tick tuple"
-    (test-with-tuple [(intermediate-rankings-bolt 5 2) tick-tuple]
+    (let [tuples (execute-tuples (intermediate-rankings-bolt 5 2) tick-tuple)]
       (is (seq tuples))))
   (testing "Emits nothing for normal tuple"
-    (test-with-tuple [(intermediate-rankings-bolt 5 2)
-                      (mock-tuple {"obj" "nathan" "count" 1})]
+    (let [tuples (execute-tuples (intermediate-rankings-bolt 5 2)
+                                 (mock-tuple {"obj" "nathan" "count" 1}))]
       (is (empty? tuples)))))
 
 (defn- mock-rankable [object count]
@@ -107,9 +106,9 @@
 
 (deftest test-total-rankings-bolt
   (testing "Emits rankings for tick tuple"
-    (test-with-tuple [(total-rankings-bolt 5 2) tick-tuple]
+    (let [tuples (execute-tuples (total-rankings-bolt 5 2) tick-tuple)]
       (is (seq tuples))))
   (testing "Emits nothing for normal tuple"
-    (test-with-tuple [(total-rankings-bolt 5 2)
-                      (mock-tuple {"rankings" (mock-rankable "nathan" 2)})]
-     (is (empty? tuples)))))
+    (let [tuples (execute-tuples (total-rankings-bolt 5 2)
+                                 (mock-tuple {"rankings" (mock-rankable "nathan" 2)}))]
+      (is (empty? tuples)))))


[6/6] storm git commit: Adding STORM-515 to the changelog.

Posted by kn...@apache.org.
Adding STORM-515 to the changelog.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f6315167
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f6315167
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f6315167

Branch: refs/heads/master
Commit: f631516793cd84fc90ad40567479e31f681436a5
Parents: 6e7892c
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Mon Apr 4 11:18:23 2016 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Mon Apr 4 11:18:23 2016 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f6315167/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 50129b3..4b7a3d0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-515: Clojure documentation and examples
  * STORM-1279: port backtype.storm.daemon.supervisor to java
  * STORM-1668: Flux silently fails while setting a non-existent property.
  * STORM-1271: Port backtype.storm.daemon.task to java


[3/6] storm git commit: Add Clojure examples for rolling top words and exclamation

Posted by kn...@apache.org.
Add Clojure examples for rolling top words and exclamation


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3b7c4a81
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3b7c4a81
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3b7c4a81

Branch: refs/heads/master
Commit: 3b7c4a81d7c5111eb6d506bef7cedf9254bf91d0
Parents: 843d690
Author: Bryant <br...@gmail.com>
Authored: Thu Mar 24 13:42:45 2016 -0700
Committer: Bryant <br...@gmail.com>
Committed: Fri Mar 25 17:28:26 2016 -0700

----------------------------------------------------------------------
 examples/storm-starter/pom.xml                  |  10 ++
 .../clj/org/apache/storm/starter/clj/bolts.clj  |  75 ++++++++++++
 .../apache/storm/starter/clj/exclamation.clj    |  52 +++++++++
 .../storm/starter/clj/rolling_top_words.clj     |  58 ++++++++++
 .../org/apache/storm/starter/clj/bolts_test.clj | 115 +++++++++++++++++++
 5 files changed, 310 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3b7c4a81/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 929c8ea..f7fce6f 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -222,6 +222,9 @@
           <sourceDirectories>
             <sourceDirectory>src/clj</sourceDirectory>
           </sourceDirectories>
+          <testSourceDirectories>
+            <testSourceDirectory>test/clj</testSourceDirectory>
+          </testSourceDirectories>
         </configuration>
         <executions>
           <execution>
@@ -231,6 +234,13 @@
               <goal>compile</goal>
             </goals>
           </execution>
+          <execution>
+            <id>test-clojure</id>
+            <phase>test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
         </executions>
       </plugin>
 

http://git-wip-us.apache.org/repos/asf/storm/blob/3b7c4a81/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj
new file mode 100644
index 0000000..95aa8bf
--- /dev/null
+++ b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/bolts.clj
@@ -0,0 +1,75 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.starter.clj.bolts
+  (:require [org.apache.storm
+             [clojure :refer :all]
+             [config :refer :all]
+             [log :refer :all]])
+  (:import [org.apache.storm.starter.tools
+            NthLastModifiedTimeTracker SlidingWindowCounter
+            Rankings RankableObjectWithFields]
+           [org.apache.storm.utils TupleUtils]))
+
+(defbolt rolling-count-bolt ["obj" "count" "actualWindowLengthInSeconds"]
+  {:prepare true
+   :params [window-length emit-frequency]
+   :conf {TOPOLOGY-TICK-TUPLE-FREQ-SECS emit-frequency}}
+  [conf context collector]
+  (let [num-windows (/ window-length emit-frequency)
+        counter (SlidingWindowCounter. num-windows)
+        tracker (NthLastModifiedTimeTracker. num-windows)]
+    (bolt
+     (execute [{word "word" :as tuple}]
+       (if (TupleUtils/isTick tuple)
+         (let [counts (.getCountsThenAdvanceWindow counter)
+               actual-window-length (.secondsSinceOldestModification tracker)]
+           (log-debug "Received tick tuple, triggering emit of current window counts")
+           (.markAsModified tracker)
+           (doseq [[obj count] counts]
+             (emit-bolt! collector [obj count actual-window-length])))
+         (do
+           (.incrementCount counter word)
+           (ack! collector tuple)))))))
+
+(defmacro update-rankings [& body]
+  `(if (TupleUtils/isTick ~'tuple)
+     (do
+       (log-debug "Received tick tuple, triggering emit of current rankings")
+       (emit-bolt! ~'collector [(.copy ~'rankings)])
+       (log-debug "Rankings: " ~'rankings))
+     ~@body))
+
+(defbolt intermediate-rankings-bolt ["rankings"]
+  {:prepare true
+   :params [top-n emit-frequency]
+   :conf {TOPOLOGY-TICK-TUPLE-FREQ-SECS emit-frequency}}
+  [conf context collector]
+  (let [rankings (Rankings. top-n)]
+    (bolt
+     (execute [tuple]
+       (update-rankings (.updateWith rankings (RankableObjectWithFields/from tuple)))))))
+
+(defbolt total-rankings-bolt ["rankings"]
+  {:prepare true
+   :params [top-n emit-frequency]
+   :conf {TOPOLOGY-TICK-TUPLE-FREQ-SECS emit-frequency}}
+  [conf context collector]
+  (let [rankings (Rankings. top-n)]
+    (bolt
+     (execute [{rankings-to-merge "rankings" :as tuple}]
+       (update-rankings (doto rankings
+                          (.updateWith rankings-to-merge)
+                          (.pruneZeroCounts)))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/3b7c4a81/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj
new file mode 100644
index 0000000..0f51951
--- /dev/null
+++ b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/exclamation.clj
@@ -0,0 +1,52 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.starter.clj.exclamation
+  (:import [org.apache.storm StormSubmitter LocalCluster]
+           [org.apache.storm.utils Utils]
+           [org.apache.storm.testing TestWordSpout])
+  (:use [org.apache.storm clojure config])
+  (:gen-class))
+
+(defbolt exclamation-bolt ["word"]
+  [{word "word" :as tuple} collector]
+  (emit-bolt! collector [(str word "!!!")] :anchor tuple)
+  (ack! collector tuple))
+
+(defn mk-topology []
+  (topology
+   {"word"     (spout-spec (TestWordSpout.) :p 10)}
+   {"exclaim1" (bolt-spec {"word" :shuffle} exclamation-bolt :p 3)
+    "exclaim2" (bolt-spec {"exclaim1" :shuffle} exclamation-bolt :p 2)}))
+
+(defn run-local! []
+  (let [cluster (LocalCluster.)]
+    (.submitTopology cluster "exclamation" {TOPOLOGY-DEBUG true} (mk-topology))
+    (Utils/sleep 10000)
+    (.killTopology cluster "exclamation")
+    (.shutdown cluster)))
+
+(defn submit-topology! [name]
+  (StormSubmitter/submitTopologyWithProgressBar
+   name
+   {TOPOLOGY-DEBUG true
+    TOPOLOGY-WORKERS 3}
+   (mk-topology)))
+
+(defn -main
+  ([]
+   (run-local!))
+  ([name]
+   (submit-topology! name)))

http://git-wip-us.apache.org/repos/asf/storm/blob/3b7c4a81/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj
new file mode 100644
index 0000000..53ba0a8
--- /dev/null
+++ b/examples/storm-starter/src/clj/org/apache/storm/starter/clj/rolling_top_words.clj
@@ -0,0 +1,58 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.starter.clj.rolling-top-words
+  (:require [org.apache.storm [clojure :refer :all] [config :refer :all]]
+            [org.apache.storm.starter.clj.bolts :refer
+             [rolling-count-bolt intermediate-rankings-bolt total-rankings-bolt]])
+  (:import [org.apache.storm StormSubmitter LocalCluster]
+           [org.apache.storm.utils Utils]
+           [org.apache.storm.testing TestWordSpout])
+  (:gen-class))
+
+(defn mk-topology []
+  (let [spout-id "wordGenerator"
+        counter-id "counter"
+        ranker-id "intermediateRanker"
+        total-ranker-id "finalRanker"]
+    (topology
+     {spout-id   (spout-spec (TestWordSpout.) :p 5)}
+     {counter-id (bolt-spec {spout-id ["word"]}
+                            (rolling-count-bolt 9 3)
+                            :p 4)
+      ranker-id (bolt-spec {counter-id ["obj"]}
+                           (intermediate-rankings-bolt 5 2)
+                           :p 4)
+      total-ranker-id (bolt-spec {ranker-id :global}
+                                 (total-rankings-bolt 5 2))})))
+
+(defn run-local! []
+  (let [cluster (LocalCluster.)]
+    (.submitTopology cluster "slidingWindowCounts" {TOPOLOGY-DEBUG true} (mk-topology))
+    (Utils/sleep 60000)
+    (.shutdown cluster)))
+
+(defn submit-topology! [name]
+  (StormSubmitter/submitTopology
+   name
+   {TOPOLOGY-DEBUG true
+    TOPOLOGY-WORKERS 3}
+   (mk-topology)))
+
+(defn -main
+  ([]
+   (run-local!))
+  ([name]
+   (submit-topology! name)))

http://git-wip-us.apache.org/repos/asf/storm/blob/3b7c4a81/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj b/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
new file mode 100644
index 0000000..d18f19b
--- /dev/null
+++ b/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
@@ -0,0 +1,115 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.starter.clj.bolts-test
+  (:require [clojure.test :refer :all]
+            [org.apache.storm.starter.clj.word-count :refer [word-count split-sentence]]
+            [org.apache.storm.starter.clj.exclamation :refer [exclamation-bolt]]
+            [org.apache.storm.starter.clj.bolts :refer
+             [rolling-count-bolt intermediate-rankings-bolt total-rankings-bolt]]
+            [org.apache.storm [testing :refer :all]])
+  (:import [org.apache.storm Constants]
+           [org.apache.storm.task OutputCollector IOutputCollector]
+           [org.apache.storm.starter.tools Rankable]
+           [org.apache.storm.tuple Tuple]))
+
+(defmacro test-with-tuple [[bolt tuples] & body]
+  `(let [bolt# ~bolt
+         tuples# (atom [])]
+     (.prepare bolt# {} nil (OutputCollector.
+                       (reify IOutputCollector
+                         (emit [_ _ _ tuple#]
+                           (swap! tuples# conj tuple#))
+                         (ack [_ input]))))
+     (if (vector? ~tuples)
+       (doseq [t# ~tuples]
+         (.execute bolt# t#))
+       (.execute bolt# ~tuples))
+     (let [~'tuples @tuples#]
+       ~@body)))
+
+(defn- mock-tuple [m & {component :component stream-id :stream-id
+                        :or {component "1" stream-id "1"}}]
+  (reify
+    Tuple
+    (getSourceComponent [_]
+      component)
+    (getSourceStreamId [_]
+      stream-id)
+    (getString [this i]
+      (nth (vals m) 0))
+    (getValues [_]
+      (vals m))
+    clojure.lang.IPersistentMap
+    (valAt [_ key]
+      (get m key))
+    (seq [_]
+      (seq m))))
+
+(def ^{:private true} tick-tuple
+  (mock-tuple {}
+              :component Constants/SYSTEM_COMPONENT_ID
+              :stream-id Constants/SYSTEM_TICK_STREAM_ID))
+
+(deftest test-split-sentence
+  (testing "Bolt emits word per sentence"
+    (test-with-tuple [split-sentence (mock-tuple
+                                        {"sentence" "the cat jumped over the door"})]
+      (is (= [["the"] ["cat"] ["jumped"] ["over"] ["the"] ["door"]] tuples)))))
+
+(deftest test-word-count
+  (testing "Bolt emits new count"
+    (test-with-tuple [word-count [(mock-tuple {"word" "the"})
+                                  (mock-tuple {"word" "the"})
+                                  (mock-tuple {"word" "cat"})]]
+      (is (ms= [["the" 1] ["the" 2] ["cat" 1]] tuples)))))
+
+(deftest test-exclamation-bolt
+  (testing "Bolt emits word with exclamation marks"
+    (test-with-tuple [exclamation-bolt (mock-tuple {"word" "nathan"})]
+      (is (= [["nathan!!!"]] tuples)))))
+
+(deftest test-rolling-bolt
+  (testing "Emits nothing if no object has been counted"
+    (test-with-tuple [(rolling-count-bolt 9 3) tick-tuple]
+      (is (empty? tuples))))
+  (testing "Emits something if object was counted"
+    (test-with-tuple [(rolling-count-bolt 9 3)
+                      [(mock-tuple {"word" "nathan"}) tick-tuple]]
+      (is (= [["nathan" 1 0]] tuples)))))
+
+(deftest test-intermediate-rankings-bolt
+  (testing "Emits rankings for tick tuple"
+    (test-with-tuple [(intermediate-rankings-bolt 5 2) tick-tuple]
+      (is (seq tuples))))
+  (testing "Emits nothing for normal tuple"
+    (test-with-tuple [(intermediate-rankings-bolt 5 2)
+                      (mock-tuple {"obj" "nathan" "count" 1})]
+      (is (empty? tuples)))))
+
+(defn- mock-rankable [object count]
+  "Creates rankable with object and count"
+  (reify Rankable
+    (getCount [_] count)
+    (getObject [_] object)))
+
+(deftest test-total-rankings-bolt
+  (testing "Emits rankings for tick tuple"
+    (test-with-tuple [(total-rankings-bolt 5 2) tick-tuple]
+      (is (seq tuples))))
+  (testing "Emits nothing for normal tuple"
+    (test-with-tuple [(total-rankings-bolt 5 2)
+                      (mock-tuple {"rankings" (mock-rankable "nathan" 2)})]
+     (is (empty? tuples)))))


[2/6] storm git commit: Correct typo on example javadoc

Posted by kn...@apache.org.
Correct typo on example javadoc


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8c31e4dd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8c31e4dd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8c31e4dd

Branch: refs/heads/master
Commit: 8c31e4ddf9012eaf13d76e6c16684037b31fe5ef
Parents: d22d845
Author: Bryant <br...@gmail.com>
Authored: Sun Mar 20 17:15:12 2016 -0700
Committer: Bryant <br...@gmail.com>
Committed: Fri Mar 25 16:03:13 2016 -0700

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/starter/FastWordCountTopology.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8c31e4dd/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
index 51f6b11..5d86ff0 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/FastWordCountTopology.java
@@ -40,7 +40,7 @@ import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 
 /**
- * WordCount but teh spout does not stop, and the bolts are implemented in
+ * WordCount but the spout does not stop, and the bolts are implemented in
  * java.  This can show how fast the word count can run.
  */
 public class FastWordCountTopology {