[4/6] storm git commit: STORM-1179: Create Maven Profiles for Integration Tests - Mark Clojure tests as integration tests

STORM-1179: Create Maven Profiles for Integration Tests
- Mark Clojure tests as integration tests


Branch: refs/heads/master
Commit: d4fcc0fde96c667d1763fe5626ddf1284e4a7f70
Parents: 8f1b4fb
Author: Hugo Louro <>
Authored: Wed Dec 9 16:50:28 2015 -0800
Committer: Hugo Louro <>
Committed: Wed Dec 9 18:58:22 2015 -0800

 .../clj/backtype/storm/integration_test.clj     | 622 -------------------
 .../test/clj/backtype/storm/testing4j_test.clj  | 212 -------
 .../backtype/storm/integration_test.clj         | 622 +++++++++++++++++++
 .../backtype/storm/testing4j_test.clj           | 212 +++++++
 .../storm/trident/integration_test.clj          | 292 +++++++++
 .../test/clj/storm/trident/integration_test.clj | 292 ---------
 6 files changed, 1126 insertions(+), 1126 deletions(-)
diff --git a/storm-core/test/clj/integration/backtype/storm/integration_test.clj b/storm-core/test/clj/integration/backtype/storm/integration_test.clj
new file mode 100644
index 0000000..f5fa501
--- /dev/null
+++ b/storm-core/test/clj/integration/backtype/storm/integration_test.clj
@@ -0,0 +1,622 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns integration.backtype.storm.integration-test
+  (:use [clojure test])
+  (:import [backtype.storm Config])
+  (:import [backtype.storm.topology TopologyBuilder])
+  (:import [backtype.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus RebalanceOptions])
+  (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount
+            TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
+  (:import [backtype.storm.tuple Fields])
+  (:use [backtype.storm testing config clojure util])
+  (:use [backtype.storm.daemon common])
+  (:require [backtype.storm [thrift :as thrift]]))
+;; Runs a small word-count topology against mocked spout data on a
+;; simulated-time local cluster with 4 supervisors, once with
+;; STORM-LOCAL-MODE-ZMQ true and once with it false, and checks the tuples
+;; read back for components "1" through "4".
+(deftest test-basic-topology
+  (doseq [zmq-on? [true false]]
+    (with-simulated-time-local-cluster [cluster :supervisors 4
+                                        :daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}]
+      (let [topology (thrift/mk-topology
+                      {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+                      {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
+                       "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
+                       "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
+                       })
+            results (complete-topology cluster
+                                       topology
+                                       :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
+                                       :storm-conf {TOPOLOGY-WORKERS 2})]
+        ;; "1" and "2" are compared with ms= (presumably order-insensitive
+        ;; multiset equality -- confirm in backtype.storm.testing); "3" and "4"
+        ;; are compared with plain =, so their order matters.
+        (is (ms= [["nathan"] ["bob"] ["joey"] ["nathan"]]
+                 (read-tuples results "1")))
+        (is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
+                 (read-tuples results "2")))
+        (is (= [[1] [2] [3] [4]]
+               (read-tuples results "3")))
+        (is (= [[1] [2] [3] [4]]
+               (read-tuples results "4")))
+        ))))
+;; Bolt with output field "tid". At prepare time it reads the task index of the
+;; running task from the topology context; for every input tuple it emits that
+;; index anchored to the input and then acks the input.
+(defbolt emit-task-id ["tid"] {:prepare true}
+  [conf context collector]
+  (let [tid (.getThisTaskIndex context)]
+    (bolt
+      (execute [tuple]
+        (emit-bolt! collector [tid] :anchor tuple)
+        (ack! collector tuple)
+        ))))
+;; Bolt "2" is declared with parallelism hint 3 but TOPOLOGY-TASKS 6 and
+;; subscribes to spout "1" with :all grouping. One mocked tuple is expected to
+;; produce one emission per task index, 0 through 5.
+(deftest test-multi-tasks-per-executor
+  (with-simulated-time-local-cluster [cluster :supervisors 4]
+    (let [topology (thrift/mk-topology
+                    {"1" (thrift/mk-spout-spec (TestWordSpout. true))}
+                    {"2" (thrift/mk-bolt-spec {"1" :all} emit-task-id
+                      :parallelism-hint 3
+                      :conf {TOPOLOGY-TASKS 6})
+                     })
+          results (complete-topology cluster
+                                     topology
+                                     :mock-sources {"1" [["a"]]})]
+      (is (ms= [[0] [1] [2] [3] [4] [5]]
+               (read-tuples results "2")))
+      )))
+;; Bolt with no output fields that acks only every other tuple it receives.
+;; The remaining tuples are neither acked nor failed by this bolt.
+(defbolt ack-every-other {} {:prepare true}
+  [conf context collector]
+  (let [state (atom -1)]
+    (bolt
+      (execute [tuple]
+        ;; (swap! state -) negates the stored value, so val alternates
+        ;; 1, -1, 1, ... starting with 1 for the first tuple.
+        (let [val (swap! state -)]
+          (when (pos? val)
+            (ack! collector tuple)
+            ))))))
+;; Blocks the calling thread until `afn` returns truthy for every element of
+;; `ids`, sleeping 1 ms between checks. Returns nil. There is no timeout: if the
+;; condition never becomes true this never returns.
+(defn assert-loop [afn ids]
+  (loop []
+    (when-not (every? afn ids)
+      (Thread/sleep 1)
+      (recur))))
+;; Waits (via assert-loop) until `tracker` reports every given id as acked.
+(defn assert-acked [tracker & ids]
+  (let [acked? (fn [id] (.isAcked tracker id))]
+    (assert-loop acked? ids)))
+;; Waits (via assert-loop) until `tracker` reports every given id as failed.
+(defn assert-failed [tracker & ids]
+  (let [failed? (fn [id] (.isFailed tracker id))]
+    (assert-loop failed? ids)))
+;; Feeds three tuples into a topology whose only bolt acks every other tuple,
+;; with a 10 second message timeout. Tuples 1 and 3 must be acked; tuple 2 must
+;; not be failed after 9 simulated seconds and must be failed after 12 more.
+(deftest test-timeout
+  (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
+    (let [feeder (feeder-spout ["field1"])
+          tracker (AckFailMapTracker.)
+          _ (.setAckFailDelegate feeder tracker)
+          topology (thrift/mk-topology
+                     {"1" (thrift/mk-spout-spec feeder)}
+                     {"2" (thrift/mk-bolt-spec {"1" :global} ack-every-other)})]      
+      (submit-local-topology (:nimbus cluster)
+                             "timeout-tester"
+                             {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
+                             topology)
+      ;; advance simulated time before feeding (presumably so the topology is
+      ;; running before the timeout clock for the tuples starts -- confirm)
+      (advance-cluster-time cluster 11)
+      (.feed feeder ["a"] 1)
+      (.feed feeder ["b"] 2)
+      (.feed feeder ["c"] 3)
+      (advance-cluster-time cluster 9)
+      (assert-acked tracker 1 3)
+      (is (not (.isFailed tracker 2)))
+      (advance-cluster-time cluster 12)
+      (assert-failed tracker 2)
+      )))
+;; Builds the topology that test-validate-topology-structure expects to be
+;; accepted: spout "1" (TestWordSpout, parallelism hint 3) feeding bolt "2"
+;; (TestWordCounter, parallelism hint 4) with grouping ["word"].
+(defn mk-validate-topology-1 []
+  (let [spouts {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+        bolts  {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)}]
+    (thrift/mk-topology spouts bolts)))
+;; Builds a topology whose bolt "2" subscribes to component "3", which is not
+;; declared anywhere in the topology.
+(defn mk-invalidate-topology-1 []
+  (let [spouts {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+        bolts  {"2" (thrift/mk-bolt-spec {"3" ["word"]} (TestWordCounter.) :parallelism-hint 4)}]
+    (thrift/mk-topology spouts bolts)))
+;; Builds a topology whose bolt "2" groups the output of spout "1" on the field
+;; name "non-exists-field".
+(defn mk-invalidate-topology-2 []
+  (let [spouts {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+        bolts  {"2" (thrift/mk-bolt-spec {"1" ["non-exists-field"]} (TestWordCounter.) :parallelism-hint 4)}]
+    (thrift/mk-topology spouts bolts)))
+;; Builds a topology whose bolt "2" subscribes to the stream
+;; "non-exists-stream" of spout "1".
+(defn mk-invalidate-topology-3 []
+  (let [spouts {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+        bolts  {"2" (thrift/mk-bolt-spec {["1" "non-exists-stream"] ["word"]} (TestWordCounter.) :parallelism-hint 4)}]
+    (thrift/mk-topology spouts bolts)))
+;; Runs `topology` to completion on `cluster` with a fixed set of mocked words
+;; for spout "1" and 2 workers. Returns false when the run completes and true
+;; when it throws InvalidTopologyException; any other exception propagates.
+(defn try-complete-wc-topology [cluster topology]
+  (try
+    (complete-topology cluster
+                       topology
+                       :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
+                       :storm-conf {TOPOLOGY-WORKERS 2})
+    false
+    (catch InvalidTopologyException _
+      true)))
+;; The well-formed topology must run without InvalidTopologyException; each of
+;; the three malformed topologies must be rejected with one.
+(deftest test-validate-topology-structure
+  (with-simulated-time-local-cluster [cluster :supervisors 4]
+    (let [rejected? (fn [topo] (try-complete-wc-topology cluster topo))
+          valid-rejected?          (rejected? (mk-validate-topology-1))
+          missing-source-rejected? (rejected? (mk-invalidate-topology-1))
+          missing-field-rejected?  (rejected? (mk-invalidate-topology-2))
+          missing-stream-rejected? (rejected? (mk-invalidate-topology-3))]
+      (is (= valid-rejected? false))
+      (is (= missing-source-rejected? true))
+      (is (= missing-field-rejected? true))
+      (is (= missing-stream-rejected? true)))))
+;; Bolt with output field "num" that re-emits the values of each input tuple,
+;; anchored to that input, and then acks the input.
+(defbolt identity-bolt ["num"]
+  [tuple collector]
+  (emit-bolt! collector (.getValues tuple) :anchor tuple)
+  (ack! collector tuple))
+;; Bolt "2" subscribes both to the default stream of spout "1" (grouping
+;; ["word"]) and to its "__system" stream (:global). Besides the three mocked
+;; words, the test expects three ["startup"] tuples -- presumably one per spout
+;; task, given :p 3; confirm against the system-stream implementation.
+(deftest test-system-stream
+  ;; this test works because mocking a spout splits up the tuples evenly among the tasks
+  (with-simulated-time-local-cluster [cluster]
+      (let [topology (thrift/mk-topology
+                      {"1" (thrift/mk-spout-spec (TestWordSpout. true) :p 3)}
+                      {"2" (thrift/mk-bolt-spec {"1" ["word"] ["1" "__system"] :global} identity-bolt :p 1)
+                       })
+            results (complete-topology cluster
+                                       topology
+                                       :mock-sources {"1" [["a"] ["b"] ["c"]]}
+                                       :storm-conf {TOPOLOGY-WORKERS 2})]
+        (is (ms= [["a"] ["b"] ["c"] ["startup"] ["startup"] ["startup"]]
+                 (read-tuples results "2")))
+        )))
+;; Creates a feeder spout for `fields` whose acks are counted by a fresh
+;; AckTracker. Returns a pair [feeder checker]; (checker n) asserts that exactly
+;; n acks were recorded since the last check and then resets the counter.
+(defn ack-tracking-feeder [fields]
+  (let [tracker (AckTracker.)
+        feeder (feeder-spout fields)
+        checker (fn [val]
+                  (is (= (.getNumAcks tracker) val))
+                  (.resetNumAcks tracker))]
+    (.setAckFailDelegate feeder tracker)
+    [feeder checker]))
+;; Parameterised bolt with output field "num". For each input tuple it emits
+;; `amt` tuples ([0] through [amt - 1]), each anchored to the input, and then
+;; acks the input.
+(defbolt branching-bolt ["num"]
+  {:params [amt]}
+  [tuple collector]
+  (doseq [i (range amt)]
+    (emit-bolt! collector [i] :anchor tuple))
+  (ack! collector tuple))
+;; Parameterised bolt with output field "num" that buffers input tuples until
+;; it has seen `amt` of them. It then emits a single [1] anchored to all
+;; buffered tuples, acks each of them, and clears the buffer. Tuples still in
+;; the buffer stay un-acked.
+(defbolt agg-bolt ["num"] {:prepare true :params [amt]}
+  [conf context collector]
+  (let [seen (atom [])]
+    (bolt
+      (execute [tuple]
+        (swap! seen conj tuple)
+        (when (= (count @seen) amt)
+          (emit-bolt! collector [1] :anchor @seen)
+          (doseq [s @seen]
+            (ack! collector s))
+          (reset! seen [])
+          )))
+      ))
+;; Bolt with no output fields that acks every input tuple and emits nothing.
+(defbolt ack-bolt {}
+  [tuple collector]
+  (ack! collector tuple))
+;; Exercises acking across three spouts whose tuples are fanned out by
+;; branching bolts (x2, x4, x1) and then merged by (agg-bolt 3), which only
+;; acks its inputs in groups of three. A spout tuple is therefore acked only
+;; once every branch derived from it has passed through a completed group.
+;; Each checkerN call asserts the number of acks feederN received since its
+;; previous check.
+(deftest test-acking
+  (with-tracked-cluster [cluster]
+    (let [[feeder1 checker1] (ack-tracking-feeder ["num"])
+          [feeder2 checker2] (ack-tracking-feeder ["num"])
+          [feeder3 checker3] (ack-tracking-feeder ["num"])
+          tracked (mk-tracked-topology
+                   cluster
+                   (topology
+                     {"1" (spout-spec feeder1)
+                      "2" (spout-spec feeder2)
+                      "3" (spout-spec feeder3)}
+                     {"4" (bolt-spec {"1" :shuffle} (branching-bolt 2))
+                      "5" (bolt-spec {"2" :shuffle} (branching-bolt 4))
+                      "6" (bolt-spec {"3" :shuffle} (branching-bolt 1))
+                      "7" (bolt-spec
+                            {"4" :shuffle
+                            "5" :shuffle
+                            "6" :shuffle}
+                            (agg-bolt 3))
+                      "8" (bolt-spec {"7" :shuffle} (branching-bolt 2))
+                      "9" (bolt-spec {"8" :shuffle} ack-bolt)}
+                     ))]
+      (submit-local-topology (:nimbus cluster)
+                             "acking-test1"
+                             {}
+                             (:topology tracked))
+      (advance-cluster-time cluster 11)
+      ;; 2 branches reach the aggregator: group of 3 not complete, no ack yet
+      (.feed feeder1 [1])
+      (tracked-wait tracked 1)
+      (checker1 0)
+      ;; 4 more branches: two groups of 3 complete, both spout tuples acked
+      (.feed feeder2 [1])
+      (tracked-wait tracked 1)
+      (checker1 1)
+      (checker2 1)
+      (.feed feeder1 [1])
+      (tracked-wait tracked 1)
+      (checker1 0)
+      ;; the first of these two feeder1 tuples completes; one branch of the
+      ;; second stays buffered
+      (.feed feeder1 [1])
+      (tracked-wait tracked 1)
+      (checker1 1)
+      (.feed feeder3 [1])
+      (tracked-wait tracked 1)
+      (checker1 0)
+      (checker3 0)
+      ;; 4 branches from feeder2 flush the buffer: every pending tuple is acked
+      (.feed feeder2 [1])
+      (tracked-wait tracked 1)
+      (checker1 1)
+      (checker2 1)
+      (checker3 1)
+      )))
+;; One spout feeds two identity bolts, both of which feed (agg-bolt 4). The
+;; first spout tuple contributes two tuples to the aggregator (no ack yet); the
+;; second brings the count to four, at which point both spout tuples are acked.
+(deftest test-ack-branching
+  (with-tracked-cluster [cluster]
+    (let [[feeder checker] (ack-tracking-feeder ["num"])
+          tracked (mk-tracked-topology
+                   cluster
+                   (topology
+                     {"1" (spout-spec feeder)}
+                     {"2" (bolt-spec {"1" :shuffle} identity-bolt)
+                      "3" (bolt-spec {"1" :shuffle} identity-bolt)
+                      "4" (bolt-spec
+                            {"2" :shuffle
+                             "3" :shuffle}
+                             (agg-bolt 4))}))]
+      (submit-local-topology (:nimbus cluster)
+                             "test-acking2"
+                             {}
+                             (:topology tracked))
+      (advance-cluster-time cluster 11)
+      (.feed feeder [1])
+      (tracked-wait tracked 1)
+      (checker 0)
+      (.feed feeder [1])
+      (tracked-wait tracked 1)
+      (checker 2)
+      )))
+;; Bolt with output field "num" that emits [1] anchored to the same input tuple
+;; twice (the anchor list contains the tuple two times) and then acks the input.
+(defbolt dup-anchor ["num"]
+  [tuple collector]
+  (emit-bolt! collector [1] :anchor [tuple tuple])
+  (ack! collector tuple))
+;; Flag set to true by prepare-tracked-bolt when its prepare body runs.
+(def bolt-prepared? (atom false))
+;; Bolt with no output fields that records in bolt-prepared? that it has been
+;; prepared and acks every input tuple.
+(defbolt prepare-tracked-bolt [] {:prepare true}
+  [conf context collector]  
+  (reset! bolt-prepared? true)
+  (bolt
+   (execute [tuple]
+            (ack! collector tuple))))
+;; Flag set to true by open-tracked-spout when its open body runs.
+(def spout-opened? (atom false))
+;; Spout with output field "val" that records in spout-opened? that it has been
+;; opened. Its nextTuple is empty, so it never emits anything.
+(defspout open-tracked-spout ["val"]
+  [conf context collector]
+  (reset! spout-opened? true)
+  (spout
+   (nextTuple [])))
+;; Verifies that a topology submitted with initial status INACTIVE neither
+;; opens its spouts nor prepares its bolts until it is activated through
+;; nimbus, and that a tuple fed while inactive is processed and acked after
+;; activation.
+(deftest test-submit-inactive-topology
+  (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
+    (let [feeder (feeder-spout ["field1"])
+          tracker (AckFailMapTracker.)
+          _ (.setAckFailDelegate feeder tracker)
+          topology (thrift/mk-topology
+                    {"1" (thrift/mk-spout-spec feeder)
+                     "2" (thrift/mk-spout-spec open-tracked-spout)}
+                    {"3" (thrift/mk-bolt-spec {"1" :global} prepare-tracked-bolt)})]
+      ;; the flags are namespace-level atoms shared between tests, so reset them
+      (reset! bolt-prepared? false)
+      (reset! spout-opened? false)
+      ;; argument order is nimbus, name, conf, topology, submit-opts; the conf
+      ;; map must be present or the topology is taken as the conf
+      (submit-local-topology-with-opts (:nimbus cluster)
+        "test"
+        {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
+        topology
+        (SubmitOptions. TopologyInitialStatus/INACTIVE))
+      (advance-cluster-time cluster 11)
+      (.feed feeder ["a"] 1)
+      (advance-cluster-time cluster 9)
+      ;; still inactive: nothing may have been opened or prepared
+      (is (not @bolt-prepared?))
+      (is (not @spout-opened?))
+      (.activate (:nimbus cluster) "test")
+      (advance-cluster-time cluster 12)
+      (assert-acked tracker 1)
+      (is @bolt-prepared?)
+      (is @spout-opened?))))
+;; Checks that a tuple emitted with the same input listed twice as its anchor
+;; (see dup-anchor) is still acked back to the spout exactly once per spout
+;; tuple: one feed yields one ack, three feeds yield three.
+(deftest test-acking-self-anchor
+  (with-tracked-cluster [cluster]
+    (let [[feeder checker] (ack-tracking-feeder ["num"])
+          tracked (mk-tracked-topology
+                   cluster
+                   (topology
+                     {"1" (spout-spec feeder)}
+                     {"2" (bolt-spec {"1" :shuffle} dup-anchor)
+                      "3" (bolt-spec {"2" :shuffle} ack-bolt)}))]
+      (submit-local-topology (:nimbus cluster)
+                             "test"
+                             {}
+                             (:topology tracked))
+      (advance-cluster-time cluster 11)
+      (.feed feeder [1])
+      (tracked-wait tracked 1)
+      (checker 1)
+      (.feed feeder [1])
+      (.feed feeder [1])
+      (.feed feeder [1])
+      (tracked-wait tracked 3)
+      (checker 3)
+      )))
+;; (defspout ConstantSpout ["val"] {:prepare false}
+;;   [collector]
+;;   (Time/sleep 100)
+;;   (emit-spout! collector [1]))
+;; (def errored (atom false))
+;; (def restarted (atom false))
+;; (defbolt local-error-checker {} [tuple collector]
+;;   (when-not @errored
+;;     (reset! errored true)
+;;     (println "erroring")
+;;     (throw (RuntimeException.)))
+;;   (when-not @restarted (println "restarted"))
+;;   (reset! restarted true))
+;; (deftest test-no-halt-local-mode
+;;   (with-simulated-time-local-cluster [cluster]
+;;       (let [topology (topology
+;;                       {1 (spout-spec ConstantSpout)}
+;;                       {2 (bolt-spec {1 :shuffle} local-error-checker)
+;;                        })]
+;;         (submit-local-topology (:nimbus cluster)
+;;                                "test"
+;;                                {}
+;;                                topology)
+;;         (while (not @restarted)
+;;           (advance-time-ms! 100))
+;;         )))
+;; Spout with output field "word". Each nextTuple call sleeps 100 ms and emits
+;; the current counter value with message id 1; the counter is incremented
+;; only when an ack is received. Only referenced from a commented-out test in
+;; the visible part of this file.
+(defspout IncSpout ["word"]
+  [conf context collector]
+  (let [state (atom 0)]
+    (spout
+     (nextTuple []
+       (Thread/sleep 100)
+       (emit-spout! collector [@state] :id 1)         
+       )
+     (ack [id]
+       (swap! state inc))
+     )))
+;; Parameterised spout with output field "word". Each nextTuple call sleeps
+;; 100 ms, increments its counter and emits the string "<prefix>-<counter>".
+;; No message id is passed to emit-spout!.
+(defspout IncSpout2 ["word"] {:params [prefix]}
+  [conf context collector]
+  (let [state (atom 0)]
+    (spout
+     (nextTuple []
+       (Thread/sleep 100)
+       (swap! state inc)
+       (emit-spout! collector [(str prefix "-" @state)])         
+       )
+     )))
+;; (deftest test-clojure-spout
+;;   (with-local-cluster [cluster]
+;;     (let [nimbus (:nimbus cluster)
+;;           top (topology
+;;                {1 (spout-spec IncSpout)}
+;;                {}
+;;                )]
+;;       (submit-local-topology nimbus
+;;                              "spout-test"
+;;                              {TOPOLOGY-DEBUG true
+;;                               TOPOLOGY-MESSAGE-TIMEOUT-SECS 3}
+;;                              top)
+;;       (Thread/sleep 10000)
+;;       (.killTopology nimbus "spout-test")
+;;       (Thread/sleep 10000)
+;;       )))
+;; TOPOLOGY-KRYO-DECORATORS is set at three levels: daemon conf
+;; (["this-is-overriden"]), component conf (["one" "two"]) and topology conf
+;; (["one" "three"]). The bolt reports the value it sees for that key, and the
+;; test expects the list ("one" "two" "three"), which contains no daemon-level
+;; entry.
+(deftest test-kryo-decorators-config
+  (with-simulated-time-local-cluster [cluster
+                                      :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
+                                                    TOPOLOGY-KRYO-DECORATORS ["this-is-overriden"]}]
+    (letlocals
+     (bind builder (TopologyBuilder.))
+     (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
+     (-> builder
+         (.setBolt "2"
+                   (TestConfBolt.
+                    {TOPOLOGY-KRYO-DECORATORS ["one" "two"]}))
+         (.shuffleGrouping "1"))
+     (bind results
+           (complete-topology cluster
+                              (.createTopology builder)
+                              :storm-conf {TOPOLOGY-KRYO-DECORATORS ["one" "three"]}
+                              :mock-sources {"1" [[TOPOLOGY-KRYO-DECORATORS]]}))
+     ;; the bolt output is a seq of [key value] pairs; flatten it into a map
+     (is (= {"topology.kryo.decorators" (list "one" "two" "three")}            
+            (->> (read-tuples results "2")
+                 (apply concat)
+                 (apply hash-map)))))))
+;; Checks how component-level configuration is combined with topology-level
+;; configuration. Bolt "2" (TestConfBolt) is given its own conf map plus
+;; builder-level overrides; the mocked spout sends it five config keys and the
+;; bolt's output is collected into a key -> value map that must contain an
+;; entry for every key that was sent.
+(deftest test-component-specific-config
+  (with-simulated-time-local-cluster [cluster
+                                      :daemon-conf {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true}]
+    (letlocals
+     (bind builder (TopologyBuilder.))
+     (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
+     (-> builder
+         (.setBolt "2"
+                   (TestConfBolt.
+                    {"fake.config" 123
+                     TOPOLOGY-MAX-TASK-PARALLELISM 20
+                     TOPOLOGY-MAX-SPOUT-PENDING 30
+                     TOPOLOGY-KRYO-REGISTER [{"fake.type" "bad.serializer"}
+                                             {"fake.type2" "a.serializer"}]
+                     }))
+         (.shuffleGrouping "1")
+         ;; set on the declarer after the component's own conf said 20
+         (.setMaxTaskParallelism (int 2))
+         (.addConfiguration "fake.config2" 987)
+         )
+     (bind results
+           (complete-topology cluster
+                              (.createTopology builder)
+                              :storm-conf {TOPOLOGY-KRYO-REGISTER [{"fake.type" "good.serializer" "fake.type3" "a.serializer3"}]}
+                              :mock-sources {"1" [["fake.config"]
+                                                  [TOPOLOGY-MAX-TASK-PARALLELISM]
+                                                  [TOPOLOGY-MAX-SPOUT-PENDING]
+                                                  ["fake.config2"]
+                                                  [TOPOLOGY-KRYO-REGISTER]
+                                                  ]}))
+     ;; One expected entry per key fed through the mocked spout. The two
+     ;; parallelism/pending entries were missing, which made the equality
+     ;; unsatisfiable.
+     (is (= {"fake.config" 123
+             "fake.config2" 987
+             TOPOLOGY-MAX-TASK-PARALLELISM 2
+             TOPOLOGY-MAX-SPOUT-PENDING 30
+             TOPOLOGY-KRYO-REGISTER {"fake.type" "good.serializer"
+                                     "fake.type2" "a.serializer"
+                                     "fake.type3" "a.serializer3"}}
+            (->> (read-tuples results "2")
+                 (apply concat)
+                 (apply hash-map))
+            ))
+     )))
+;; Bolt that registers an ITaskHook counting emit, boltAck, boltFail and
+;; boltExecute callbacks. On every input it emits the current counters in the
+;; order [emitted acked failed executed], then acks the input if the ack and
+;; fail counters are equal and fails it otherwise.
+(defbolt hooks-bolt ["emit" "ack" "fail" "executed"] {:prepare true}
+  [conf context collector]
+  (let [acked (atom 0)
+        failed (atom 0)
+        executed (atom 0)
+        emitted (atom 0)]
+    (.addTaskHook context
+                  (reify backtype.storm.hooks.ITaskHook
+                    (prepare [this conf context]
+                      )
+                    (cleanup [this]
+                      )
+                    (emit [this info]
+                      (swap! emitted inc))
+                    (boltAck [this info]
+                      (swap! acked inc))
+                    (boltFail [this info]
+                      (swap! failed inc))
+                    (boltExecute [this info]
+                      (swap! executed inc))
+                      ))
+    (bolt
+     (execute [tuple]
+        ;; the counters are read before this emit is issued
+        (emit-bolt! collector [@emitted @acked @failed @executed])
+        (if (= 0 (- @acked @failed))
+          (ack! collector tuple)
+          (fail! collector tuple))
+        ))))
+;; Sends four tuples through hooks-bolt and checks the hook counters it
+;; reports. Each expected row is [emitted acked failed executed] as seen at the
+;; start of the corresponding execute call.
+(deftest test-hooks
+  (with-simulated-time-local-cluster [cluster]
+    (let [topology (topology {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])))
+                              }
+                             {"2" (bolt-spec {"1" :shuffle}
+                                             hooks-bolt)
+                              })
+          results (complete-topology cluster
+                                     topology
+                                     :mock-sources {"1" [[1]
+                                                         [1]
+                                                         [1]
+                                                         [1]
+                                                         ]})]
+      (is (= [[0 0 0 0]
+              [2 1 0 1]
+              [4 1 1 2]
+              [6 2 1 3]]
+             (read-tuples results "2")
+             )))))
+;; Bolt with no output fields. For an input whose first value is n it reports
+;; n RuntimeExceptions through report-error! and then acks the input.
+(defbolt report-errors-bolt {}
+  [tuple collector]
+  (doseq [i (range (.getValue tuple 0))]
+    (report-error! collector (RuntimeException.)))
+  (ack! collector tuple))
+;; Checks error-report throttling with a 10 second interval and at most 4
+;; reports per interval (TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS and
+;; TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL). errors-count is the number of
+;; errors stored in cluster state for component "2".
+(deftest test-throttled-errors
+  (with-simulated-time
+    (with-tracked-cluster [cluster]
+      (let [state (:storm-cluster-state cluster)
+            [feeder checker] (ack-tracking-feeder ["num"])
+            tracked (mk-tracked-topology
+                     cluster
+                     (topology
+                       {"1" (spout-spec feeder)}
+                       {"2" (bolt-spec {"1" :shuffle} report-errors-bolt)}))
+            _       (submit-local-topology (:nimbus cluster)
+                                             "test-errors"
+                                             {TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS 10
+                                              TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL 4
+                                              TOPOLOGY-DEBUG true
+                                              }
+                                             (:topology tracked))
+            _ (advance-cluster-time cluster 11)
+            storm-id (get-storm-id state "test-errors")
+            errors-count (fn [] (count (.errors state storm-id "2")))]
+        (is (nil? (.last-error state storm-id "2")))
+        ;; so it launches the topology
+        (advance-cluster-time cluster 2)
+        ;; 6 errors reported, only 4 expected to be stored
+        (.feed feeder [6])
+        (tracked-wait tracked 1)
+        (is (= 4 (errors-count)))
+        (is (.last-error state storm-id "2"))
+        ;; 5 s later: the count is expected to stay at 4
+        (advance-time-secs! 5)
+        (.feed feeder [2])
+        (tracked-wait tracked 1)
+        (is (= 4 (errors-count)))
+        (is (.last-error state storm-id "2"))
+        ;; 6 s later: both new errors are expected to be stored
+        (advance-time-secs! 6)
+        (.feed feeder [2])
+        (tracked-wait tracked 1)
+        (is (= 6 (errors-count)))
+        (is (.last-error state storm-id "2"))
+        ;; 3 errors reported, the count is expected to rise by only 2
+        (advance-time-secs! 6)
+        (.feed feeder [3])
+        (tracked-wait tracked 1)
+        (is (= 8 (errors-count)))
+        (is (.last-error state storm-id "2"))))))
+;; The four tests below are empty placeholders: they contain no assertions and
+;; only record what still needs to be covered.
+(deftest test-acking-branching-complex
+  ;; test acking with branching in the topology
+  )
+(deftest test-fields-grouping
+  ;; 1. put a large number of random tuples through it and test that counts are right
+  ;; 2. test that different spouts with different phints group the same way
+  )
+(deftest test-all-grouping
+  )
+(deftest test-direct-grouping
+  )
diff --git a/storm-core/test/clj/integration/backtype/storm/testing4j_test.clj b/storm-core/test/clj/integration/backtype/storm/testing4j_test.clj
new file mode 100644
index 0000000..5cdb182
--- /dev/null
+++ b/storm-core/test/clj/integration/backtype/storm/testing4j_test.clj
@@ -0,0 +1,212 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns integration.backtype.storm.testing4j-test
+  (:use [clojure.test])
+  (:use [backtype.storm config clojure testing util])
+  (:require [integration.backtype.storm.integration-test :as it])
+  (:require [backtype.storm.thrift :as thrift])
+  (:import [backtype.storm Testing Config ILocalCluster])
+  (:import [backtype.storm.tuple Values Tuple])
+  (:import [backtype.storm.utils Time Utils])
+  (:import [backtype.storm.testing MkClusterParam TestJob MockedSources TestWordSpout
+            TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam
+            AckFailMapTracker MkTupleParam]))
+;; Time/isSimulating must be false outside Testing/withSimulatedTime, true
+;; inside the function passed to it, and false again afterwards.
+(deftest test-with-simulated-time
+  (is (= false (Time/isSimulating)))
+  (Testing/withSimulatedTime (fn []
+                               (is (= true (Time/isSimulating)))))
+  (is (= false (Time/isSimulating))))
+;; Checks that Testing/withLocalCluster, given a MkClusterParam asking for 2
+;; supervisors with 5 ports each, passes the job a non-nil cluster whose state
+;; map has a non-nil :nimbus entry.
+;; The previously bound `daemon-conf` was never attached to the cluster
+;; parameter or read anywhere, so the dead binding has been removed.
+(deftest test-with-local-cluster
+  (let [mk-cluster-param (doto (MkClusterParam.)
+                           (.setSupervisors (int 2))
+                           (.setPortsPerSupervisor (int 5)))]
+    (Testing/withLocalCluster mk-cluster-param (reify TestJob
+                                                 (^void run [this ^ILocalCluster cluster]
+                                                   (is (not (nil? cluster)))
+                                                   (is (not (nil? (.getState cluster))))
+                                                   (is (not (nil? (:nimbus (.getState cluster))))))))))
+;; Checks that Testing/withSimulatedTimeLocalCluster passes the job a non-nil
+;; cluster with a :nimbus entry in its state, and that time is simulated only
+;; while the job runs.
+;; The previously bound `daemon-conf` was never attached to the cluster
+;; parameter or read anywhere, so the dead binding has been removed.
+(deftest test-with-simulated-time-local-cluster
+  (let [mk-cluster-param (doto (MkClusterParam.)
+                           (.setSupervisors (int 2)))]
+    (is (not (Time/isSimulating)))
+    (Testing/withSimulatedTimeLocalCluster mk-cluster-param (reify TestJob
+                                                              (^void run [this ^ILocalCluster cluster]
+                                                                (is (not (nil? cluster)))
+                                                                (is (not (nil? (.getState cluster))))
+                                                                (is (not (nil? (:nimbus (.getState cluster)))))
+                                                                (is (Time/isSimulating)))))
+    (is (not (Time/isSimulating)))))
+;; Java-API counterpart of the basic word-count topology test: builds the
+;; topology, mocks the data for spout "1", completes the topology through
+;; Testing/completeTopology and checks the tuples of every component. Runs once
+;; with STORM-LOCAL-MODE-ZMQ true and once with it false, on a cluster
+;; configured with 4 supervisors.
+(deftest test-complete-topology
+  (doseq [zmq-on? [true false]
+          :let [daemon-conf (doto (Config.)
+                              (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
+                mk-cluster-param (doto (MkClusterParam.)
+                                   (.setSupervisors (int 4))
+                                   (.setDaemonConf daemon-conf))]]
+    (Testing/withSimulatedTimeLocalCluster
+     ;; The cluster parameter must be passed explicitly. Without it the
+     ;; supervisor count and the ZMQ toggle built above are silently ignored
+     ;; and both iterations run against the same default cluster.
+     mk-cluster-param
+     (reify TestJob
+       (^void run [this ^ILocalCluster cluster]
+         (let [topology (thrift/mk-topology
+                         {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+                         {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
+                          "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
+                          "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
+                          })
+               mocked-sources (doto (MockedSources.)
+                                (.addMockData "1" (into-array Values [(Values. (into-array ["nathan"]))
+                                                                      (Values. (into-array ["bob"]))
+                                                                      (Values. (into-array ["joey"]))
+                                                                      (Values. (into-array ["nathan"]))])
+                                              ))
+               storm-conf (doto (Config.)
+                            (.setNumWorkers 2))
+               complete-topology-param (doto (CompleteTopologyParam.)
+                                         (.setMockedSources mocked-sources)
+                                         (.setStormConf storm-conf))
+               results (Testing/completeTopology cluster
+                                                 topology
+                                                 complete-topology-param)]
+           ;; all four reads go through the Java API under test
+           (is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
+                           (Testing/readTuples results "1")))
+           (is (Testing/multiseteq [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
+                           (Testing/readTuples results "2")))
+           (is (= [[1] [2] [3] [4]]
+                  (Testing/readTuples results "3")))
+           (is (= [[1] [2] [3] [4]]
+                  (Testing/readTuples results "4")))
+           ))))))
+;; Java-API version of the ack-branching test: one spout feeds two identity
+;; bolts which both feed (agg-bolt 4), run inside Testing/withTrackedCluster.
+;; The first feed must produce no ack, the second must ack both spout tuples.
+;; Helper bolts and the ack-tracking feeder come from the integration-test
+;; namespace (alias `it`).
+(deftest test-with-tracked-cluster
+  (Testing/withTrackedCluster
+   (reify TestJob
+     (^void run [this ^ILocalCluster cluster]
+       (let [[feeder checker] (it/ack-tracking-feeder ["num"])
+             tracked (Testing/mkTrackedTopology
+                      cluster
+                      (topology
+                       {"1" (spout-spec feeder)}
+                       {"2" (bolt-spec {"1" :shuffle} it/identity-bolt)
+                        "3" (bolt-spec {"1" :shuffle} it/identity-bolt)
+                        "4" (bolt-spec
+                             {"2" :shuffle
+                              "3" :shuffle}
+                             (it/agg-bolt 4))}))]
+         (.submitTopology cluster
+                          "test-acking2"
+                          (Config.)
+                          (.getTopology tracked))
+         (advance-cluster-time (.getState cluster) 11)
+         (.feed feeder [1])
+         (Testing/trackedWait tracked (int 1))
+         (checker 0)
+         (.feed feeder [1])
+         (Testing/trackedWait tracked (int 1))
+         (checker 2)
+         )))))
+;; Checks Testing/advanceClusterTime on a simulated-time local cluster with
+;; message timeouts enabled and TOPOLOGY-MESSAGE-TIMEOUT-SECS set to 10.
+;; Three tuples are fed with message ids 1, 2 and 3. After advancing by 9,
+;; ids 1 and 3 must be acked and id 2 must not be failed yet; after
+;; advancing by a further 12, id 2 must be failed.
+;; NOTE(review): presumably it/ack-every-other leaves the second tuple
+;; unacked -- confirm against its definition.
+(deftest test-advance-cluster-time
+  (let [cluster-conf (doto (Config.)
+                       (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true))
+        cluster-param (doto (MkClusterParam.)
+                        (.setDaemonConf cluster-conf))
+        job (reify TestJob
+              (^void run [this ^ILocalCluster cluster]
+                (let [acks (AckFailMapTracker.)
+                      spout (doto (feeder-spout ["field1"])
+                              (.setAckFailDelegate acks))
+                      topo (thrift/mk-topology
+                            {"1" (thrift/mk-spout-spec spout)}
+                            {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
+                      topo-conf (doto (Config.)
+                                  (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10))]
+                  (.submitTopology cluster "timeout-tester" topo-conf topo)
+                  ;; feed the tuples in order, each with its own message id
+                  (doseq [[value msg-id] [["a" 1] ["b" 2] ["c" 3]]]
+                    (.feed spout [value] msg-id))
+                  (Testing/advanceClusterTime cluster (int 9))
+                  (it/assert-acked acks 1 3)
+                  (is (not (.isFailed acks 2)))
+                  (Testing/advanceClusterTime cluster (int 12))
+                  (it/assert-failed acks 2))))]
+    (Testing/withSimulatedTimeLocalCluster cluster-param job)))
+;; Counterpart of the timeout test with TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS set
+;; to false in both the daemon conf and the topology conf. The tuple with
+;; message id 2 must not be reported as failed, neither after advancing the
+;; cluster clock by 9 nor after advancing it by a further 12, even though
+;; TOPOLOGY-MESSAGE-TIMEOUT-SECS is 10.
+(deftest test-disable-tuple-timeout
+  (let [cluster-conf (doto (Config.)
+                       (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))
+        cluster-param (doto (MkClusterParam.)
+                        (.setDaemonConf cluster-conf))
+        job (reify TestJob
+              (^void run [this ^ILocalCluster cluster]
+                (let [acks (AckFailMapTracker.)
+                      spout (doto (feeder-spout ["field1"])
+                              (.setAckFailDelegate acks))
+                      topo (thrift/mk-topology
+                            {"1" (thrift/mk-spout-spec spout)}
+                            {"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
+                      topo-conf (doto (Config.)
+                                  (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10)
+                                  (.put TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false))]
+                  (.submitTopology cluster "disable-timeout-tester" topo-conf topo)
+                  ;; feed the tuples in order, each with its own message id
+                  (doseq [[value msg-id] [["a" 1] ["b" 2] ["c" 3]]]
+                    (.feed spout [value] msg-id))
+                  (Testing/advanceClusterTime cluster (int 9))
+                  (it/assert-acked acks 1 3)
+                  (is (not (.isFailed acks 2)))
+                  (Testing/advanceClusterTime cluster (int 12))
+                  (is (not (.isFailed acks 2))))))]
+    (Testing/withSimulatedTimeLocalCluster cluster-param job)))
+;; Covers both arities of Testing/testTuple: values only, and values plus a
+;; MkTupleParam that overrides the stream, component and field names.
+(deftest test-test-tuple
+  (letlocals
+   ;; one-argument form: stream id, field names and component are whatever
+   ;; testTuple fills in when no MkTupleParam is given
+   (bind ^Tuple default-tuple (Testing/testTuple ["james" "bond"]))
+   (is (= ["james" "bond"] (.getValues default-tuple)))
+   (is (= Utils/DEFAULT_STREAM_ID (.getSourceStreamId default-tuple)))
+   (is (= ["field1" "field2"] (.toList (.getFields default-tuple))))
+   (is (= "component" (.getSourceComponent default-tuple)))
+   ;; two-argument form: every overridable attribute is set explicitly
+   (bind tuple-param (doto (MkTupleParam.)
+                       (.setStream "test-stream")
+                       (.setComponent "test-component")
+                       (.setFields (into-array String ["fname" "lname"]))))
+   (bind ^Tuple custom-tuple (Testing/testTuple ["james" "bond"] tuple-param))
+   (is (= ["james" "bond"] (.getValues custom-tuple)))
+   (is (= "test-stream" (.getSourceStreamId custom-tuple)))
+   (is (= ["fname" "lname"] (.toList (.getFields custom-tuple))))
+   (is (= "test-component" (.getSourceComponent custom-tuple)))))
diff --git a/storm-core/test/clj/integration/storm/trident/integration_test.clj b/storm-core/test/clj/integration/storm/trident/integration_test.clj
new file mode 100644
index 0000000..bcd8173
--- /dev/null
+++ b/storm-core/test/clj/integration/storm/trident/integration_test.clj
@@ -0,0 +1,292 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns integration.storm.trident.integration-test
+  (:use [clojure test])
+  (:require [backtype.storm [testing :as t]])
+  (:import [storm.trident.testing Split CountAsAggregator StringLength TrueFilter
+            MemoryMapState$Factory])
+  (:import [storm.trident.state StateSpec])
+  (:import [storm.trident.operation.impl CombinerAggStateUpdater])
+  (:use [storm.trident testing])
+  (:use [backtype.storm util]))
+;; Builds a word count persisted with (memory-map-state) and exposes it via
+;; the "all-tuples" DRPC stream, which queries the state with
+;; TupleCollectionGet. The query must return every stored [word count] pair,
+;; both after the first feed and after a second feed updates the counts.
+(deftest test-memory-map-get-tuples
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind sentence-spout (feeder-spout ["sentence"]))
+        (bind word-counts
+          (-> (.newStream topo "tester" sentence-spout)
+              (.each (fields "sentence") (Split.) (fields "word"))
+              (.groupBy (fields "word"))
+              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+              (.parallelismHint 6)))
+        (-> (.newDRPCStream topo "all-tuples" drpc)
+            (.broadcast)
+            (.stateQuery word-counts (fields "args") (TupleCollectionGet.) (fields "word" "count"))
+            (.project (fields "word" "count")))
+        (with-topology [cluster topo]
+          (feed sentence-spout [["hello the man said"] ["the"]])
+          ;; compared as sets, so the order of the returned tuples is ignored
+          (is (= #{["hello" 1] ["said" 1] ["the" 2] ["man" 1]}
+                 (set (exec-drpc drpc "all-tuples" "man"))))
+          (feed sentence-spout [["the foo"]])
+          (is (= #{["hello" 1] ["said" 1] ["the" 3] ["man" 1] ["foo" 1]}
+                 (set (exec-drpc drpc "all-tuples" "man")))))))))
+;; Word count over a feeder spout, queried through the "words" DRPC stream.
+;; The DRPC stream splits its argument into words, looks each word up in the
+;; persisted counts with MapGet and sums the results, so a query with several
+;; words returns the total of their counts.
+(deftest test-word-count
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind sentence-spout (feeder-spout ["sentence"]))
+        (bind word-counts
+          (-> (.newStream topo "tester" sentence-spout)
+              (.each (fields "sentence") (Split.) (fields "word"))
+              (.groupBy (fields "word"))
+              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+              (.parallelismHint 6)))
+        (-> (.newDRPCStream topo "words" drpc)
+            (.each (fields "args") (Split.) (fields "word"))
+            (.groupBy (fields "word"))
+            (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
+            (.aggregate (fields "count") (Sum.) (fields "sum"))
+            (.project (fields "sum")))
+        (with-topology [cluster topo]
+          (feed sentence-spout [["hello the man said"] ["the"]])
+          (is (= [[2]] (exec-drpc drpc "words" "the")))
+          (is (= [[1]] (exec-drpc drpc "words" "hello")))
+          (feed sentence-spout [["the man on the moon"] ["where are you"]])
+          (is (= [[4]] (exec-drpc drpc "words" "the")))
+          (is (= [[2]] (exec-drpc drpc "words" "man")))
+          (is (= [[8]] (exec-drpc drpc "words" "man where you the"))))))))
+;; Regression test: reproduces a bug where committer spouts freeze
+;; processing when there is at least one repartitioning after the spout.
+;; Same word-count topology as test-word-count, but fed from a committer
+;; spout with a parallelism hint of 2.
+(deftest test-word-count-committer-spout
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind sentence-spout
+          (doto (feeder-committer-spout ["sentence"])
+            (.setWaitToEmit false))) ;; this causes lots of empty batches
+        (bind word-counts
+          (-> (.newStream topo "tester" sentence-spout)
+              (.parallelismHint 2)
+              (.each (fields "sentence") (Split.) (fields "word"))
+              (.groupBy (fields "word"))
+              (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+              (.parallelismHint 6)))
+        (-> (.newDRPCStream topo "words" drpc)
+            (.each (fields "args") (Split.) (fields "word"))
+            (.groupBy (fields "word"))
+            (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
+            (.aggregate (fields "count") (Sum.) (fields "sum"))
+            (.project (fields "sum")))
+        (with-topology [cluster topo]
+          (feed sentence-spout [["hello the man said"] ["the"]])
+          (is (= [[2]] (exec-drpc drpc "words" "the")))
+          (is (= [[1]] (exec-drpc drpc "words" "hello")))
+          ;; the pause is required to reproduce the freeze described above
+          (Thread/sleep 1000)
+          (feed sentence-spout [["the man on the moon"] ["where are you"]])
+          (is (= [[4]] (exec-drpc drpc "words" "the")))
+          (is (= [[2]] (exec-drpc drpc "words" "man")))
+          (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
+          (feed sentence-spout [["the the"]])
+          (is (= [[6]] (exec-drpc drpc "words" "the")))
+          (feed sentence-spout [["the"]])
+          (is (= [[7]] (exec-drpc drpc "words" "the"))))))))
+;; Counts the words of a DRPC argument with CountAsAggregator. The one-word
+;; request is repeated 100 times and must give [[1]] every time; an empty
+;; argument must give [[0]] and an eight-word argument [[8]].
+(deftest test-count-agg
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (-> (.newDRPCStream topo "numwords" drpc)
+            (.each (fields "args") (Split.) (fields "word"))
+            (.aggregate (CountAsAggregator.) (fields "count"))
+            (.parallelismHint 2) ;; this makes sure batchGlobal is working correctly
+            (.project (fields "count")))
+        (with-topology [cluster topo]
+          ;; the iteration index is not needed, only the repetition
+          (dotimes [_ 100]
+            (is (= [[1]] (exec-drpc drpc "numwords" "the"))))
+          (is (= [[0]] (exec-drpc drpc "numwords" "")))
+          (is (= [[8]] (exec-drpc drpc "numwords" "1 2 3 4 5 6 7 8"))))))))
+;; Derives two streams from one DRPC stream -- the words of the argument and
+;; the result of StringLength on the argument -- merges them, and checks
+;; that the DRPC result contains the tuples of both branches.
+(deftest test-split-merge
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind requests (.newDRPCStream topo "splitter" drpc))
+        (bind words
+          (-> requests
+              (.each (fields "args") (Split.) (fields "word"))
+              (.project (fields "word"))))
+        (bind lengths
+          (-> requests
+              (.each (fields "args") (StringLength.) (fields "len"))
+              (.project (fields "len"))))
+        (.merge topo [words lengths])
+        (with-topology [cluster topo]
+          ;; t/ms= is used, so the order of the merged tuples is not asserted
+          (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
+          (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello"))))))))
+;; Applies the same groupBy/aggregate pipeline twice to one filtered DRPC
+;; stream and merges the two results; each request must therefore come back
+;; as two identical [args count] tuples.
+(deftest test-multiple-groupings-same-stream
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind requests
+          (-> (.newDRPCStream topo "tester" drpc)
+              (.each (fields "args") (TrueFilter.))))
+        (bind first-count
+          (-> requests
+              (.groupBy (fields "args"))
+              (.aggregate (CountAsAggregator.) (fields "count"))))
+        (bind second-count
+          (-> requests
+              (.groupBy (fields "args"))
+              (.aggregate (CountAsAggregator.) (fields "count"))))
+        (.merge topo [first-count second-count])
+        (with-topology [cluster topo]
+          (is (t/ms= [["the" 1] ["the" 1]] (exec-drpc drpc "tester" "the")))
+          (is (t/ms= [["aaaaa" 1] ["aaaaa" 1]] (exec-drpc drpc "tester" "aaaaa"))))))))
+;; Chains two repartitionings (localOrShuffle followed by shuffle) between
+;; the split and the aggregation, and checks that the word count returned
+;; over DRPC is still correct.
+(deftest test-multi-repartition
+  (t/with-local-cluster [cluster]
+    (with-drpc [drpc]
+      (letlocals
+        (bind topo (TridentTopology.))
+        (bind word-total
+          (-> (.newDRPCStream topo "tester" drpc)
+              (.each (fields "args") (Split.) (fields "word"))
+              (.localOrShuffle)
+              (.shuffle)
+              (.aggregate (CountAsAggregator.) (fields "count"))))
+        (with-topology [cluster topo]
+          (is (t/ms= [[2]] (exec-drpc drpc "tester" "the man")))
+          (is (t/ms= [[1]] (exec-drpc drpc "tester" "aaa"))))))))
+;; Checks field-name validation while a Trident topology is being defined.
+;; A pipeline that only names existing fields is built first and must not
+;; throw. Every operation after that names a field the stream does not have
+;; ("sentence1", "word1", "non-existent") and is expected to throw
+;; IllegalArgumentException. The topology is never submitted.
+(deftest test-stream-projection-validation
+  (t/with-local-cluster [cluster]
+    (letlocals
+     (bind sentence-spout (feeder-committer-spout ["sentence"]))
+     (bind topo (TridentTopology.))
+     ;; valid projection fields will not throw exceptions
+     (bind word-counts
+           (-> (.newStream topo "tester" sentence-spout)
+               (.each (fields "sentence") (Split.) (fields "word"))
+               (.groupBy (fields "word"))
+               (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+               (.parallelismHint 6)))
+     (bind stream (.newStream topo "tester" sentence-spout))
+     ;; .each with an unknown input field
+     (is (thrown? IllegalArgumentException
+                  (.each stream (fields "sentence1") (Split.) (fields "word"))))
+     ;; .groupBy on an unknown field
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.each (fields "sentence") (Split.) (fields "word"))
+                      (.groupBy (fields "word1")))))
+     ;; .aggregate with an unknown input field
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.each (fields "sentence") (Split.) (fields "word"))
+                      (.groupBy (fields "word"))
+                      (.aggregate (fields "word1") (Count.) (fields "count")))))
+     ;; .project on an unknown field
+     (is (thrown? IllegalArgumentException
+                  (.project stream (fields "sentence1"))))
+     ;; .partitionBy on an unknown field
+     (is (thrown? IllegalArgumentException
+                  (.partitionBy stream (fields "sentence1"))))
+     ;; .partitionAggregate with an unknown input field
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.each (fields "sentence") (Split.) (fields "word"))
+                      (.partitionAggregate (fields "word1") (Count.) (fields "count")))))
+     ;; .persistentAggregate with an unknown input field
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.each (fields "sentence") (Split.) (fields "word"))
+                      (.groupBy (fields "word"))
+                      (.persistentAggregate (StateSpec. (MemoryMapState$Factory.))
+                                            (fields "non-existent")
+                                            (Count.)
+                                            (fields "count")))))
+     ;; .partitionPersist with an unknown input field
+     (is (thrown? IllegalArgumentException
+                  (-> stream
+                      (.each (fields "sentence") (Split.) (fields "word"))
+                      (.groupBy (fields "word"))
+                      (.partitionPersist (StateSpec. (MemoryMapState$Factory.))
+                                         (fields "non-existent")
+                                         (CombinerAggStateUpdater. (Count.))
+                                         (fields "count")))))
+     ;; .stateQuery with an unknown input field, on a DRPC stream
+     (with-drpc [drpc]
+       (is (thrown? IllegalArgumentException
+                    (-> (.newDRPCStream topo "words" drpc)
+                        (.each (fields "args") (Split.) (fields "word"))
+                        (.groupBy (fields "word"))
+                        (.stateQuery word-counts (fields "word1") (MapGet.) (fields "count")))))))))
+;; (deftest test-split-merge
+;;   (t/with-local-cluster [cluster]
+;;     (with-drpc [drpc]
+;;       (letlocals
+;;         (bind topo (TridentTopology.))
+;;         (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
+;;         (bind s1
+;;           (-> drpc-stream
+;;               (.each (fields "args") (Split.) (fields "word"))
+;;               (.project (fields "word"))))
+;;         (bind s2
+;;           (-> drpc-stream
+;;               (.each (fields "args") (StringLength.) (fields "len"))
+;;               (.project (fields "len"))))
+;;         (.merge topo [s1 s2])
+;;         (with-topology [cluster topo]
+;;           (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
+;;           (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
+;;           )))))