You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/07/30 18:45:14 UTC

[1/4] storm git commit: STORM-3156: Remove the transactional topology API

Repository: storm
Updated Branches:
  refs/heads/master c8896efcc -> 31624a2d2


http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-core/test/clj/org/apache/storm/transactional_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/transactional_test.clj b/storm-core/test/clj/org/apache/storm/transactional_test.clj
deleted file mode 100644
index de48ceb..0000000
--- a/storm-core/test/clj/org/apache/storm/transactional_test.clj
+++ /dev/null
@@ -1,706 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.transactional-test
-  (:use [clojure test])
-  (:import [org.apache.storm Constants LocalCluster$Builder Testing])
-  (:import [org.apache.storm.topology TopologyBuilder])
-  (:import [org.apache.storm.transactional TransactionalSpoutCoordinator ITransactionalSpout ITransactionalSpout$Coordinator TransactionAttempt
-            TransactionalTopologyBuilder])
-  (:import [org.apache.storm.transactional.state TransactionalState TestTransactionalState RotatingTransactionalState RotatingTransactionalState$StateInitializer])
-  (:import [org.apache.storm.generated RebalanceOptions])
-  (:import [org.apache.storm.spout SpoutOutputCollector ISpoutOutputCollector])
-  (:import [org.apache.storm.task OutputCollector IOutputCollector])
-  (:import [org.apache.storm.coordination BatchBoltExecutor])
-  (:import [org.apache.storm.utils RegisteredGlobalState])
-  (:import [org.apache.storm.tuple Fields])
-  (:import [org.apache.storm.testing InProcessZookeeper CountingBatchBolt MemoryTransactionalSpout
-            KeyedCountingBatchBolt KeyedCountingCommitterBolt KeyedSummingBatchBolt
-            IdentityBolt CountingCommitBolt OpaqueMemoryTransactionalSpout])
-  (:import [org.apache.storm.utils ZookeeperAuthInfo Utils])
-  (:import [org.apache.storm.shade.org.apache.curator.framework CuratorFramework])
-  (:import [org.apache.storm.shade.org.apache.curator.framework.api CreateBuilder ProtectACLCreateModeStatPathAndBytesable])
-  (:import [org.apache.zookeeper CreateMode ZooDefs ZooDefs$Ids])
-  (:import [org.mockito Matchers Mockito])
-  (:import [org.mockito.exceptions.base MockitoAssertionError])
-  (:import [java.util HashMap Collections ArrayList])
-  (:use [org.apache.storm util config log])
-  (:use [org.apache.storm clojure]))
-
-;; Testing TODO:
-;; * Test that it repeats the meta for a partitioned state (test partitioned emitter on its own)
-;; * Test that partitioned state emits nothing for the partition if it has seen a future transaction for that partition (test partitioned emitter on its own)
-
-(defn mk-coordinator-state-changer [atom]
-  (TransactionalSpoutCoordinator.
-   (reify ITransactionalSpout
-     (getComponentConfiguration [this]
-       nil)
-     (getCoordinator [this conf context]
-       (reify ITransactionalSpout$Coordinator
-         (isReady [this] (not (nil? @atom)))
-         (initializeTransaction [this txid prevMetadata]
-           @atom )
-         (close [this]
-           )))
-     )))
-
-(def BATCH-STREAM TransactionalSpoutCoordinator/TRANSACTION_BATCH_STREAM_ID)
-(def COMMIT-STREAM TransactionalSpoutCoordinator/TRANSACTION_COMMIT_STREAM_ID)
-
-(defn mk-spout-capture [capturer]
-  (SpoutOutputCollector.
-    (reify ISpoutOutputCollector
-      (emit [this stream-id tuple message-id]
-        (swap! capturer update-in [stream-id]
-          (fn [oldval] (concat oldval [{:tuple tuple :id message-id}])))
-        []
-        ))))
-
-(defn normalize-tx-tuple [values]
-  (-> values vec (update 0 #(-> % .getTransactionId .intValue))))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn verify-and-reset! [expected-map emitted-map-atom]
-  (let [results @emitted-map-atom]
-    (dorun
-     (map-val
-      (fn [tuples]
-        (doseq [t tuples]
-          (is (= (-> t :tuple first) (:id t)))
-          ))
-      results))
-    (is (= expected-map
-           (map-val
-            (fn [tuples]
-              (map (comp normalize-tx-tuple
-                         #(take 2 %)
-                         :tuple)
-                   tuples))
-            results
-            )))
-    (reset! emitted-map-atom {})
-    ))
-
-(defn get-attempts [capture-atom stream]
-  (map :id (get @capture-atom stream)))
-
-(defn get-commit [capture-atom]
-  (-> @capture-atom (get COMMIT-STREAM) first :id))
-
-(defmacro letlocals
-  [& body]
-  (let [[tobind lexpr] (split-at (dec (count body)) body)
-        binded (vec (mapcat (fn [e]
-                              (if (and (list? e) (= 'bind (first e)))
-                                [(second e) (last e)]
-                                ['_ e]
-                                ))
-                            tobind))]
-    `(let ~binded
-       ~(first lexpr))))
-
-(deftest test-coordinator
-  (let [coordinator-state (atom nil)
-        emit-capture (atom nil)]
-    (with-open [zk (InProcessZookeeper. )]
-      (letlocals
-        (bind coordinator
-              (mk-coordinator-state-changer coordinator-state))
-        (.open coordinator
-               (merge (clojurify-structure (Utils/readDefaultConfig))
-                       {TOPOLOGY-MAX-SPOUT-PENDING 4
-                       TOPOLOGY-TRANSACTIONAL-ID "abc"
-                       STORM-ZOOKEEPER-PORT (.getPort zk)
-                       STORM-ZOOKEEPER-SERVERS ["localhost"]
-                       })
-               nil
-               (mk-spout-capture emit-capture))
-        (reset! coordinator-state 10)
-        (.nextTuple coordinator)
-        (bind attempts (get-attempts emit-capture BATCH-STREAM))
-        (bind first-attempt (first attempts))
-        (verify-and-reset! {BATCH-STREAM [[1 10] [2 10] [3 10] [4 10]]}
-                           emit-capture)
-
-        (.nextTuple coordinator)
-        (verify-and-reset! {} emit-capture)
-        
-        (.fail coordinator (second attempts))
-        (bind attempts (get-attempts emit-capture BATCH-STREAM))
-        (bind new-second-attempt (first attempts))
-        (verify-and-reset! {BATCH-STREAM [[2 10] [3 10] [4 10]]} emit-capture)
-        (is (not= new-second-attempt (second attempts)))
-        (.ack coordinator new-second-attempt)
-        (verify-and-reset! {} emit-capture)
-        (.ack coordinator first-attempt)
-        (bind commit-id (get-commit emit-capture))
-        (verify-and-reset! {COMMIT-STREAM [[1]]} emit-capture)
-
-        (reset! coordinator-state 12)
-        (.ack coordinator commit-id)
-        (bind commit-id (get-commit emit-capture))
-        (verify-and-reset! {COMMIT-STREAM [[2]] BATCH-STREAM [[5 12]]} emit-capture)
-        (reset! coordinator-state nil)
-        (.ack coordinator commit-id)
-        (verify-and-reset! {} emit-capture)
-
-        (.fail coordinator (nth attempts 1))
-        (bind attempts (get-attempts emit-capture BATCH-STREAM))
-        (verify-and-reset! {BATCH-STREAM [[3 10] [4 10] [5 12]]} emit-capture)
-
-        (reset! coordinator-state 12)
-        (.nextTuple coordinator)
-        (verify-and-reset! {BATCH-STREAM [[6 12]]} emit-capture)
-
-        (.ack coordinator (first attempts))
-        (bind commit-id (get-commit emit-capture))
-        (verify-and-reset! {COMMIT-STREAM [[3]]} emit-capture)
-
-        (.ack coordinator (nth attempts 1))
-        (verify-and-reset! {} emit-capture)
-
-        (.fail coordinator commit-id)
-        (bind attempts (get-attempts emit-capture BATCH-STREAM))
-        (verify-and-reset! {BATCH-STREAM [[3 10] [4 10] [5 12] [6 12]]} emit-capture)
-
-        (.ack coordinator (first attempts))
-        (bind commit-id (get-commit emit-capture))
-        (verify-and-reset! {COMMIT-STREAM [[3]]} emit-capture)
-
-        (.ack coordinator (second attempts))
-        (.nextTuple coordinator)
-        (verify-and-reset! {} emit-capture)
-        
-        (.ack coordinator commit-id)
-        (verify-and-reset! {COMMIT-STREAM [[4]] BATCH-STREAM [[7 12]]} emit-capture)
-
-        (.close coordinator)
-        ))))
-
-(defn verify-bolt-and-reset! [expected-map emitted-atom]
-  (is (= expected-map @emitted-atom))
-  (reset! emitted-atom {}))
-
-(defn mk-bolt-capture [capturer]
-  (let [adder (fn [amap key newvalue]
-                (update-in
-                 amap
-                 [key]
-                 (fn [ov]
-                   (concat ov [newvalue])
-                   )))]
-    (OutputCollector.
-     (reify IOutputCollector
-       (emit [this stream-id anchors values]
-         (swap! capturer adder stream-id values)
-         []
-         )
-       (ack [this tuple]
-         (swap! capturer adder :ack (.getValues tuple))
-         )
-       (fail [this tuple]
-         (swap! capturer adder :fail (.getValues tuple)))
-       ))))
-
-(defn mk-attempt [txid attempt-id]
-  (TransactionAttempt. (BigInteger. (str txid)) attempt-id))
-
-
-(defn finish! [bolt id]
-  (.finishedId bolt id))
-
-(deftest test-batch-bolt
-  (let [bolt (BatchBoltExecutor. (CountingBatchBolt.))
-        capture-atom (atom {})
-        attempt1-1 (mk-attempt 1 1)
-        attempt1-2 (mk-attempt 1 2)
-        attempt2-1 (mk-attempt 2 1)
-        attempt3 (mk-attempt 3 1)
-        attempt4 (mk-attempt 4 1)
-        attempt5 (mk-attempt 5 1)
-        attempt6 (mk-attempt 6 1)]
-    (.prepare bolt {} nil (mk-bolt-capture capture-atom))
-
-
-    ;; test that transactions are independent
-    
-    (.execute bolt (Testing/testTuple [attempt1-1]))
-    (.execute bolt (Testing/testTuple [attempt1-1]))
-    (.execute bolt (Testing/testTuple [attempt1-2]))
-    (.execute bolt (Testing/testTuple [attempt2-1]))
-    (.execute bolt (Testing/testTuple [attempt1-1]))
-    
-    (finish! bolt attempt1-1)
-
-    (verify-bolt-and-reset! {:ack [[attempt1-1] [attempt1-1] [attempt1-2]
-                                   [attempt2-1] [attempt1-1]]
-                             "default" [[attempt1-1 3]]}
-                            capture-atom)
-
-    (.execute bolt (Testing/testTuple [attempt1-2]))
-    (finish! bolt attempt2-1)
-    (verify-bolt-and-reset! {:ack [[attempt1-2]]
-                             "default" [[attempt2-1 1]]}
-                            capture-atom)
-
-    (finish! bolt attempt1-2)
-    (verify-bolt-and-reset! {"default" [[attempt1-2 2]]}
-                            capture-atom)  
-    ))
-
-(defn mk-state-initializer [atom]
-  (reify RotatingTransactionalState$StateInitializer
-    (init [this txid last-state]
-      @atom
-      )))
-
-
-(defn- to-txid [txid]
-  (BigInteger. (str txid)))
-
-(defn- get-state [state txid initializer]
-  (.getState state (to-txid txid) initializer))
-
-(defn- get-state-or-create [state txid initializer]
-  (.getStateOrCreate state (to-txid txid) initializer))
-
-(defn- cleanup-before [state txid]
-  (.cleanupBefore state (to-txid txid)))
-
-(deftest test-rotating-transactional-state
-  ;; test strict ordered vs not strict ordered
-  (with-open [zk (InProcessZookeeper. )]
-    (let [conf (merge (clojurify-structure (Utils/readDefaultConfig))
-                      {STORM-ZOOKEEPER-PORT (.getPort zk)
-                       STORM-ZOOKEEPER-SERVERS ["localhost"]
-                       })
-          state (TransactionalState/newUserState conf "id1" {})
-          strict-rotating (RotatingTransactionalState. state "strict" true)
-          unstrict-rotating (RotatingTransactionalState. state "unstrict" false)
-          init (atom 10)
-          initializer (mk-state-initializer init)]
-      (is (= 10 (get-state strict-rotating 1 initializer)))
-      (is (= 10 (get-state strict-rotating 2 initializer)))
-      (reset! init 20)
-      (is (= 20 (get-state strict-rotating 3 initializer)))
-      (is (= 10 (get-state strict-rotating 1 initializer)))
-
-      (is (thrown? Exception (get-state strict-rotating 5 initializer)))
-      (is (= 20 (get-state strict-rotating 4 initializer)))
-      (is (= 4 (count (.list state "strict"))))
-      (cleanup-before strict-rotating 3)
-      (is (= 2 (count (.list state "strict"))))
-
-      (is (nil? (get-state-or-create strict-rotating 5 initializer)))
-      (is (= 20 (get-state-or-create strict-rotating 5 initializer)))
-      (is (nil? (get-state-or-create strict-rotating 6 initializer)))        
-      (cleanup-before strict-rotating 6)
-      (is (= 1 (count (.list state "strict"))))
-
-      (is (= 20 (get-state unstrict-rotating 10 initializer)))
-      (is (= 20 (get-state unstrict-rotating 20 initializer)))
-      (is (nil? (get-state unstrict-rotating 12 initializer)))
-      (is (nil? (get-state unstrict-rotating 19 initializer)))
-      (is (nil? (get-state unstrict-rotating 12 initializer)))
-      (is (= 20 (get-state unstrict-rotating 21 initializer)))
-
-      (.close state)
-      )))
-
-(defn mk-transactional-source []
-  (HashMap.))
-
-(defn add-transactional-data [source partition-map]
-  (doseq [[p data] partition-map :let [p (int p)]]
-    (if-not (contains? source p)
-      (.put source p (Collections/synchronizedList (ArrayList.))))
-    (-> source (.get p) (.addAll data))
-    ))
-
-;; puts its collector and tuples into the global state to be used externally
-(defbolt controller-bolt {} {:prepare true :params [state-id]}
-  [conf context collector]
-  (let [{tuples :tuples
-         collector-atom :collector} (RegisteredGlobalState/getState state-id)]
-    (reset! collector-atom collector)
-    (reset! tuples [])
-    (bolt
-     (execute [tuple]
-              (swap! tuples conj tuple))
-     )))
-
-(defmacro with-controller-bolt [[bolt collector-atom tuples-atom] & body]
-  `(let [~collector-atom (atom nil)
-         ~tuples-atom (atom [])
-         id# (RegisteredGlobalState/registerState {:collector ~collector-atom
-                                                   :tuples ~tuples-atom})
-         ~bolt (controller-bolt id#)]
-     ~@body
-     (RegisteredGlobalState/clearState id#)
-    ))
-
-(defn separate
-  [pred aseq]
-  [(filter pred aseq) (filter (complement pred) aseq)])
-
-
-(deftest test-transactional-topology
-  (with-open [cluster (.build (.withSimulatedTime (.withTracked (LocalCluster$Builder. ))))]
-    (with-controller-bolt [controller collector tuples]
-      (letlocals
-       (bind data (mk-transactional-source))
-       (bind builder (TransactionalTopologyBuilder.
-                      "id"
-                      "spout"
-                      (MemoryTransactionalSpout. data
-                                                 (Fields. ["word" "amt"])
-                                                 2)
-                      2))
-
-       (-> builder
-           (.setBolt "id1" (IdentityBolt. (Fields. ["tx" "word" "amt"])) 3)
-           (.shuffleGrouping "spout"))
-
-       (-> builder
-           (.setBolt "id2" (IdentityBolt. (Fields. ["tx" "word" "amt"])) 3)
-           (.shuffleGrouping "spout"))
-
-       (-> builder
-           (.setBolt "global" (CountingBatchBolt.) 1)
-           (.globalGrouping "spout"))
-
-       (-> builder
-           (.setBolt "gcommit" (CountingCommitBolt.) 1)
-           (.globalGrouping "spout"))
-       
-       (-> builder
-           (.setBolt "sum" (KeyedSummingBatchBolt.) 2)
-           (.fieldsGrouping "id1" (Fields. ["word"])))
-
-       (-> builder
-           (.setCommitterBolt "count" (KeyedCountingBatchBolt.) 2)
-           (.fieldsGrouping "id2" (Fields. ["word"])))
-
-       (-> builder
-           (.setBolt "count2" (KeyedCountingCommitterBolt.) 3)
-           (.fieldsGrouping "sum" (Fields. ["key"]))
-           (.fieldsGrouping "count" (Fields. ["key"])))
-
-       (bind builder (.buildTopologyBuilder builder))
-       
-       (-> builder
-           (.setBolt "controller" controller 1)
-           (.directGrouping "count2" Constants/COORDINATED_STREAM_ID)
-           (.directGrouping "sum" Constants/COORDINATED_STREAM_ID))
-
-       (add-transactional-data data
-                               {0 [["dog" 3]
-                                   ["cat" 4]
-                                   ["apple" 1]
-                                   ["dog" 3]]
-                                1 [["cat" 1]
-                                   ["mango" 4]]
-                                2 [["happy" 11]
-                                   ["mango" 2]
-                                   ["zebra" 1]]})
-       
-       (bind topo-info (Testing/trackAndCaptureTopology
-                        cluster
-                        (.createTopology builder)))
-       (log-message "TOPO INFO \n" (pr-str topo-info))
-       (.submitTopology cluster
-                        "transactional-test"
-                        {TOPOLOGY-MAX-SPOUT-PENDING 2}
-                        (.topology topo-info))
-       (.advanceClusterTime cluster 11)
-       (bind ack-tx! (fn [txid]
-                       (let [[to-ack not-to-ack] (separate
-                                                  #(-> %
-                                                       (.getValue 0)
-                                                       .getTransactionId
-                                                       (= txid))
-                                                  @tuples)]
-                         (reset! tuples not-to-ack)
-                         (doseq [t to-ack]
-                           (ack! @collector t)))))
-
-       (bind fail-tx! (fn [txid]
-                        (let [[to-fail not-to-fail] (separate
-                                                     #(-> %
-                                                          (.getValue 0)
-                                                          .getTransactionId
-                                                          (= txid))
-                                                     @tuples)]
-                          (reset! tuples not-to-fail)
-                          (doseq [t to-fail]
-                            (fail! @collector t)))))
-
-       ;; only check default streams
-       (bind verify! (fn [expected]
-                       (let [results (-> topo-info .capturer .getResults)]
-                         (doseq [[component tuples] expected
-                                 :let [emitted (->> (Testing/readTuples results
-                                                                 component
-                                                                 "default")
-                                                    (map normalize-tx-tuple))]]
-                           (log-message "\t\t!!!!! " component)
-                           (is (Testing/multiseteq tuples emitted)))
-                         (.clear results)
-                         )))
-
-       (Testing/trackedWait topo-info (int 2))
-       (log-message "\n\n\t\t!!!!!!!!!!!! 1\n\n")
-       (verify! {"sum" [[(int 1) "dog" 3]
-                        [(int 1) "cat" 5]
-                        [(int 1) "mango" 6]
-                        [(int 1) "happy" 11]
-                        [(int 2) "apple" 1]
-                        [(int 2) "dog" 3]
-                        [(int 2) "zebra" 1]]
-                 "count" []
-                 "count2" []
-                 "global" [[(int 1) (int 6)]
-                           [(int 2) (int 3)]]
-                 "gcommit" []})
-       (ack-tx! 1)
-       (Testing/trackedWait topo-info (int 1))
-       (log-message "\n\n\t\t!!!!!!!!!!!! 2\n\n")
-       (verify! {"sum" []
-                 "count" [[(int 1) "dog" (int 1)]
-                          [(int 1) "cat" (int 2)]
-                          [(int 1) "mango" (int 2)]
-                          [(int 1) "happy" (int 1)]]
-                 "count2" [[(int 1) "dog" (int 2)]
-                           [(int 1) "cat" (int 2)]
-                           [(int 1) "mango" (int 2)]
-                           [(int 1) "happy" (int 2)]]
-                 "global" []
-                 "gcommit" [[(int 1) (int 6)]]})
-
-       (add-transactional-data data
-                               {0 [["a" 1]
-                                   ["b" 2]
-                                   ["c" 3]]
-                                1 [["d" 4]
-                                   ["c" 1]]
-                                2 [["a" 2]
-                                   ["e" 7]
-                                   ["c" 11]]
-                                3 [["a" 2]]})
-       
-       (ack-tx! 1)
-       (Testing/trackedWait topo-info (int 1))
-       (log-message "\n\n\t\t!!!!!!!!!!!! 3\n\n")
-       (verify! {"sum" [[(int 3) "a" 5]
-                        [(int 3) "b" 2]
-                        [(int 3) "d" 4]
-                        [(int 3) "c" 1]
-                        [(int 3) "e" 7]]
-                 "count" []
-                 "count2" []
-                 "global" [[(int 3) (int 7)]]
-                 "gcommit" []})
-       (ack-tx! 3)
-       (ack-tx! 2)
-       (Testing/trackedWait topo-info (int 1))
-       (log-message "\n\n\t\t!!!!!!!!!!!! 4\n\n")
-       (verify! {"sum" []
-                 "count" [[(int 2) "apple" (int 1)]
-                          [(int 2) "dog" (int 1)]
-                          [(int 2) "zebra" (int 1)]]
-                 "count2" [[(int 2) "apple" (int 2)]
-                           [(int 2) "dog" (int 2)]
-                           [(int 2) "zebra" (int 2)]]
-                 "global" []
-                 "gcommit" [[(int 2) (int 3)]]})
-
-       (fail-tx! 2)
-       (Testing/trackedWait topo-info (int 2))
-
-       (log-message "\n\n\t\t!!!!!!!!!!!! 5\n\n")
-       (verify! {"sum" [[(int 2) "apple" 1]
-                        [(int 2) "dog" 3]
-                        [(int 2) "zebra" 1]
-                        [(int 3) "a" 5]
-                        [(int 3) "b" 2]
-                        [(int 3) "d" 4]
-                        [(int 3) "c" 1]
-                        [(int 3) "e" 7]]
-                 "count" []
-                 "count2" []
-                 "global" [[(int 2) (int 3)]
-                           [(int 3) (int 7)]]
-                 "gcommit" []})
-       (ack-tx! 2)
-       (Testing/trackedWait topo-info (int 1))
-       
-       (log-message "\n\n\t\t!!!!!!!!!!!! 6\n\n")
-       (verify! {"sum" []
-                 "count" [[(int 2) "apple" (int 1)]
-                          [(int 2) "dog" (int 1)]
-                          [(int 2) "zebra" (int 1)]]
-                 "count2" [[(int 2) "apple" (int 2)]
-                           [(int 2) "dog" (int 2)]
-                           [(int 2) "zebra" (int 2)]]
-                 "global" []
-                 "gcommit" [[(int 2) (int 3)]]})
-       
-       (ack-tx! 2)
-       (ack-tx! 3)
-       
-       (Testing/trackedWait topo-info (int 2))
-       (log-message "\n\n\t\t!!!!!!!!!!!! 7\n\n")
-       (verify! {"sum" [[(int 4) "c" 14]]
-                 "count" [[(int 3) "a" (int 3)]
-                          [(int 3) "b" (int 1)]
-                          [(int 3) "d" (int 1)]
-                          [(int 3) "c" (int 1)]
-                          [(int 3) "e" (int 1)]]
-                 "count2" [[(int 3) "a" (int 2)]
-                           [(int 3) "b" (int 2)]
-                           [(int 3) "d" (int 2)]
-                           [(int 3) "c" (int 2)]
-                           [(int 3) "e" (int 2)]]
-                 "global" [[(int 4) (int 2)]]
-                 "gcommit" [[(int 3) (int 7)]]})
-       
-       (ack-tx! 4)
-       (ack-tx! 3)
-       (Testing/trackedWait topo-info (int 2))
-       (log-message "\n\n\t\t!!!!!!!!!!!! 8\n\n")
-       (verify! {"sum" []
-                 "count" [[(int 4) "c" (int 2)]]
-                 "count2" [[(int 4) "c" (int 2)]]
-                 "global" [[(int 5) (int 0)]]
-                 "gcommit" [[(int 4) (int 2)]]})
-       
-       (ack-tx! 5)
-       (ack-tx! 4)
-       (Testing/trackedWait topo-info (int 2))
-       (log-message "\n\n\t\t!!!!!!!!!!!! 9\n\n")
-       (verify! {"sum" []
-                 "count" []
-                 "count2" []
-                 "global" [[(int 6) (int 0)]]
-                 "gcommit" [[(int 5) (int 0)]]})
-       
-       (-> topo-info .capturer .getAndClearResults)
-       ))))
-
-(deftest test-opaque-transactional-topology
-  (with-open [cluster (.build (.withSimulatedTime (.withTracked (LocalCluster$Builder. ))))]
-    (with-controller-bolt [controller collector tuples]
-      (letlocals
-       (bind data (mk-transactional-source))
-       (bind builder (TransactionalTopologyBuilder.
-                      "id"
-                      "spout"
-                      (OpaqueMemoryTransactionalSpout. data
-                                                       (Fields. ["word"])
-                                                       2)
-                      2))
-
-       (-> builder
-           (.setCommitterBolt "count" (KeyedCountingBatchBolt.) 2)
-           (.fieldsGrouping "spout" (Fields. ["word"])))
-
-       (bind builder (.buildTopologyBuilder builder))
-       
-       (-> builder
-           (.setBolt "controller" controller 1)
-           (.directGrouping "spout" Constants/COORDINATED_STREAM_ID)
-           (.directGrouping "count" Constants/COORDINATED_STREAM_ID))
-
-       (add-transactional-data data
-                               {0 [["dog"]
-                                   ["cat"]
-                                   ["apple"]
-                                   ["dog"]]})
-       
-       (bind topo-info (Testing/trackAndCaptureTopology
-                        cluster
-                        (.createTopology builder)))
-       (.submitTopology cluster
-                        "transactional-test"
-                        {TOPOLOGY-MAX-SPOUT-PENDING 2}
-                        (.topology topo-info))
-       (.advanceClusterTime cluster 11)
-       (bind ack-tx! (fn [txid]
-                       (let [[to-ack not-to-ack] (separate
-                                                  #(-> %
-                                                       (.getValue 0)
-                                                       .getTransactionId
-                                                       (= txid))
-                                                  @tuples)]
-                         (reset! tuples not-to-ack)
-                         (doseq [t to-ack]
-                           (ack! @collector t)))))
-
-       (bind fail-tx! (fn [txid]
-                        (let [[to-fail not-to-fail] (separate
-                                                     #(-> %
-                                                          (.getValue 0)
-                                                          .getTransactionId
-                                                          (= txid))
-                                                     @tuples)]
-                          (reset! tuples not-to-fail)
-                          (doseq [t to-fail]
-                            (fail! @collector t)))))
-
-       ;; only check default streams
-       (bind verify! (fn [expected]
-                       (let [results (-> topo-info .capturer .getResults)]
-                         (doseq [[component tuples] expected
-                                 :let [emitted (->> (Testing/readTuples results
-                                                                 component
-                                                                 "default")
-                                                    (map normalize-tx-tuple))]]
-                           (is (Testing/multiseteq tuples emitted)))
-                         (.clear results)
-                         )))
-
-       (Testing/trackedWait topo-info (int 2))
-       (verify! {"count" []})
-       (ack-tx! 1)
-       (Testing/trackedWait topo-info (int 1))
-
-       (verify! {"count" [[(int 1) "dog" (int 1)]
-                          [(int 1) "cat" (int 1)]]})       
-       (ack-tx! 2)
-       (ack-tx! 1)
-       (Testing/trackedWait topo-info (int 2))
-
-       (verify! {"count" [[(int 2) "apple" (int 1)]
-                          [(int 2) "dog" (int 1)]]})
-
-       ))))
-
-(deftest test-create-node-acl
-  (testing "Creates ZooKeeper nodes with the correct ACLs"
-    (let [curator (Mockito/mock CuratorFramework)
-          builder0 (Mockito/mock CreateBuilder)
-          builder1 (Mockito/mock ProtectACLCreateModeStatPathAndBytesable)
-          expectedAcls ZooDefs$Ids/CREATOR_ALL_ACL]
-      (. (Mockito/when (.create curator)) (thenReturn builder0))
-      (. (Mockito/when (.creatingParentsIfNeeded builder0)) (thenReturn builder1))
-      (. (Mockito/when (.withMode builder1 (Matchers/isA CreateMode))) (thenReturn builder1))
-      (. (Mockito/when (.withACL builder1 (Mockito/anyList))) (thenReturn builder1))
-      (TestTransactionalState/createNode curator "" (byte-array 0) expectedAcls nil)
-      (is (nil?
-        (try
-          (. (Mockito/verify builder1) (withACL expectedAcls))
-        (catch MockitoAssertionError e
-          e)))))))


[4/4] storm git commit: Merge branch 'STORM-3156' of https://github.com/srdo/storm into STORM-3156

Posted by bo...@apache.org.
Merge branch 'STORM-3156' of https://github.com/srdo/storm into STORM-3156

STORM-3156: Remove the transactional topology API

This closes #2768


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/31624a2d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/31624a2d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/31624a2d

Branch: refs/heads/master
Commit: 31624a2d2f9ed2df0fd37ee871e45e321735d27c
Parents: c8896ef da0bc6a
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Mon Jul 30 13:24:31 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Mon Jul 30 13:24:31 2018 -0500

----------------------------------------------------------------------
 docs/Structure-of-the-codebase.md               |  33 +-
 docs/Trident-tutorial.md                        |   2 +
 docs/Tutorial.md                                |   4 +-
 docs/index.md                                   |   2 +-
 docs/storm-sql.md                               |   2 +-
 .../storm/starter/TransactionalGlobalCount.java | 165 -----
 .../storm/starter/TransactionalWords.java       | 234 ------
 .../apache/storm/testing/BatchNumberList.java   |  65 --
 .../apache/storm/testing/BatchProcessWord.java  |  34 -
 .../org/apache/storm/testing/BatchRepeatA.java  |  43 --
 .../apache/storm/testing/CountingBatchBolt.java |  50 --
 .../storm/testing/CountingCommitBolt.java       |  52 --
 .../storm/testing/KeyedCountingBatchBolt.java   |  56 --
 .../testing/KeyedCountingCommitterBolt.java     |  19 -
 .../storm/testing/KeyedSummingBatchBolt.java    |  55 --
 .../storm/testing/MemoryTransactionalSpout.java | 185 -----
 .../testing/MemoryTransactionalSpoutMeta.java   |  33 -
 .../testing/OpaqueMemoryTransactionalSpout.java | 187 -----
 ...BaseOpaquePartitionedTransactionalSpout.java |  20 -
 .../base/BasePartitionedTransactionalSpout.java |  19 -
 .../topology/base/BaseTransactionalBolt.java    |  19 -
 .../topology/base/BaseTransactionalSpout.java   |  19 -
 .../apache/storm/transactional/ICommitter.java  |  21 -
 .../ICommitterTransactionalSpout.java           |  26 -
 .../transactional/ITransactionalSpout.java      |  85 ---
 .../storm/transactional/TransactionAttempt.java |   4 +
 .../TransactionalSpoutBatchExecutor.java        |  91 ---
 .../TransactionalSpoutCoordinator.java          | 209 ------
 .../TransactionalTopologyBuilder.java           | 562 ---------------
 .../IOpaquePartitionedTransactionalSpout.java   |  52 --
 .../IPartitionedTransactionalSpout.java         |  64 --
 ...uePartitionedTransactionalSpoutExecutor.java | 158 -----
 .../PartitionedTransactionalSpoutExecutor.java  | 138 ----
 .../state/RotatingTransactionalState.java       | 149 ----
 .../state/TestTransactionalState.java           |  38 -
 .../transactional/state/TransactionalState.java | 185 -----
 .../clj/org/apache/storm/transactional_test.clj | 706 -------------------
 37 files changed, 26 insertions(+), 3760 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/31624a2d/docs/index.md
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/31624a2d/docs/storm-sql.md
----------------------------------------------------------------------


[2/4] storm git commit: STORM-3156: Remove the transactional topology API

Posted by bo...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/transactional/TransactionAttempt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/TransactionAttempt.java b/storm-client/src/jvm/org/apache/storm/transactional/TransactionAttempt.java
index a192c8f..683e414 100644
--- a/storm-client/src/jvm/org/apache/storm/transactional/TransactionAttempt.java
+++ b/storm-client/src/jvm/org/apache/storm/transactional/TransactionAttempt.java
@@ -14,6 +14,10 @@ package org.apache.storm.transactional;
 
 import java.math.BigInteger;
 
+/**
+ * This class is dead code. It is retained only to avoid breaking the Kryo registration order.
+ */
+@Deprecated
 public class TransactionAttempt {
     BigInteger _txid;
     long _attemptId;

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/transactional/TransactionalSpoutBatchExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/TransactionalSpoutBatchExecutor.java b/storm-client/src/jvm/org/apache/storm/transactional/TransactionalSpoutBatchExecutor.java
deleted file mode 100644
index add1d64..0000000
--- a/storm-client/src/jvm/org/apache/storm/transactional/TransactionalSpoutBatchExecutor.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.transactional;
-
-import java.math.BigInteger;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.storm.coordination.BatchOutputCollectorImpl;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.topology.IRichBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TransactionalSpoutBatchExecutor implements IRichBolt {
-    public static final Logger LOG = LoggerFactory.getLogger(TransactionalSpoutBatchExecutor.class);
-
-    BatchOutputCollectorImpl _collector;
-    ITransactionalSpout _spout;
-    ITransactionalSpout.Emitter _emitter;
-
-    TreeMap<BigInteger, TransactionAttempt> _activeTransactions = new TreeMap<>();
-
-    public TransactionalSpoutBatchExecutor(ITransactionalSpout spout) {
-        _spout = spout;
-    }
-
-    @Override
-    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
-        _collector = new BatchOutputCollectorImpl(collector);
-        _emitter = _spout.getEmitter(conf, context);
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
-        try {
-            if (input.getSourceStreamId().equals(TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID)) {
-                if (attempt.equals(_activeTransactions.get(attempt.getTransactionId()))) {
-                    ((ICommitterTransactionalSpout.Emitter) _emitter).commit(attempt);
-                    _activeTransactions.remove(attempt.getTransactionId());
-                    _collector.ack(input);
-                } else {
-                    _collector.fail(input);
-                }
-            } else {
-                _emitter.emitBatch(attempt, input.getValue(1), _collector);
-                _activeTransactions.put(attempt.getTransactionId(), attempt);
-                _collector.ack(input);
-                BigInteger committed = (BigInteger) input.getValue(2);
-                if (committed != null) {
-                    // valid to delete before what's been committed since 
-                    // those batches will never be accessed again
-                    _activeTransactions.headMap(committed).clear();
-                    _emitter.cleanupBefore(committed);
-                }
-            }
-        } catch (FailedException e) {
-            LOG.warn("Failed to emit batch for transaction", e);
-            _collector.fail(input);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        _emitter.close();
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        _spout.declareOutputFields(declarer);
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return _spout.getComponentConfiguration();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/transactional/TransactionalSpoutCoordinator.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/TransactionalSpoutCoordinator.java b/storm-client/src/jvm/org/apache/storm/transactional/TransactionalSpoutCoordinator.java
deleted file mode 100644
index 55c570a..0000000
--- a/storm-client/src/jvm/org/apache/storm/transactional/TransactionalSpoutCoordinator.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.transactional;
-
-import java.math.BigInteger;
-import java.util.Map;
-import java.util.Random;
-import java.util.TreeMap;
-import org.apache.storm.Config;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.apache.storm.transactional.state.RotatingTransactionalState;
-import org.apache.storm.transactional.state.TransactionalState;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TransactionalSpoutCoordinator extends BaseRichSpout {
-    public static final Logger LOG = LoggerFactory.getLogger(TransactionalSpoutCoordinator.class);
-
-    public static final BigInteger INIT_TXID = BigInteger.ONE;
-
-
-    public static final String TRANSACTION_BATCH_STREAM_ID = TransactionalSpoutCoordinator.class.getName() + "/batch";
-    public static final String TRANSACTION_COMMIT_STREAM_ID = TransactionalSpoutCoordinator.class.getName() + "/commit";
-
-    private static final String CURRENT_TX = "currtx";
-    private static final String META_DIR = "meta";
-    TreeMap<BigInteger, TransactionStatus> _activeTx = new TreeMap<>();
-    BigInteger _currTransaction;
-    int _maxTransactionActive;
-    StateInitializer _initializer;
-    private ITransactionalSpout _spout;
-    private ITransactionalSpout.Coordinator _coordinator;
-    private TransactionalState _state;
-    private RotatingTransactionalState _coordinatorState;
-    private SpoutOutputCollector _collector;
-    private Random _rand;
-
-
-    public TransactionalSpoutCoordinator(ITransactionalSpout spout) {
-        _spout = spout;
-    }
-
-    public ITransactionalSpout getSpout() {
-        return _spout;
-    }
-
-    @Override
-    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
-        _rand = new Random(Utils.secureRandomLong());
-        _state = TransactionalState
-            .newCoordinatorState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), _spout.getComponentConfiguration());
-        _coordinatorState = new RotatingTransactionalState(_state, META_DIR, true);
-        _collector = collector;
-        _coordinator = _spout.getCoordinator(conf, context);
-        _currTransaction = getStoredCurrTransaction(_state);
-        _maxTransactionActive = ObjectReader.getInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 1);
-        _initializer = new StateInitializer();
-    }
-
-    @Override
-    public void close() {
-        _state.close();
-    }
-
-    @Override
-    public void nextTuple() {
-        sync();
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        TransactionAttempt tx = (TransactionAttempt) msgId;
-        TransactionStatus status = _activeTx.get(tx.getTransactionId());
-        if (status != null && tx.equals(status.attempt)) {
-            if (status.status == AttemptStatus.PROCESSING) {
-                status.status = AttemptStatus.PROCESSED;
-            } else if (status.status == AttemptStatus.COMMITTING) {
-                _activeTx.remove(tx.getTransactionId());
-                _coordinatorState.cleanupBefore(tx.getTransactionId());
-                _currTransaction = nextTransactionId(tx.getTransactionId());
-                _state.setData(CURRENT_TX, _currTransaction);
-            }
-            sync();
-        }
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        TransactionAttempt tx = (TransactionAttempt) msgId;
-        TransactionStatus stored = _activeTx.remove(tx.getTransactionId());
-        if (stored != null && tx.equals(stored.attempt)) {
-            _activeTx.tailMap(tx.getTransactionId()).clear();
-            sync();
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        // in partitioned example, in case an emitter task receives a later transaction than it's emitted so far,
-        // when it sees the earlier txid it should know to emit nothing
-        declarer.declareStream(TRANSACTION_BATCH_STREAM_ID, new Fields("tx", "tx-meta", "committed-txid"));
-        declarer.declareStream(TRANSACTION_COMMIT_STREAM_ID, new Fields("tx"));
-    }
-
-    private void sync() {
-        // note that sometimes the tuples active may be less than max_spout_pending, e.g.
-        // max_spout_pending = 3
-        // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
-        // and there won't be a batch for tx 4 because there's max_spout_pending tx active
-        TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
-        if (maybeCommit != null && maybeCommit.status == AttemptStatus.PROCESSED) {
-            maybeCommit.status = AttemptStatus.COMMITTING;
-            _collector.emit(TRANSACTION_COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
-        }
-
-        try {
-            if (_activeTx.size() < _maxTransactionActive) {
-                BigInteger curr = _currTransaction;
-                for (int i = 0; i < _maxTransactionActive; i++) {
-                    if ((_coordinatorState.hasCache(curr) || _coordinator.isReady())
-                        && !_activeTx.containsKey(curr)) {
-                        TransactionAttempt attempt = new TransactionAttempt(curr, _rand.nextLong());
-                        Object state = _coordinatorState.getState(curr, _initializer);
-                        _activeTx.put(curr, new TransactionStatus(attempt));
-                        _collector.emit(TRANSACTION_BATCH_STREAM_ID, new Values(attempt, state, previousTransactionId(_currTransaction)),
-                                        attempt);
-                    }
-                    curr = nextTransactionId(curr);
-                }
-            }
-        } catch (FailedException e) {
-            LOG.warn("Failed to get metadata for a transaction", e);
-        }
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        Config ret = new Config();
-        ret.setMaxTaskParallelism(1);
-        return ret;
-    }
-
-    private BigInteger nextTransactionId(BigInteger id) {
-        return id.add(BigInteger.ONE);
-    }
-
-    private BigInteger previousTransactionId(BigInteger id) {
-        if (id.equals(INIT_TXID)) {
-            return null;
-        } else {
-            return id.subtract(BigInteger.ONE);
-        }
-    }
-
-    private BigInteger getStoredCurrTransaction(TransactionalState state) {
-        BigInteger ret = (BigInteger) state.getData(CURRENT_TX);
-        if (ret == null) {
-            return INIT_TXID;
-        } else {
-            return ret;
-        }
-    }
-
-    private static enum AttemptStatus {
-        PROCESSING,
-        PROCESSED,
-        COMMITTING
-    }
-
-    private static class TransactionStatus {
-        TransactionAttempt attempt;
-        AttemptStatus status;
-
-        public TransactionStatus(TransactionAttempt attempt) {
-            this.attempt = attempt;
-            this.status = AttemptStatus.PROCESSING;
-        }
-
-        @Override
-        public String toString() {
-            return attempt.toString() + " <" + status.toString() + ">";
-        }
-    }
-
-    private class StateInitializer implements RotatingTransactionalState.StateInitializer {
-        @Override
-        public Object init(BigInteger txid, Object lastState) {
-            return _coordinator.initializeTransaction(txid, lastState);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/transactional/TransactionalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/TransactionalTopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/transactional/TransactionalTopologyBuilder.java
deleted file mode 100644
index 9e850df..0000000
--- a/storm-client/src/jvm/org/apache/storm/transactional/TransactionalTopologyBuilder.java
+++ /dev/null
@@ -1,562 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.transactional;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.storm.Config;
-import org.apache.storm.Constants;
-import org.apache.storm.coordination.BatchBoltExecutor;
-import org.apache.storm.coordination.CoordinatedBolt;
-import org.apache.storm.coordination.CoordinatedBolt.IdStreamSpec;
-import org.apache.storm.coordination.CoordinatedBolt.SourceArgs;
-import org.apache.storm.coordination.IBatchBolt;
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.generated.Grouping;
-import org.apache.storm.generated.SharedMemory;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.grouping.CustomStreamGrouping;
-import org.apache.storm.grouping.PartialKeyGrouping;
-import org.apache.storm.topology.BaseConfigurationDeclarer;
-import org.apache.storm.topology.BasicBoltExecutor;
-import org.apache.storm.topology.BoltDeclarer;
-import org.apache.storm.topology.IBasicBolt;
-import org.apache.storm.topology.IRichBolt;
-import org.apache.storm.topology.InputDeclarer;
-import org.apache.storm.topology.SpoutDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout;
-import org.apache.storm.transactional.partitioned.IPartitionedTransactionalSpout;
-import org.apache.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor;
-import org.apache.storm.transactional.partitioned.PartitionedTransactionalSpoutExecutor;
-import org.apache.storm.tuple.Fields;
-
-/**
- * Trident subsumes the functionality provided by transactional topologies, so this class is deprecated.
- */
-@Deprecated
-public class TransactionalTopologyBuilder {
-    final String id;
-    final String spoutId;
-    final ITransactionalSpout spout;
-    final Map<String, Component> bolts = new HashMap<>();
-    final Integer spoutParallelism;
-    final Map<String, Object> spoutConf = new HashMap<>();
-    final Set<SharedMemory> spoutSharedMemory = new HashSet<>();
-
-    // id is used to store the state of this transactionalspout in zookeeper
-    // it would be very dangerous to have 2 topologies active with the same id in the same cluster    
-    public TransactionalTopologyBuilder(String id, String spoutId, ITransactionalSpout spout, Number spoutParallelism) {
-        this.id = id;
-        this.spoutId = spoutId;
-        this.spout = spout;
-        this.spoutParallelism = (spoutParallelism == null) ? null : spoutParallelism.intValue();
-    }
-
-    public TransactionalTopologyBuilder(String id, String spoutId, ITransactionalSpout spout) {
-        this(id, spoutId, spout, null);
-    }
-
-    public TransactionalTopologyBuilder(String id, String spoutId, IPartitionedTransactionalSpout spout, Number spoutParallelism) {
-        this(id, spoutId, new PartitionedTransactionalSpoutExecutor(spout), spoutParallelism);
-    }
-
-    public TransactionalTopologyBuilder(String id, String spoutId, IPartitionedTransactionalSpout spout) {
-        this(id, spoutId, spout, null);
-    }
-
-    public TransactionalTopologyBuilder(String id, String spoutId, IOpaquePartitionedTransactionalSpout spout, Number spoutParallelism) {
-        this(id, spoutId, new OpaquePartitionedTransactionalSpoutExecutor(spout), spoutParallelism);
-    }
-
-    public TransactionalTopologyBuilder(String id, String spoutId, IOpaquePartitionedTransactionalSpout spout) {
-        this(id, spoutId, spout, null);
-    }
-
-    public SpoutDeclarer getSpoutDeclarer() {
-        return new SpoutDeclarerImpl();
-    }
-
-    public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
-        return setBolt(id, bolt, null);
-    }
-
-    public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
-        return setBolt(id, new BatchBoltExecutor(bolt), parallelism, bolt instanceof ICommitter);
-    }
-
-    public BoltDeclarer setCommitterBolt(String id, IBatchBolt bolt) {
-        return setCommitterBolt(id, bolt, null);
-    }
-
-    public BoltDeclarer setCommitterBolt(String id, IBatchBolt bolt, Number parallelism) {
-        return setBolt(id, new BatchBoltExecutor(bolt), parallelism, true);
-    }
-
-    public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
-        return setBolt(id, bolt, null);
-    }
-
-    public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
-        return setBolt(id, new BasicBoltExecutor(bolt), parallelism, false);
-    }
-
-    private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism, boolean committer) {
-        Integer p = null;
-        if (parallelism != null) {
-            p = parallelism.intValue();
-        }
-        Component component = new Component(bolt, p, committer);
-        bolts.put(id, component);
-        return new BoltDeclarerImpl(component);
-    }
-
-    public TopologyBuilder buildTopologyBuilder() {
-        String coordinator = spoutId + "/coordinator";
-        TopologyBuilder builder = new TopologyBuilder();
-        SpoutDeclarer declarer = builder.setSpout(coordinator, new TransactionalSpoutCoordinator(spout));
-        for (SharedMemory request : spoutSharedMemory) {
-            declarer.addSharedMemory(request);
-        }
-        if (!spoutConf.isEmpty()) {
-            declarer.addConfigurations(spoutConf);
-        }
-        declarer.addConfiguration(Config.TOPOLOGY_TRANSACTIONAL_ID, id);
-
-        BoltDeclarer emitterDeclarer =
-            builder.setBolt(spoutId,
-                            new CoordinatedBolt(new TransactionalSpoutBatchExecutor(spout),
-                                                null,
-                                                null),
-                            spoutParallelism)
-                   .allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_BATCH_STREAM_ID)
-                   .addConfiguration(Config.TOPOLOGY_TRANSACTIONAL_ID, id);
-        if (spout instanceof ICommitterTransactionalSpout) {
-            emitterDeclarer.allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
-        }
-        for (String id : bolts.keySet()) {
-            Component component = bolts.get(id);
-            Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>();
-            for (String c : componentBoltSubscriptions(component)) {
-                coordinatedArgs.put(c, SourceArgs.all());
-            }
-
-            IdStreamSpec idSpec = null;
-            if (component.committer) {
-                idSpec = IdStreamSpec.makeDetectSpec(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
-            }
-            BoltDeclarer input = builder.setBolt(id,
-                                                 new CoordinatedBolt(component.bolt,
-                                                                     coordinatedArgs,
-                                                                     idSpec),
-                                                 component.parallelism);
-            for (SharedMemory request : component.sharedMemory) {
-                input.addSharedMemory(request);
-            }
-            if (!component.componentConf.isEmpty()) {
-                input.addConfigurations(component.componentConf);
-            }
-            for (String c : componentBoltSubscriptions(component)) {
-                input.directGrouping(c, Constants.COORDINATED_STREAM_ID);
-            }
-            for (InputDeclaration d : component.declarations) {
-                d.declare(input);
-            }
-            if (component.committer) {
-                input.allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
-            }
-        }
-        return builder;
-    }
-
-    public StormTopology buildTopology() {
-        return buildTopologyBuilder().createTopology();
-    }
-
-    private Set<String> componentBoltSubscriptions(Component component) {
-        Set<String> ret = new HashSet<String>();
-        for (InputDeclaration d : component.declarations) {
-            ret.add(d.getComponent());
-        }
-        return ret;
-    }
-
-    private static interface InputDeclaration {
-        void declare(InputDeclarer declarer);
-
-        String getComponent();
-    }
-
-    private static class Component {
-        public final IRichBolt bolt;
-        public final Integer parallelism;
-        public final List<InputDeclaration> declarations = new ArrayList<>();
-        public final Map<String, Object> componentConf = new HashMap<>();
-        public final boolean committer;
-        public final Set<SharedMemory> sharedMemory = new HashSet<>();
-
-        public Component(IRichBolt bolt, Integer parallelism, boolean committer) {
-            this.bolt = bolt;
-            this.parallelism = parallelism;
-            this.committer = committer;
-        }
-    }
-
-    private static class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
-        Component component;
-
-        public BoltDeclarerImpl(Component component) {
-            this.component = component;
-        }
-
-        @Override
-        public BoltDeclarer fieldsGrouping(final String component, final Fields fields) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.fieldsGrouping(component, fields);
-                }
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer fieldsGrouping(final String component, final String streamId, final Fields fields) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.fieldsGrouping(component, streamId, fields);
-                }
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer globalGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.globalGrouping(component);
-                }
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer globalGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.globalGrouping(component, streamId);
-                }
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer shuffleGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.shuffleGrouping(component);
-                }
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer shuffleGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.shuffleGrouping(component, streamId);
-                }
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer localOrShuffleGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.localOrShuffleGrouping(component);
-                }
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.localOrShuffleGrouping(component, streamId);
-                }
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer noneGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.noneGrouping(component);
-                }
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer noneGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.noneGrouping(component, streamId);
-                }
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer allGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.allGrouping(component);
-                }
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer allGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.allGrouping(component, streamId);
-                }
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer directGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.directGrouping(component);
-                }
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer directGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.directGrouping(component, streamId);
-                }
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-            });
-            return this;
-        }
-
-        /**
-         * Builds a {@link PartialKeyGrouping} over {@code fields} and registers it through
-         * {@code customGrouping(componentId, grouping)}.
-         *
-         * @return whatever the delegated {@code customGrouping} call returns.
-         */
-        @Override
-        public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) {
-            final CustomStreamGrouping keyGrouping = new PartialKeyGrouping(fields);
-            return customGrouping(componentId, keyGrouping);
-        }
-
-        /**
-         * Builds a {@link PartialKeyGrouping} over {@code fields} and registers it through
-         * {@code customGrouping(componentId, streamId, grouping)}.
-         *
-         * @return whatever the delegated {@code customGrouping} call returns.
-         */
-        @Override
-        public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) {
-            final CustomStreamGrouping keyGrouping = new PartialKeyGrouping(fields);
-            return customGrouping(componentId, streamId, keyGrouping);
-        }
-
-        /**
-         * Registers a subscription to {@code component} that uses the supplied custom stream grouping.
-         *
-         * The subscription is wrapped in an {@link InputDeclaration} and handed to {@code addDeclaration}; the underlying
-         * {@code customGrouping} call is made when {@code declare} is invoked on that declaration.
-         *
-         * @return this declarer, to allow chaining.
-         */
-        @Override
-        public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {
-            final InputDeclaration subscription = new InputDeclaration() {
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.customGrouping(component, grouping);
-                }
-            };
-            addDeclaration(subscription);
-            return this;
-        }
-
-        /**
-         * Registers a subscription to stream {@code streamId} of {@code component} that uses the supplied custom stream grouping.
-         *
-         * The subscription is wrapped in an {@link InputDeclaration} and handed to {@code addDeclaration}; the underlying
-         * {@code customGrouping} call is made when {@code declare} is invoked on that declaration.
-         *
-         * @return this declarer, to allow chaining.
-         */
-        @Override
-        public BoltDeclarer customGrouping(final String component, final String streamId, final CustomStreamGrouping grouping) {
-            final InputDeclaration subscription = new InputDeclaration() {
-                @Override
-                public String getComponent() {
-                    return component;
-                }
-
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.customGrouping(component, streamId, grouping);
-                }
-            };
-            addDeclaration(subscription);
-            return this;
-        }
-
-        /**
-         * Registers a subscription to {@code stream} with an explicit {@link Grouping}.
-         *
-         * The subscription is wrapped in an {@link InputDeclaration} and handed to {@code addDeclaration}. The declaration reports
-         * {@code stream.get_componentId()} as its component.
-         *
-         * @return this declarer, to allow chaining.
-         */
-        @Override
-        public BoltDeclarer grouping(final GlobalStreamId stream, final Grouping grouping) {
-            final InputDeclaration subscription = new InputDeclaration() {
-                @Override
-                public String getComponent() {
-                    return stream.get_componentId();
-                }
-
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.grouping(stream, grouping);
-                }
-            };
-            addDeclaration(subscription);
-            return this;
-        }
-
-        /**
-         * Appends {@code declaration} to the {@code declarations} collection of the component this declarer configures.
-         */
-        private void addDeclaration(final InputDeclaration declaration) {
-            component.declarations.add(declaration);
-        }
-
-        /**
-         * Copies every entry of {@code conf} into the current component configuration. A {@code null} map is ignored.
-         *
-         * @return this declarer, to allow chaining.
-         */
-        @Override
-        public BoltDeclarer addConfigurations(Map<String, Object> conf) {
-            if (conf == null) {
-                return this;
-            }
-            getComponentConfiguration().putAll(conf);
-            return this;
-        }
-
-        /**
-         * Returns the configuration map held by the component being declared.
-         *
-         * @return the component's {@code componentConf} map itself (no copy is made).
-         */
-        @Override
-        public Map<String, Object> getComponentConfiguration() {
-            final Map<String, Object> currentConf = component.componentConf;
-            return currentConf;
-        }
-
-        /**
-         * Adds {@code request} to the shared-memory requests recorded on the component being declared.
-         *
-         * @return this declarer, to allow chaining.
-         */
-        @Override
-        public BoltDeclarer addSharedMemory(final SharedMemory request) {
-            component.sharedMemory.add(request);
-            return this;
-        }
-    }
-
-    /**
-     * {@link SpoutDeclarer} implementation that works on {@code spoutConf} and {@code spoutSharedMemory}. Neither name is declared
-     * in this class, so both come from the enclosing scope.
-     */
-    private class SpoutDeclarerImpl extends BaseConfigurationDeclarer<SpoutDeclarer> implements SpoutDeclarer {
-        /**
-         * Returns the spout configuration map.
-         *
-         * @return the {@code spoutConf} map itself (no copy is made).
-         */
-        @Override
-        public Map<String, Object> getComponentConfiguration() {
-            return spoutConf;
-        }
-
-        /**
-         * Copies every entry of {@code conf} into {@code spoutConf}. A {@code null} map is ignored.
-         *
-         * @return this declarer, to allow chaining.
-         */
-        @Override
-        public SpoutDeclarer addConfigurations(Map<String, Object> conf) {
-            if (conf == null) {
-                return this;
-            }
-            spoutConf.putAll(conf);
-            return this;
-        }
-
-        /**
-         * Adds {@code request} to {@code spoutSharedMemory}.
-         *
-         * @return this declarer, to allow chaining.
-         */
-        @Override
-        public SpoutDeclarer addSharedMemory(final SharedMemory request) {
-            spoutSharedMemory.add(request);
-            return this;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java b/storm-client/src/jvm/org/apache/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java
deleted file mode 100644
index 763b6a0..0000000
--- a/storm-client/src/jvm/org/apache/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.transactional.partitioned;
-
-import java.util.Map;
-import org.apache.storm.coordination.BatchOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IComponent;
-import org.apache.storm.transactional.TransactionAttempt;
-
-/**
- * A transactional spout that is *not* required to replay the same batch every time it emits a batch for a given transaction id.
- */
-public interface IOpaquePartitionedTransactionalSpout<T> extends IComponent {
-    Emitter<T> getEmitter(Map<String, Object> conf, TopologyContext context);
-
-    Coordinator getCoordinator(Map<String, Object> conf, TopologyContext context);
-
-    interface Coordinator {
-        /**
-         * Returns true if it is OK to start a new transaction, false otherwise (the transaction will be skipped).
-         *
-         * This is called repeatedly in a loop, so sleep here if you want a delay between requests for the next transaction.
-         */
-        boolean isReady();
-
-        void close();
-    }
-
-    interface Emitter<X> {
-        /**
-         * Emit a batch of tuples for a partition/transaction.
-         *
-         * The returned metadata describes this batch; it is passed back as lastPartitionMeta to define the parameters of the next
-         * batch.
-         */
-        X emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, X lastPartitionMeta);
-
-        int numPartitions();
-
-        void close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/transactional/partitioned/IPartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/partitioned/IPartitionedTransactionalSpout.java b/storm-client/src/jvm/org/apache/storm/transactional/partitioned/IPartitionedTransactionalSpout.java
deleted file mode 100644
index ed86d01..0000000
--- a/storm-client/src/jvm/org/apache/storm/transactional/partitioned/IPartitionedTransactionalSpout.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.transactional.partitioned;
-
-import java.util.Map;
-import org.apache.storm.coordination.BatchOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IComponent;
-import org.apache.storm.transactional.TransactionAttempt;
-
-/**
- * A transactional spout that reads its tuples from a partitioned set of brokers. Metadata is stored automatically for each partition
- * so that the same batch is always emitted for the same transaction id. The partition metadata is stored in Zookeeper.
- */
-public interface IPartitionedTransactionalSpout<T> extends IComponent {
-    Coordinator getCoordinator(Map<String, Object> conf, TopologyContext context);
-
-    Emitter<T> getEmitter(Map<String, Object> conf, TopologyContext context);
-
-    interface Coordinator {
-        /**
-         * Return the number of partitions currently in the source of data. The idea is that if a new partition is added and a prior
-         * transaction is replayed, no tuples are emitted for the new partition, because the number of partitions that were in that
-         * transaction is known.
-         */
-        int numPartitions();
-
-        /**
-         * Returns true if it is OK to start a new transaction, false otherwise (the transaction will be skipped).
-         *
-         * This is called repeatedly in a loop, so sleep here if you want a delay between requests for the next transaction.
-         */
-        boolean isReady();
-
-        void close();
-    }
-
-    interface Emitter<X> {
-        /**
-         * Emit a batch of tuples for a partition/transaction that has never been emitted before. The returned metadata must be
-         * sufficient to reconstruct this partition/batch in the future.
-         */
-        X emitPartitionBatchNew(TransactionAttempt tx, BatchOutputCollector collector, int partition, X lastPartitionMeta);
-
-        /**
-         * Emit a batch of tuples for a partition/transaction that has been emitted before, using the metadata that was created when
-         * it was first emitted.
-         */
-        void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, X partitionMeta);
-
-        void close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
deleted file mode 100644
index 26f87d0..0000000
--- a/storm-client/src/jvm/org/apache/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.transactional.partitioned;
-
-import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-import org.apache.storm.Config;
-import org.apache.storm.coordination.BatchOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.transactional.ICommitterTransactionalSpout;
-import org.apache.storm.transactional.ITransactionalSpout;
-import org.apache.storm.transactional.TransactionAttempt;
-import org.apache.storm.transactional.state.RotatingTransactionalState;
-import org.apache.storm.transactional.state.TransactionalState;
-
-
-/**
- * Adapts an {@link IOpaquePartitionedTransactionalSpout} to the {@link ICommitterTransactionalSpout} interface.
- *
- * Each emitter task handles the partitions {@code i = taskIndex, taskIndex + numTasks, ...}. The metadata returned by the wrapped
- * emitter is cached in memory per transaction id and written to a {@link RotatingTransactionalState} only when the transaction is
- * committed.
- */
-public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTransactionalSpout<Object> {
-    IOpaquePartitionedTransactionalSpout _spout;
-
-    public OpaquePartitionedTransactionalSpoutExecutor(IOpaquePartitionedTransactionalSpout spout) {
-        _spout = spout;
-    }
-
-    @Override
-    public ITransactionalSpout.Coordinator<Object> getCoordinator(Map<String, Object> conf, TopologyContext context) {
-        return new Coordinator(conf, context);
-    }
-
-    @Override
-    public ICommitterTransactionalSpout.Emitter getEmitter(Map<String, Object> conf, TopologyContext context) {
-        return new Emitter(conf, context);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        _spout.declareOutputFields(declarer);
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return _spout.getComponentConfiguration();
-    }
-
-    /**
-     * Coordinator that forwards readiness and shutdown to the wrapped spout's coordinator and carries no per-transaction metadata.
-     */
-    public class Coordinator implements ITransactionalSpout.Coordinator<Object> {
-        IOpaquePartitionedTransactionalSpout.Coordinator _coordinator;
-
-        public Coordinator(Map<String, Object> conf, TopologyContext context) {
-            _coordinator = _spout.getCoordinator(conf, context);
-        }
-
-        /**
-         * Always returns {@code null}: no coordinator metadata is produced for a transaction.
-         */
-        @Override
-        public Object initializeTransaction(BigInteger txid, Object prevMetadata) {
-            return null;
-        }
-
-        @Override
-        public boolean isReady() {
-            return _coordinator.isReady();
-        }
-
-        @Override
-        public void close() {
-            _coordinator.close();
-        }
-    }
-
-    /**
-     * Emitter that drives the wrapped spout's emitter for the partitions assigned to this task and tracks the per-partition metadata.
-     */
-    public class Emitter implements ICommitterTransactionalSpout.Emitter {
-        IOpaquePartitionedTransactionalSpout.Emitter _emitter;
-        TransactionalState _state;
-        // txid -> (partition -> metadata returned by the wrapped emitter); an entry is removed when its txid is committed
-        TreeMap<BigInteger, Map<Integer, Object>> _cachedMetas = new TreeMap<>();
-        Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<>();
-        int _index;
-        int _numTasks;
-
-        /**
-         * Creates the wrapped emitter and the user state, then loads the stored state of every existing partition that belongs to
-         * this task.
-         */
-        public Emitter(Map<String, Object> conf, TopologyContext context) {
-            _emitter = _spout.getEmitter(conf, context);
-            _index = context.getThisTaskIndex();
-            _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
-            _state =
-                TransactionalState.newUserState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), getComponentConfiguration());
-            List<String> existingPartitions = _state.list("");
-            for (String p : existingPartitions) {
-                int partition = Integer.parseInt(p);
-                // keep only the partitions this task is responsible for
-                if ((partition - _index) % _numTasks == 0) {
-                    _partitionStates.put(partition, new RotatingTransactionalState(_state, p));
-                }
-            }
-        }
-
-        /**
-         * Emits one batch per partition owned by this task and caches the returned metadata under the transaction id.
-         *
-         * The metadata passed to the wrapped emitter as the "last" metadata is taken from the closest earlier transaction that is
-         * still cached, falling back to the last stored state of the partition.
-         */
-        @Override
-        public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, BatchOutputCollector collector) {
-            Map<Integer, Object> metas = new HashMap<>();
-            _cachedMetas.put(tx.getTransactionId(), metas);
-            int partitions = _emitter.numPartitions();
-            Entry<BigInteger, Map<Integer, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
-            Map<Integer, Object> prevCached;
-            if (entry != null) {
-                prevCached = entry.getValue();
-            } else {
-                prevCached = new HashMap<>();
-            }
-
-            for (int i = _index; i < partitions; i += _numTasks) {
-                RotatingTransactionalState state = _partitionStates.get(i);
-                if (state == null) {
-                    state = new RotatingTransactionalState(_state, "" + i);
-                    _partitionStates.put(i, state);
-                }
-                // drop anything already stored for this txid before emitting a fresh batch for it
-                state.removeState(tx.getTransactionId());
-                Object lastMeta = prevCached.get(i);
-                if (lastMeta == null) {
-                    lastMeta = state.getLastState();
-                }
-                Object meta = _emitter.emitPartitionBatch(tx, collector, i, lastMeta);
-                metas.put(i, meta);
-            }
-        }
-
-        @Override
-        public void cleanupBefore(BigInteger txid) {
-            for (RotatingTransactionalState state : _partitionStates.values()) {
-                state.cleanupBefore(txid);
-            }
-        }
-
-        /**
-         * Writes the metadata cached for the committed transaction to the state of each partition and drops it from the cache.
-         */
-        @Override
-        public void commit(TransactionAttempt attempt) {
-            BigInteger txid = attempt.getTransactionId();
-            Map<Integer, Object> metas = _cachedMetas.remove(txid);
-            if (metas == null) {
-                // Nothing was cached for this txid by this emitter instance, so there is nothing to persist.
-                return;
-            }
-            for (Entry<Integer, Object> entry : metas.entrySet()) {
-                Integer partition = entry.getKey();
-                Object meta = entry.getValue();
-                _partitionStates.get(partition).overrideState(txid, meta);
-            }
-        }
-
-        /**
-         * Closes the state created in the constructor as well as the wrapped emitter. The emitter is closed even if closing the
-         * state throws.
-         */
-        @Override
-        public void close() {
-            try {
-                _state.close();
-            } finally {
-                _emitter.close();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
deleted file mode 100644
index 898ffbf..0000000
--- a/storm-client/src/jvm/org/apache/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.transactional.partitioned;
-
-import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.storm.Config;
-import org.apache.storm.coordination.BatchOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.transactional.ITransactionalSpout;
-import org.apache.storm.transactional.TransactionAttempt;
-import org.apache.storm.transactional.state.RotatingTransactionalState;
-import org.apache.storm.transactional.state.TransactionalState;
-
-
-public class PartitionedTransactionalSpoutExecutor implements ITransactionalSpout<Integer> {
-    IPartitionedTransactionalSpout _spout;
-
-    public PartitionedTransactionalSpoutExecutor(IPartitionedTransactionalSpout spout) {
-        _spout = spout;
-    }
-
-    public IPartitionedTransactionalSpout getPartitionedSpout() {
-        return _spout;
-    }
-
-    @Override
-    public ITransactionalSpout.Coordinator getCoordinator(Map<String, Object> conf, TopologyContext context) {
-        return new Coordinator(conf, context);
-    }
-
-    @Override
-    public ITransactionalSpout.Emitter getEmitter(Map<String, Object> conf, TopologyContext context) {
-        return new Emitter(conf, context);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        _spout.declareOutputFields(declarer);
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return _spout.getComponentConfiguration();
-    }
-
-    class Coordinator implements ITransactionalSpout.Coordinator<Integer> {
-        private IPartitionedTransactionalSpout.Coordinator _coordinator;
-
-        public Coordinator(Map<String, Object> conf, TopologyContext context) {
-            _coordinator = _spout.getCoordinator(conf, context);
-        }
-
-        @Override
-        public Integer initializeTransaction(BigInteger txid, Integer prevMetadata) {
-            return _coordinator.numPartitions();
-        }
-
-        @Override
-        public boolean isReady() {
-            return _coordinator.isReady();
-        }
-
-        @Override
-        public void close() {
-            _coordinator.close();
-        }
-    }
-
-    class Emitter implements ITransactionalSpout.Emitter<Integer> {
-        private IPartitionedTransactionalSpout.Emitter _emitter;
-        private TransactionalState _state;
-        private Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<>();
-        private int _index;
-        private int _numTasks;
-
-        public Emitter(Map<String, Object> conf, TopologyContext context) {
-            _emitter = _spout.getEmitter(conf, context);
-            _state =
-                TransactionalState.newUserState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), getComponentConfiguration());
-            _index = context.getThisTaskIndex();
-            _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
-        }
-
-        @Override
-        public void emitBatch(final TransactionAttempt tx, final Integer partitions,
-                              final BatchOutputCollector collector) {
-            for (int i = _index; i < partitions; i += _numTasks) {
-                if (!_partitionStates.containsKey(i)) {
-                    _partitionStates.put(i, new RotatingTransactionalState(_state, "" + i));
-                }
-                RotatingTransactionalState state = _partitionStates.get(i);
-                final int partition = i;
-                Object meta = state.getStateOrCreate(tx.getTransactionId(),
-                                                     new RotatingTransactionalState.StateInitializer() {
-                                                         @Override
-                                                         public Object init(BigInteger txid, Object lastState) {
-                                                             return _emitter.emitPartitionBatchNew(tx, collector, partition, lastState);
-                                                         }
-                                                     });
-                // it's null if one of:
-                //   a) a later transaction batch was emitted before this, so we should skip this batch
-                //   b) if didn't exist and was created (in which case the StateInitializer was invoked and
-                //      it was emitted
-                if (meta != null) {
-                    _emitter.emitPartitionBatch(tx, collector, partition, meta);
-                }
-            }
-
-        }
-
-        @Override
-        public void cleanupBefore(BigInteger txid) {
-            for (RotatingTransactionalState state : _partitionStates.values()) {
-                state.cleanupBefore(txid);
-            }
-        }
-
-        @Override
-        public void close() {
-            _state.close();
-            _emitter.close();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/transactional/state/RotatingTransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/state/RotatingTransactionalState.java b/storm-client/src/jvm/org/apache/storm/transactional/state/RotatingTransactionalState.java
deleted file mode 100644
index e551a00..0000000
--- a/storm-client/src/jvm/org/apache/storm/transactional/state/RotatingTransactionalState.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.transactional.state;
-
-import java.math.BigInteger;
-import java.util.HashSet;
-import java.util.List;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import org.apache.storm.transactional.TransactionalSpoutCoordinator;
-
-/**
- * A map from txid to a value, mirrored under a sub-directory of a {@link TransactionalState}. Entries are deleted through
- * {@link #cleanupBefore(BigInteger)} and {@link #removeState(BigInteger)}.
- */
-public class RotatingTransactionalState {
-    private TransactionalState _state;
-    private String _subdir;
-    private boolean _strictOrder;
-    private TreeMap<BigInteger, Object> _curr = new TreeMap<BigInteger, Object>();
-
-    /**
-     * Creates {@code subdir} in {@code state} and loads every value already stored below it.
-     *
-     * @param strictOrder when true, {@link #getState(BigInteger, StateInitializer)} rejects txids that are not initialized in
-     *                    consecutive order.
-     */
-    public RotatingTransactionalState(TransactionalState state, String subdir, boolean strictOrder) {
-        _state = state;
-        _subdir = subdir;
-        _strictOrder = strictOrder;
-        state.mkdir(subdir);
-        sync();
-    }
-
-    public RotatingTransactionalState(TransactionalState state, String subdir) {
-        this(state, subdir, false);
-    }
-
-    /**
-     * Returns the value stored for the highest txid, or null when nothing is stored.
-     */
-    public Object getLastState() {
-        return _curr.isEmpty() ? null : _curr.lastEntry().getValue();
-    }
-
-    /**
-     * Stores {@code state} for {@code txid}, replacing any value already present.
-     */
-    public void overrideState(BigInteger txid, Object state) {
-        _state.setData(txPath(txid), state);
-        _curr.put(txid, state);
-    }
-
-    /**
-     * Deletes the value stored for {@code txid}; does nothing when the txid is unknown.
-     */
-    public void removeState(BigInteger txid) {
-        if (!_curr.containsKey(txid)) {
-            return;
-        }
-        _curr.remove(txid);
-        _state.delete(txPath(txid));
-    }
-
-    /**
-     * Returns the value stored for {@code txid}, creating it first when it is missing.
-     *
-     * A missing value is produced by {@code init} from the value of the closest earlier txid, unless a later txid already has
-     * state, in which case null is stored. In strict-order mode an IllegalStateException is thrown when the txid is not the direct
-     * successor of the latest stored txid.
-     */
-    public Object getState(BigInteger txid, StateInitializer init) {
-        if (_curr.containsKey(txid)) {
-            return _curr.get(txid);
-        }
-
-        // txid itself is absent, so "strictly lower/higher" matches the original head/tail views.
-        final BigInteger prev = _curr.lowerKey(txid);
-        final boolean laterTxHasState = _curr.higherKey(txid) != null;
-
-        if (_strictOrder) {
-            if (prev == null && !txid.equals(TransactionalSpoutCoordinator.INIT_TXID)) {
-                throw new IllegalStateException("Trying to initialize transaction for which there should be a previous state");
-            }
-            if (prev != null && !prev.equals(txid.subtract(BigInteger.ONE))) {
-                throw new IllegalStateException("Expecting previous txid state to be the previous transaction");
-            }
-            if (laterTxHasState) {
-                throw new IllegalStateException(
-                    "Expecting tx state to be initialized in strict order but there are txids after that have state");
-            }
-        }
-
-        Object data = null;
-        if (!laterTxHasState) {
-            final Object prevData = (prev == null) ? null : _curr.get(prev);
-            data = init.init(txid, prevData);
-        }
-        _curr.put(txid, data);
-        _state.setData(txPath(txid), data);
-        return data;
-    }
-
-    public boolean hasCache(BigInteger txid) {
-        return _curr.containsKey(txid);
-    }
-
-    /**
-     * Returns null if it was created, the value otherwise.
-     */
-    public Object getStateOrCreate(BigInteger txid, StateInitializer init) {
-        if (!_curr.containsKey(txid)) {
-            getState(txid, init);
-            return null;
-        }
-        return _curr.get(txid);
-    }
-
-    /**
-     * Deletes every entry whose txid is strictly lower than {@code txid}.
-     */
-    public void cleanupBefore(BigInteger txid) {
-        // Copy the keys first: headMap is a live view of _curr and must not be modified while it is being iterated.
-        final HashSet<BigInteger> expired = new HashSet<BigInteger>(_curr.headMap(txid).keySet());
-        for (BigInteger oldTxid : expired) {
-            _curr.remove(oldTxid);
-            _state.delete(txPath(oldTxid));
-        }
-    }
-
-    /**
-     * Loads every child of the sub-directory into the in-memory map, using the child name as the txid.
-     */
-    private void sync() {
-        final List<String> storedTxids = _state.list(_subdir);
-        for (String name : storedTxids) {
-            final Object stored = _state.getData(txPath(name));
-            _curr.put(new BigInteger(name), stored);
-        }
-    }
-
-    private String txPath(BigInteger tx) {
-        return txPath(tx.toString());
-    }
-
-    private String txPath(String tx) {
-        return _subdir + "/" + tx;
-    }
-
-    /**
-     * Produces the value for a txid that has no stored state yet.
-     */
-    public interface StateInitializer {
-        Object init(BigInteger txid, Object lastState);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/transactional/state/TestTransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/state/TestTransactionalState.java b/storm-client/src/jvm/org/apache/storm/transactional/state/TestTransactionalState.java
deleted file mode 100644
index af0bbf5..0000000
--- a/storm-client/src/jvm/org/apache/storm/transactional/state/TestTransactionalState.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.transactional.state;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
-import org.apache.storm.shade.org.apache.zookeeper.CreateMode;
-import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
-
-/**
- * Subclass of {@link TransactionalState} that exposes non-public methods of the parent class for testing.
- */
-public class TestTransactionalState extends TransactionalState {
-
-    /**
-     * Mirrors the parent constructor, which is required because the parent class has no default constructor.
-     */
-    protected TestTransactionalState(Map<String, Object> conf, String id, Map<String, Object> componentConf, String subroot) {
-        super(conf, id, componentConf, subroot);
-    }
-
-    /**
-     * Public pass-through to {@code TransactionalState.createNode} with the same arguments.
-     */
-    public static void createNode(CuratorFramework curator, String rootDir, byte[] data, List<ACL> acls, CreateMode mode)
-        throws Exception {
-        TransactionalState.createNode(curator, rootDir, data, acls, mode);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java b/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
deleted file mode 100644
index 4ebbc66..0000000
--- a/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.transactional.state;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.storm.Config;
-import org.apache.storm.cluster.DaemonType;
-import org.apache.storm.serialization.KryoValuesDeserializer;
-import org.apache.storm.serialization.KryoValuesSerializer;
-import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
-import org.apache.storm.shade.org.apache.curator.framework.api.PathAndBytesable;
-import org.apache.storm.shade.org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable;
-import org.apache.storm.shade.org.apache.zookeeper.CreateMode;
-import org.apache.storm.shade.org.apache.zookeeper.KeeperException;
-import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
-import org.apache.storm.utils.CuratorUtils;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.ZookeeperAuthInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TransactionalState {
-    public static final Logger LOG = LoggerFactory.getLogger(TransactionalState.class);
-    CuratorFramework _curator;
-    KryoValuesSerializer _ser;
-    KryoValuesDeserializer _des;
-    List<ACL> _zkAcls = null;
-
-    protected TransactionalState(Map<String, Object> conf, String id, Map<String, Object> componentConf, String subroot) {
-        try {
-            conf = new HashMap<>(conf);
-            // ensure that the serialization registrations are consistent with the declarations in this spout
-            if (componentConf != null) {
-                conf.put(Config.TOPOLOGY_KRYO_REGISTER,
-                         componentConf
-                             .get(Config.TOPOLOGY_KRYO_REGISTER));
-            }
-            String transactionalRoot = (String) conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT);
-            String rootDir = transactionalRoot + "/" + id + "/" + subroot;
-            List<String> servers =
-                (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
-            Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT);
-            ZookeeperAuthInfo auth = new ZookeeperAuthInfo(conf);
-            CuratorFramework initter = CuratorUtils.newCuratorStarted(conf, servers, port, auth, DaemonType.WORKER.getDefaultZkAcls(conf));
-            _zkAcls = Utils.getWorkerACL(conf);
-            try {
-                TransactionalState.createNode(initter, transactionalRoot, null, null, null);
-            } catch (KeeperException.NodeExistsException e) {
-            }
-            try {
-                TransactionalState.createNode(initter, rootDir, null, _zkAcls, null);
-            } catch (KeeperException.NodeExistsException e) {
-            }
-            initter.close();
-
-            _curator = CuratorUtils.newCuratorStarted(conf, servers, port, rootDir, auth, DaemonType.WORKER.getDefaultZkAcls(conf));
-            _ser = new KryoValuesSerializer(conf);
-            _des = new KryoValuesDeserializer(conf);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public static TransactionalState newUserState(Map<String, Object> conf, String id, Map<String, Object> componentConf) {
-        return new TransactionalState(conf, id, componentConf, "user");
-    }
-
-    public static TransactionalState newCoordinatorState(Map<String, Object> conf, String id, Map<String, Object> componentConf) {
-        return new TransactionalState(conf, id, componentConf, "coordinator");
-    }
-
-    protected static void forPath(PathAndBytesable<String> builder,
-                                  String path, byte[] data) throws Exception {
-        try {
-            if (data == null) {
-                builder.forPath(path);
-            } else {
-                builder.forPath(path, data);
-            }
-        } catch (KeeperException.NodeExistsException e) {
-            LOG.info("Path {} already exists.", path);
-        }
-    }
-
-    protected static void createNode(CuratorFramework curator, String path,
-                                     byte[] data, List<ACL> acls, CreateMode mode) throws Exception {
-        ProtectACLCreateModePathAndBytesable<String> builder =
-            curator.create().creatingParentsIfNeeded();
-
-        if (acls == null) {
-            if (mode == null) {
-                TransactionalState.forPath(builder, path, data);
-            } else {
-                TransactionalState.forPath(builder.withMode(mode), path, data);
-            }
-            return;
-        }
-
-        TransactionalState.forPath(builder.withACL(acls), path, data);
-    }
-
-    public void setData(String path, Object obj) {
-        path = "/" + path;
-        byte[] ser = _ser.serializeObject(obj);
-        try {
-            if (_curator.checkExists().forPath(path) != null) {
-                _curator.setData().forPath(path, ser);
-            } else {
-                TransactionalState.createNode(_curator, path, ser, _zkAcls,
-                                              CreateMode.PERSISTENT);
-            }
-        } catch (KeeperException.NodeExistsException nee) {
-            LOG.warn("Path {} already exists.", path);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void delete(String path) {
-        path = "/" + path;
-        try {
-            _curator.delete().forPath(path);
-        } catch (KeeperException.NoNodeException nne) {
-            // node was already deleted
-            LOG.info("Path {} has already been deleted.", path);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public List<String> list(String path) {
-        path = "/" + path;
-        try {
-            if (_curator.checkExists().forPath(path) == null) {
-                return new ArrayList<String>();
-            } else {
-                return _curator.getChildren().forPath(path);
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void mkdir(String path) {
-        setData(path, 7);
-    }
-
-    public Object getData(String path) {
-        path = "/" + path;
-        try {
-            if (_curator.checkExists().forPath(path) != null) {
-                return _des.deserializeObject(_curator.getData().forPath(path));
-            } else {
-                return null;
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void close() {
-        _curator.close();
-    }
-
-    private Object getWithBackup(Map amap, Object primary, Object backup) {
-        Object ret = amap.get(primary);
-        if (ret == null) {
-            return amap.get(backup);
-        }
-        return ret;
-    }
-}


[3/4] storm git commit: STORM-3156: Remove the transactional topology API

Posted by bo...@apache.org.
STORM-3156: Remove the transactional topology API


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/da0bc6a7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/da0bc6a7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/da0bc6a7

Branch: refs/heads/master
Commit: da0bc6a75b2b395dfb8204a2f0f85ae0fe1566ec
Parents: af42f43
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Tue Jul 17 22:18:10 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Wed Jul 18 13:40:41 2018 +0200

----------------------------------------------------------------------
 docs/Structure-of-the-codebase.md               |  33 +-
 docs/Trident-tutorial.md                        |   2 +
 docs/Tutorial.md                                |   4 +-
 docs/index.md                                   |   2 +-
 docs/storm-sql.md                               |   2 +-
 .../storm/starter/TransactionalGlobalCount.java | 165 -----
 .../storm/starter/TransactionalWords.java       | 234 ------
 .../apache/storm/testing/BatchNumberList.java   |  65 --
 .../apache/storm/testing/BatchProcessWord.java  |  34 -
 .../org/apache/storm/testing/BatchRepeatA.java  |  43 --
 .../apache/storm/testing/CountingBatchBolt.java |  50 --
 .../storm/testing/CountingCommitBolt.java       |  52 --
 .../storm/testing/KeyedCountingBatchBolt.java   |  56 --
 .../testing/KeyedCountingCommitterBolt.java     |  19 -
 .../storm/testing/KeyedSummingBatchBolt.java    |  55 --
 .../storm/testing/MemoryTransactionalSpout.java | 185 -----
 .../testing/MemoryTransactionalSpoutMeta.java   |  33 -
 .../testing/OpaqueMemoryTransactionalSpout.java | 187 -----
 ...BaseOpaquePartitionedTransactionalSpout.java |  20 -
 .../base/BasePartitionedTransactionalSpout.java |  19 -
 .../topology/base/BaseTransactionalBolt.java    |  19 -
 .../topology/base/BaseTransactionalSpout.java   |  19 -
 .../apache/storm/transactional/ICommitter.java  |  21 -
 .../ICommitterTransactionalSpout.java           |  26 -
 .../transactional/ITransactionalSpout.java      |  85 ---
 .../storm/transactional/TransactionAttempt.java |   4 +
 .../TransactionalSpoutBatchExecutor.java        |  91 ---
 .../TransactionalSpoutCoordinator.java          | 209 ------
 .../TransactionalTopologyBuilder.java           | 562 ---------------
 .../IOpaquePartitionedTransactionalSpout.java   |  52 --
 .../IPartitionedTransactionalSpout.java         |  64 --
 ...uePartitionedTransactionalSpoutExecutor.java | 158 -----
 .../PartitionedTransactionalSpoutExecutor.java  | 138 ----
 .../state/RotatingTransactionalState.java       | 149 ----
 .../state/TestTransactionalState.java           |  38 -
 .../transactional/state/TransactionalState.java | 185 -----
 .../clj/org/apache/storm/transactional_test.clj | 706 -------------------
 37 files changed, 26 insertions(+), 3760 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/docs/Structure-of-the-codebase.md
----------------------------------------------------------------------
diff --git a/docs/Structure-of-the-codebase.md b/docs/Structure-of-the-codebase.md
index 8c53002..c5804ee 100644
--- a/docs/Structure-of-the-codebase.md
+++ b/docs/Structure-of-the-codebase.md
@@ -7,9 +7,7 @@ There are three distinct layers to Storm's codebase.
 
 First, Storm was designed from the very beginning to be compatible with multiple languages. Nimbus is a Thrift service and topologies are defined as Thrift structures. The usage of Thrift allows Storm to be used from any language.
 
-Second, all of Storm's interfaces are specified as Java interfaces. So even though there's a lot of Clojure in Storm's implementation, all usage must go through the Java API. This means that every feature of Storm is always available via Java.
-
-Third, Storm's implementation is largely in Clojure. Line-wise, Storm is about half Java code, half Clojure code. But Clojure is much more expressive, so in reality the great majority of the implementation logic is in Clojure. 
+Second, all of Storm's interfaces are specified as Java interfaces. This means that every feature of Storm is always available via Java.
 
 The following sections explain each of these layers in more detail.
 
@@ -53,30 +51,30 @@ You can see this strategy at work with the [BaseRichSpout](javadocs/org/apache/s
 
 Spouts and bolts are serialized into the Thrift definition of the topology as described above. 
 
-One subtle aspect of the interfaces is the difference between `IBolt` and `ISpout` vs. `IRichBolt` and `IRichSpout`. The main difference between them is the addition of the `declareOutputFields` method in the "Rich" versions of the interfaces. The reason for the split is that the output fields declaration for each output stream needs to be part of the Thrift struct (so it can be specified from any language), but as a user you want to be able to declare the streams as part of your class. What `TopologyBuilder` does when constructing the Thrift representation is call `declareOutputFields` to get the declaration and convert it into the Thrift structure. The conversion happens [at this portion]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java) of the `TopologyBuilder` code.
+One subtle aspect of the interfaces is the difference between `IBolt` and `ISpout` vs. `IRichBolt` and `IRichSpout`. The main difference between them is the addition of the `declareOutputFields` method in the "Rich" versions of the interfaces. The reason for the split is that the output fields declaration for each output stream needs to be part of the Thrift struct (so it can be specified from any language), but as a user you want to be able to declare the streams as part of your class. What `TopologyBuilder` does when constructing the Thrift representation is call `declareOutputFields` to get the declaration and convert it into the Thrift structure. The conversion happens in the `TopologyBuilder` code.
 
 
 ### Implementation
 
 Specifying all the functionality via Java interfaces ensures that every feature of Storm is available via Java. Moreso, the focus on Java interfaces ensures that the user experience from Java-land is pleasant as well.
 
-The implementation of Storm, on the other hand, is primarily in Clojure. While the codebase is about 50% Java and 50% Clojure in terms of LOC, most of the implementation logic is in Clojure. There are two notable exceptions to this, and that is the [DRPC](https://github.com/apache/storm/wiki/Distributed-RPC) and [transactional topologies](https://github.com/apache/storm/wiki/Transactional-topologies) implementations. These are implemented purely in Java. This was done to serve as an illustration for how to implement a higher level abstraction on Storm. The DRPC and transactional topologies implementations are in the [org.apache.storm.coordination]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/coordination), [org.apache.storm.drpc]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/drpc), and [org.apache.storm.transactional]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/transactional) packages.
+Storm was originally implemented in Clojure, but most of the code has since been ported to Java.
 
-Here's a summary of the purpose of the main Java packages and Clojure namespace:
+Here's a summary of the purpose of the main Java packages:
 
 #### Java packages
 
-[org.apache.storm.coordination]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/coordination): Implements the pieces required to coordinate batch-processing on top of Storm, which both DRPC and transactional topologies use. `CoordinatedBolt` is the most important class here.
+[org.apache.storm.coordination]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/coordination): Implements the pieces required to coordinate batch-processing on top of Storm, which DRPC uses. `CoordinatedBolt` is the most important class here.
 
 [org.apache.storm.drpc]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/drpc): Implementation of the DRPC higher level abstraction
 
-[org.apache.storm.generated]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/generated): The generated Thrift code for Storm (generated using [this fork](https://github.com/nathanmarz/thrift) of Thrift, which simply renames the packages to org.apache.thrift7 to avoid conflicts with other Thrift versions)
+[org.apache.storm.generated]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/generated): The generated Thrift code for Storm.
 
 [org.apache.storm.grouping]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/grouping): Contains interface for making custom stream groupings
 
-[org.apache.storm.hooks]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/hooks): Interfaces for hooking into various events in Storm, such as when tasks emit tuples, when tuples are acked, etc. User guide for hooks is [here](https://github.com/apache/storm/wiki/Hooks).
+[org.apache.storm.hooks]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/hooks): Interfaces for hooking into various events in Storm, such as when tasks emit tuples, when tuples are acked, etc. User guide for hooks is [here](Hooks.html).
 
-[org.apache.storm.serialization]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/serialization): Implementation of how Storm serializes/deserializes tuples. Built on top of [Kryo](http://code.google.com/p/kryo/).
+[org.apache.storm.serialization]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/serialization): Implementation of how Storm serializes/deserializes tuples. Built on top of [Kryo](https://github.com/EsotericSoftware/kryo).
 
 [org.apache.storm.spout]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/spout): Definition of spout and associated interfaces (like the `SpoutOutputCollector`). Also contains `ShellSpout` which implements the protocol for defining spouts in non-JVM languages.
 
@@ -86,13 +84,12 @@ Here's a summary of the purpose of the main Java packages and Clojure namespace:
 
 [org.apache.storm.topology]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/topology): Java layer over the underlying Thrift structure to provide a clean, pure-Java API to Storm (users don't have to know about Thrift). `TopologyBuilder` is here as well as the helpful base classes for the different spouts and bolts. The slightly-higher level `IBasicBolt` interface is here, which is a simpler way to write certain kinds of bolts.
 
-[org.apache.storm.transactional]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/transactional): Implementation of transactional topologies.
-
 [org.apache.storm.tuple]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/tuple): Implementation of Storm's tuple data model.
 
-[org.apache.storm.utils]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/utils): Data structures and miscellaneous utilities used throughout the codebase.
+[org.apache.storm.utils]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/utils): Data structures and miscellaneous utilities used throughout the codebase. This includes utilities for time simulation.
 
 [org.apache.storm.command.*]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/command): These implement various commands for the `storm` command line client. These implementations are very short.
+
 [org.apache.storm.cluster]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/cluster): This code manages how cluster state (like what tasks are running where, what spout/bolt each task runs as) is stored, typically in Zookeeper.
 
 [org.apache.storm.daemon.Acker]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/daemon/Acker.java): Implementation of the "acker" bolt, which is a key part of how Storm guarantees data processing.
@@ -101,6 +98,8 @@ Here's a summary of the purpose of the main Java packages and Clojure namespace:
 
 [org.apache.storm.event]({{page.git-blob-base}}/storm-server/src/jvm/org/apache/storm/event): Implements a simple asynchronous function executor. Used in various places in Nimbus and Supervisor to make functions execute in serial to avoid any race conditions.
 
+[org.apache.storm.LocalCluster]({{page.git-blob-base}}/storm-server/src/main/java/org/apache/storm/LocalCluster.java): Utility to boot up Storm inside an existing Java process. Often used in conjunction with `Testing.java` to implement integration tests.
+
 [org.apache.storm.messaging.*]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/messaging): Defines a higher level interface to implementing point to point messaging. In local mode Storm uses in-memory Java queues to do this; on a cluster, it uses Netty, but it is pluggable.
 
 [org.apache.storm.stats]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/stats): Implementation of stats rollup routines used when sending stats to ZK for use by the UI. Does things like windowed and rolling aggregations at multiple granularities.
@@ -117,14 +116,14 @@ Here's a summary of the purpose of the main Java packages and Clojure namespace:
 
 [org.apache.storm.daemon.worker]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java): Implementation of a worker process (which will contain many tasks within). Implements message transferring and task launching.
 
+[org.apache.storm.Testing]({{page.git-blob-base}}/storm-server/src/main/java/org/apache/storm/Testing.java): Various utilities for working with local clusters during tests, e.g. `completeTopology` for running a fixed set of tuples through a topology for capturing the output, tracker topologies for having fine grained control over detecting when a cluster is "idle", and other utilities.
+
 #### Clojure namespaces
 
 [org.apache.storm.clojure]({{page.git-blob-base}}/storm-clojure/src/clj/org/apache/storm/clojure.clj): Implementation of the Clojure DSL for Storm.
 
-[org.apache.storm.config]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/config.clj): Created clojure symbols for config names in [Config.java](javadocs/org/apache/storm/Config.html)
+[org.apache.storm.config]({{page.git-blob-base}}/storm-clojure/src/clj/org/apache/storm/config.clj): Created clojure symbols for config names in [Config.java](javadocs/org/apache/storm/Config.html)
  
-[org.apache.storm.log]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/log.clj): Defines the functions used to log messages to log4j.
-
-[org.apache.storm.testing]({{page.git-blob-base}}/storm-clojure/src/clj/org/apache/storm/testing.clj): Implementation of facilities used to test Storm topologies. Includes time simulation, `complete-topology` for running a fixed set of tuples through a topology and capturing the output, tracker topologies for having fine grained control over detecting when a cluster is "idle", and other utilities.
+[org.apache.storm.log]({{page.git-blob-base}}/storm-clojure/src/clj/org/apache/storm/log.clj): Defines the functions used to log messages to log4j.
 
 [org.apache.storm.ui.*]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/ui): Implementation of Storm UI. Completely independent from rest of code base and uses the Nimbus Thrift API to get data.

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/docs/Trident-tutorial.md
----------------------------------------------------------------------
diff --git a/docs/Trident-tutorial.md b/docs/Trident-tutorial.md
index 6ad9103..7df2594 100644
--- a/docs/Trident-tutorial.md
+++ b/docs/Trident-tutorial.md
@@ -6,6 +6,8 @@ documentation: true
 
 Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput (millions of messages per second), stateful stream processing with low latency distributed querying. If you're familiar with high level batch processing tools like Pig or Cascading, the concepts of Trident will be very familiar – Trident has joins, aggregations, grouping, functions, and filters. In addition to these, Trident adds primitives for doing stateful, incremental processing on top of any database or persistence store. Trident has consistent, exactly-once semantics, so it is easy to reason about Trident topologies.
 
+Trident developed from an earlier effort to provide exactly-once guarantees for Storm. While this earlier API is no longer present in Storm, the [documentation](Transactional-topologies.html) provides a gentle introduction to some of the concepts used by Trident, and may be worth reading as an addendum to the Trident documentation.
+
 ## Illustrative example
 
 Let's look at an illustrative example of Trident. This example will do two things:

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/docs/Tutorial.md
----------------------------------------------------------------------
diff --git a/docs/Tutorial.md b/docs/Tutorial.md
index f71c209..aea38d3 100644
--- a/docs/Tutorial.md
+++ b/docs/Tutorial.md
@@ -280,9 +280,9 @@ For more information on writing spouts and bolts in other languages, and to lear
 
 Earlier on in this tutorial, we skipped over a few aspects of how tuples are emitted. Those aspects were part of Storm's reliability API: how Storm guarantees that every message coming off a spout will be fully processed. See [Guaranteeing message processing](Guaranteeing-message-processing.html) for information on how this works and what you have to do as a user to take advantage of Storm's reliability capabilities.
 
-## Transactional topologies
+## Trident
 
-Storm guarantees that every message will be played through the topology at least once. A common question asked is "how do you do things like counting on top of Storm? Won't you overcount?" Storm has a feature called transactional topologies that let you achieve exactly-once messaging semantics for most computations. Read more about transactional topologies [here](Transactional-topologies.html). 
+Storm guarantees that every message will be played through the topology at least once. A common question asked is "how do you do things like counting on top of Storm? Won't you overcount?" Storm has a higher level API called Trident that lets you achieve exactly-once messaging semantics for most computations. Read more about Trident [here](Trident-tutorial.html). 
 
 ## Distributed RPC
 

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 2697c47..d957045 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -6,6 +6,7 @@ documentation: true
 ### Basics of Storm
 
 * [Javadoc](javadocs/index.html)
+* [Tutorial](Tutorial.html)
 * [Concepts](Concepts.html)
 * [Scheduler](Storm-Scheduler.html)
 * [Configuration](Configuration.html)
@@ -75,7 +76,6 @@ But small change will not affect the user experience. We will notify the user wh
 * [DSLs and multilang adapters](DSLs-and-multilang-adapters.html)
 * [Using non-JVM languages with Storm](Using-non-JVM-languages-with-Storm.html)
 * [Distributed RPC](Distributed-RPC.html)
-* [Transactional topologies](Transactional-topologies.html)
 * [Hooks](Hooks.html)
 * [Metrics (Deprecated)](Metrics.html)
 * [Metrics V2](metrics_v2.html)

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/docs/storm-sql.md
----------------------------------------------------------------------
diff --git a/docs/storm-sql.md b/docs/storm-sql.md
index 7467e2d..47fe93e 100644
--- a/docs/storm-sql.md
+++ b/docs/storm-sql.md
@@ -6,7 +6,7 @@ documentation: true
 
 The Storm SQL integration allows users to run SQL queries over streaming data in Storm. Not only the SQL interface allows faster development cycles on streaming analytics, but also opens up the opportunities to unify batch data processing like [Apache Hive](///hive.apache.org) and real-time streaming data analytics.
 
-At a very high level StormSQL compiles the SQL queries to Storm topologies leveraging Streams API and executes them in Storm clusters. This document provides information of how to use StormSQL as end users. For people that are interested in more details in the design and the implementation of StormSQL please refer to the [this](storm-sql-internal.html) page.
+At a very high level StormSQL compiles the SQL queries to Storm topologies leveraging Streams API and executes them in Storm clusters. This document provides information of how to use StormSQL as end users. For people that are interested in more details in the design and the implementation of StormSQL please refer to [this](storm-sql-internal.html) page.
 
 Storm SQL integration is an `experimental` feature, so the internal of Storm SQL and supported features are subject to change.
 But small change will not affect the user experience. We will notice/announce the user when breaking UX change is introduced.

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
deleted file mode 100644
index 45cd68d..0000000
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalGlobalCount.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.starter;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.coordination.BatchOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.testing.MemoryTransactionalSpout;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBatchBolt;
-import org.apache.storm.topology.base.BaseTransactionalBolt;
-import org.apache.storm.transactional.ICommitter;
-import org.apache.storm.transactional.TransactionAttempt;
-import org.apache.storm.transactional.TransactionalTopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.NimbusClient;
-
-/**
- * This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far in a
- * database. The source of data and the databases are mocked out as in memory maps for demonstration purposes.
- *
- * @see <a href="http://storm.apache.org/documentation/Transactional-topologies.html">Transactional topologies</a>
- */
-public class TransactionalGlobalCount {
-    public static final int PARTITION_TAKE_PER_BATCH = 3;
-    public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
-        put(0, new ArrayList<List<Object>>() {{
-            add(new Values("cat"));
-            add(new Values("dog"));
-            add(new Values("chicken"));
-            add(new Values("cat"));
-            add(new Values("dog"));
-            add(new Values("apple"));
-        }});
-        put(1, new ArrayList<List<Object>>() {{
-            add(new Values("cat"));
-            add(new Values("dog"));
-            add(new Values("apple"));
-            add(new Values("banana"));
-        }});
-        put(2, new ArrayList<List<Object>>() {{
-            add(new Values("cat"));
-            add(new Values("cat"));
-            add(new Values("cat"));
-            add(new Values("cat"));
-            add(new Values("cat"));
-            add(new Values("dog"));
-            add(new Values("dog"));
-            add(new Values("dog"));
-            add(new Values("dog"));
-        }});
-    }};
-    public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";
-    public static Map<String, Value> DATABASE = new HashMap<String, Value>();
-
-    public static void main(String[] args) throws Exception {
-        if (!NimbusClient.isLocalOverride()) {
-            throw new IllegalStateException("This example only works in local mode.  "
-                                            + "Run with storm local not storm jar");
-        }
-        MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
-        TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
-        builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");
-        builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");
-
-        Config config = new Config();
-        config.setDebug(true);
-        config.setMaxSpoutPending(3);
-
-        StormSubmitter.submitTopology("global-count-topology", config, builder.buildTopology());
-    }
-
-    public static class Value {
-        int count = 0;
-        BigInteger txid;
-    }
-
-    public static class BatchCount extends BaseBatchBolt<Object> {
-        Object _id;
-        BatchOutputCollector _collector;
-
-        int _count = 0;
-
-        @Override
-        public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector collector, Object id) {
-            _collector = collector;
-            _id = id;
-        }
-
-        @Override
-        public void execute(Tuple tuple) {
-            _count++;
-        }
-
-        @Override
-        public void finishBatch() {
-            _collector.emit(new Values(_id, _count));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("id", "count"));
-        }
-    }
-
-    public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
-        TransactionAttempt _attempt;
-        BatchOutputCollector _collector;
-
-        int _sum = 0;
-
-        @Override
-        public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
-            _collector = collector;
-            _attempt = attempt;
-        }
-
-        @Override
-        public void execute(Tuple tuple) {
-            _sum += tuple.getInteger(1);
-        }
-
-        @Override
-        public void finishBatch() {
-            Value val = DATABASE.get(GLOBAL_COUNT_KEY);
-            Value newval;
-            if (val == null || !val.txid.equals(_attempt.getTransactionId())) {
-                newval = new Value();
-                newval.txid = _attempt.getTransactionId();
-                if (val == null) {
-                    newval.count = _sum;
-                } else {
-                    newval.count = _sum + val.count;
-                }
-                DATABASE.put(GLOBAL_COUNT_KEY, newval);
-            } else {
-                newval = val;
-            }
-            _collector.emit(new Values(_attempt, newval.count));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("id", "sum"));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
deleted file mode 100644
index 7875360..0000000
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/TransactionalWords.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.starter;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.coordination.BatchOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.testing.MemoryTransactionalSpout;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.topology.base.BaseTransactionalBolt;
-import org.apache.storm.transactional.ICommitter;
-import org.apache.storm.transactional.TransactionAttempt;
-import org.apache.storm.transactional.TransactionalTopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.NimbusClient;
-
-/**
- * This class defines a more involved transactional topology then TransactionalGlobalCount. This topology processes a
- * stream of words and produces two outputs:
- * <p/>
- * 1. A count for each word (stored in a database) 2. The number of words for every bucket of 10 counts. So it stores in
- * the database how many words have appeared 0-9 times, how many have appeared 10-19 times, and so on.
- * <p/>
- * A batch of words can cause the bucket counts to decrement for some buckets and increment for others as words move
- * between buckets as their counts accumulate.
- */
-public class TransactionalWords {
-    public static final int BUCKET_SIZE = 10;
-    public static final int PARTITION_TAKE_PER_BATCH = 3;
-    public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
-        put(0, new ArrayList<List<Object>>() {{
-            add(new Values("cat"));
-            add(new Values("dog"));
-            add(new Values("chicken"));
-            add(new Values("cat"));
-            add(new Values("dog"));
-            add(new Values("apple"));
-        }});
-        put(1, new ArrayList<List<Object>>() {{
-            add(new Values("cat"));
-            add(new Values("dog"));
-            add(new Values("apple"));
-            add(new Values("banana"));
-        }});
-        put(2, new ArrayList<List<Object>>() {{
-            add(new Values("cat"));
-            add(new Values("cat"));
-            add(new Values("cat"));
-            add(new Values("cat"));
-            add(new Values("cat"));
-            add(new Values("dog"));
-            add(new Values("dog"));
-            add(new Values("dog"));
-            add(new Values("dog"));
-        }});
-    }};
-    public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
-    public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
-
-    public static void main(String[] args) throws Exception {
-        if (!NimbusClient.isLocalOverride()) {
-            throw new IllegalStateException("This example only works in local mode.  "
-                                            + "Run with storm local not storm jar");
-        }
-        MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
-        TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
-        builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word"));
-        builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");
-        builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket"));
-        Config config = new Config();
-        config.setDebug(true);
-        config.setMaxSpoutPending(3);
-
-        StormSubmitter.submitTopology("top-n-topology", config, builder.buildTopology());
-    }
-
-    public static class CountValue {
-        Integer prev_count = null;
-        int count = 0;
-        BigInteger txid = null;
-    }
-
-    public static class BucketValue {
-        int count = 0;
-        BigInteger txid;
-    }
-
-    public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
-        Map<String, Integer> _counts = new HashMap<String, Integer>();
-        BatchOutputCollector _collector;
-        TransactionAttempt _id;
-
-        int _count = 0;
-
-        @Override
-        public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
-            _collector = collector;
-            _id = id;
-        }
-
-        @Override
-        public void execute(Tuple tuple) {
-            String key = tuple.getString(1);
-            Integer curr = _counts.get(key);
-            if (curr == null) {
-                curr = 0;
-            }
-            _counts.put(key, curr + 1);
-        }
-
-        @Override
-        public void finishBatch() {
-            for (String key : _counts.keySet()) {
-                CountValue val = COUNT_DATABASE.get(key);
-                CountValue newVal;
-                if (val == null || !val.txid.equals(_id.getTransactionId())) {
-                    newVal = new CountValue();
-                    newVal.txid = _id.getTransactionId();
-                    if (val != null) {
-                        newVal.prev_count = val.count;
-                        newVal.count = val.count;
-                    }
-                    newVal.count = newVal.count + _counts.get(key);
-                    COUNT_DATABASE.put(key, newVal);
-                } else {
-                    newVal = val;
-                }
-                _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));
-            }
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("id", "key", "count", "prev-count"));
-        }
-    }
-
-    public static class Bucketize extends BaseBasicBolt {
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
-            int curr = tuple.getInteger(2);
-            Integer prev = tuple.getInteger(3);
-
-            int currBucket = curr / BUCKET_SIZE;
-            Integer prevBucket = null;
-            if (prev != null) {
-                prevBucket = prev / BUCKET_SIZE;
-            }
-
-            if (prevBucket == null) {
-                collector.emit(new Values(attempt, currBucket, 1));
-            } else if (currBucket != prevBucket) {
-                collector.emit(new Values(attempt, currBucket, 1));
-                collector.emit(new Values(attempt, prevBucket, -1));
-            }
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("attempt", "bucket", "delta"));
-        }
-    }
-
-    public static class BucketCountUpdater extends BaseTransactionalBolt {
-        Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();
-        BatchOutputCollector _collector;
-        TransactionAttempt _attempt;
-
-        int _count = 0;
-
-        @Override
-        public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
-            _collector = collector;
-            _attempt = attempt;
-        }
-
-        @Override
-        public void execute(Tuple tuple) {
-            Integer bucket = tuple.getInteger(1);
-            Integer delta = tuple.getInteger(2);
-            Integer curr = _accum.get(bucket);
-            if (curr == null) {
-                curr = 0;
-            }
-            _accum.put(bucket, curr + delta);
-        }
-
-        @Override
-        public void finishBatch() {
-            for (Integer bucket : _accum.keySet()) {
-                BucketValue currVal = BUCKET_DATABASE.get(bucket);
-                BucketValue newVal;
-                if (currVal == null || !currVal.txid.equals(_attempt.getTransactionId())) {
-                    newVal = new BucketValue();
-                    newVal.txid = _attempt.getTransactionId();
-                    newVal.count = _accum.get(bucket);
-                    if (currVal != null) {
-                        newVal.count += currVal.count;
-                    }
-                    BUCKET_DATABASE.put(bucket, newVal);
-                } else {
-                    newVal = currVal;
-                }
-                _collector.emit(new Values(_attempt, bucket, newVal.count));
-            }
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("id", "bucket", "count"));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/testing/BatchNumberList.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/BatchNumberList.java b/storm-client/src/jvm/org/apache/storm/testing/BatchNumberList.java
deleted file mode 100644
index 38348c0..0000000
--- a/storm-client/src/jvm/org/apache/storm/testing/BatchNumberList.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.testing;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.storm.coordination.BatchOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBatchBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-public class BatchNumberList extends BaseBatchBolt<Object> {
-
-    String _wordComponent;
-    String word = null;
-    List<Integer> intSet = new ArrayList<Integer>();
-    BatchOutputCollector _collector;
-
-    public BatchNumberList(String wordComponent) {
-        _wordComponent = wordComponent;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("word", "list"));
-    }
-
-    @Override
-    public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector collector, Object id) {
-        _collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        if (tuple.getSourceComponent().equals(_wordComponent)) {
-            this.word = tuple.getString(1);
-        } else {
-            intSet.add(tuple.getInteger(1));
-        }
-    }
-
-    @Override
-    public void finishBatch() {
-        if (word != null) {
-            Collections.sort(intSet);
-            _collector.emit(new Values(word, intSet));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/testing/BatchProcessWord.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/BatchProcessWord.java b/storm-client/src/jvm/org/apache/storm/testing/BatchProcessWord.java
deleted file mode 100644
index 88a19c7..0000000
--- a/storm-client/src/jvm/org/apache/storm/testing/BatchProcessWord.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.testing;
-
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-public class BatchProcessWord extends BaseBasicBolt {
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("id", "size"));
-    }
-
-    @Override
-    public void execute(Tuple input, BasicOutputCollector collector) {
-        collector.emit(new Values(input.getValue(0), input.getString(1).length()));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/testing/BatchRepeatA.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/BatchRepeatA.java b/storm-client/src/jvm/org/apache/storm/testing/BatchRepeatA.java
deleted file mode 100644
index abe00b3..0000000
--- a/storm-client/src/jvm/org/apache/storm/testing/BatchRepeatA.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.testing;
-
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-
-public class BatchRepeatA extends BaseBasicBolt {
-
-    @Override
-    public void execute(Tuple input, BasicOutputCollector collector) {
-        Object id = input.getValue(0);
-        String word = input.getString(1);
-        for (int i = 0; i < word.length(); i++) {
-            if (word.charAt(i) == 'a') {
-                collector.emit("multi", new Values(id, word.substring(0, i)));
-            }
-        }
-        collector.emit("single", new Values(id, word));
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declareStream("multi", new Fields("id", "word"));
-        declarer.declareStream("single", new Fields("id", "word"));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/testing/CountingBatchBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/CountingBatchBolt.java b/storm-client/src/jvm/org/apache/storm/testing/CountingBatchBolt.java
deleted file mode 100644
index 31bec7c..0000000
--- a/storm-client/src/jvm/org/apache/storm/testing/CountingBatchBolt.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.testing;
-
-import java.util.Map;
-import org.apache.storm.coordination.BatchOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBatchBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-public class CountingBatchBolt extends BaseBatchBolt<Object> {
-    BatchOutputCollector _collector;
-    Object _id;
-    int _count = 0;
-
-    @Override
-    public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector collector, Object id) {
-        _collector = collector;
-        _id = id;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        _count++;
-    }
-
-    @Override
-    public void finishBatch() {
-        _collector.emit(new Values(_id, _count));
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("tx", "count"));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/testing/CountingCommitBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/CountingCommitBolt.java b/storm-client/src/jvm/org/apache/storm/testing/CountingCommitBolt.java
deleted file mode 100644
index a9790e8..0000000
--- a/storm-client/src/jvm/org/apache/storm/testing/CountingCommitBolt.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.testing;
-
-import java.util.Map;
-import org.apache.storm.coordination.BatchOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseTransactionalBolt;
-import org.apache.storm.transactional.ICommitter;
-import org.apache.storm.transactional.TransactionAttempt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-public class CountingCommitBolt extends BaseTransactionalBolt implements ICommitter {
-    BatchOutputCollector _collector;
-    TransactionAttempt _id;
-    int _count = 0;
-
-    @Override
-    public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
-        _id = id;
-        _collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        _count++;
-    }
-
-    @Override
-    public void finishBatch() {
-        _collector.emit(new Values(_id, _count));
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("tx", "count"));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/testing/KeyedCountingBatchBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/KeyedCountingBatchBolt.java b/storm-client/src/jvm/org/apache/storm/testing/KeyedCountingBatchBolt.java
deleted file mode 100644
index 583a910..0000000
--- a/storm-client/src/jvm/org/apache/storm/testing/KeyedCountingBatchBolt.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.testing;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.storm.coordination.BatchOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBatchBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-public class KeyedCountingBatchBolt extends BaseBatchBolt<Object> {
-    BatchOutputCollector _collector;
-    Object _id;
-    Map<Object, Integer> _counts = new HashMap<Object, Integer>();
-
-    @Override
-    public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector collector, Object id) {
-        _collector = collector;
-        _id = id;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        Object key = tuple.getValue(1);
-        int curr = Utils.get(_counts, key, 0);
-        _counts.put(key, curr + 1);
-    }
-
-    @Override
-    public void finishBatch() {
-        for (Object key : _counts.keySet()) {
-            _collector.emit(new Values(_id, key, _counts.get(key)));
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("tx", "key", "count"));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/testing/KeyedCountingCommitterBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/KeyedCountingCommitterBolt.java b/storm-client/src/jvm/org/apache/storm/testing/KeyedCountingCommitterBolt.java
deleted file mode 100644
index 198f574..0000000
--- a/storm-client/src/jvm/org/apache/storm/testing/KeyedCountingCommitterBolt.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.testing;
-
-import org.apache.storm.transactional.ICommitter;
-
-public class KeyedCountingCommitterBolt extends KeyedCountingBatchBolt implements ICommitter {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/testing/KeyedSummingBatchBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/KeyedSummingBatchBolt.java b/storm-client/src/jvm/org/apache/storm/testing/KeyedSummingBatchBolt.java
deleted file mode 100644
index 1cbbc62..0000000
--- a/storm-client/src/jvm/org/apache/storm/testing/KeyedSummingBatchBolt.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.testing;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.storm.coordination.BatchOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBatchBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-public class KeyedSummingBatchBolt extends BaseBatchBolt<Object> {
-    BatchOutputCollector _collector;
-    Object _id;
-    Map<Object, Number> _sums = new HashMap<Object, Number>();
-
-    @Override
-    public void prepare(Map<String, Object> conf, TopologyContext context, BatchOutputCollector collector, Object id) {
-        _collector = collector;
-        _id = id;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        Object key = tuple.getValue(1);
-        Number curr = Utils.get(_sums, key, 0);
-        _sums.put(key, curr.longValue() + ((Number) tuple.getValue(2)).longValue());
-    }
-
-    @Override
-    public void finishBatch() {
-        for (Object key : _sums.keySet()) {
-            _collector.emit(new Values(_id, key, _sums.get(key)));
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("tx", "key", "sum"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/testing/MemoryTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/MemoryTransactionalSpout.java b/storm-client/src/jvm/org/apache/storm/testing/MemoryTransactionalSpout.java
deleted file mode 100644
index fb894f3..0000000
--- a/storm-client/src/jvm/org/apache/storm/testing/MemoryTransactionalSpout.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.testing;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.storm.Config;
-import org.apache.storm.coordination.BatchOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.transactional.TransactionAttempt;
-import org.apache.storm.transactional.partitioned.IPartitionedTransactionalSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.RegisteredGlobalState;
-import org.apache.storm.utils.Utils;
-
-public class MemoryTransactionalSpout implements IPartitionedTransactionalSpout<MemoryTransactionalSpoutMeta> {
-    public static final String TX_FIELD = MemoryTransactionalSpout.class.getName() + "/id";
-
-    private String _id;
-    private String _finishedPartitionsId;
-    private int _takeAmt;
-    private Fields _outFields;
-    private Map<Integer, List<List<Object>>> _initialPartitions;
-
-    public MemoryTransactionalSpout(Map<Integer, List<List<Object>>> partitions, Fields outFields, int takeAmt) {
-        _id = RegisteredGlobalState.registerState(partitions);
-        Map<Integer, Boolean> finished = Collections.synchronizedMap(new HashMap<Integer, Boolean>());
-        _finishedPartitionsId = RegisteredGlobalState.registerState(finished);
-        _takeAmt = takeAmt;
-        _outFields = outFields;
-        _initialPartitions = partitions;
-    }
-
-    public boolean isExhaustedTuples() {
-        Map<Integer, Boolean> statuses = getFinishedStatuses();
-        for (Integer partition : getQueues().keySet()) {
-            if (!statuses.containsKey(partition) || !getFinishedStatuses().get(partition)) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    @Override
-    public IPartitionedTransactionalSpout.Coordinator getCoordinator(Map<String, Object> conf, TopologyContext context) {
-        return new Coordinator();
-    }
-
-    @Override
-    public IPartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> getEmitter(Map<String, Object> conf,
-                                                                                           TopologyContext context) {
-        return new Emitter(conf);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        List<String> toDeclare = new ArrayList<>(_outFields.toList());
-        toDeclare.add(0, TX_FIELD);
-        declarer.declare(new Fields(toDeclare));
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        Config conf = new Config();
-        conf.registerSerialization(MemoryTransactionalSpoutMeta.class);
-        return conf;
-    }
-
-    public void startup() {
-        getFinishedStatuses().clear();
-    }
-
-    public void cleanup() {
-        RegisteredGlobalState.clearState(_id);
-        RegisteredGlobalState.clearState(_finishedPartitionsId);
-    }
-
-    private Map<Integer, List<List<Object>>> getQueues() {
-        Map<Integer, List<List<Object>>> ret = (Map<Integer, List<List<Object>>>) RegisteredGlobalState.getState(_id);
-        if (ret != null) {
-            return ret;
-        } else {
-            return _initialPartitions;
-        }
-    }
-
-    private Map<Integer, Boolean> getFinishedStatuses() {
-        return (Map<Integer, Boolean>) RegisteredGlobalState.getState(_finishedPartitionsId);
-    }
-
-    class Coordinator implements IPartitionedTransactionalSpout.Coordinator {
-
-        @Override
-        public int numPartitions() {
-            return getQueues().size();
-        }
-
-        @Override
-        public boolean isReady() {
-            return true;
-        }
-
-        @Override
-        public void close() {
-        }
-    }
-
-    class Emitter implements IPartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> {
-
-        Integer _maxSpoutPending;
-        Map<Integer, Integer> _emptyPartitions = new HashMap<>();
-
-        public Emitter(Map<String, Object> conf) {
-            Object c = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
-            if (c == null) {
-                _maxSpoutPending = 1;
-            } else {
-                _maxSpoutPending = ObjectReader.getInt(c);
-            }
-        }
-
-
-        @Override
-        public MemoryTransactionalSpoutMeta emitPartitionBatchNew(TransactionAttempt tx, BatchOutputCollector collector, int partition,
-                                                                  MemoryTransactionalSpoutMeta lastPartitionMeta) {
-            int index;
-            if (lastPartitionMeta == null) {
-                index = 0;
-            } else {
-                index = lastPartitionMeta.index + lastPartitionMeta.amt;
-            }
-            List<List<Object>> queue = getQueues().get(partition);
-            int total = queue.size();
-            int left = total - index;
-            int toTake = Math.min(left, _takeAmt);
-
-            MemoryTransactionalSpoutMeta ret = new MemoryTransactionalSpoutMeta(index, toTake);
-            emitPartitionBatch(tx, collector, partition, ret);
-            if (toTake == 0) {
-                // this is a pretty hacky way to determine when all the partitions have been committed
-                // wait until we've emitted max-spout-pending empty partitions for the partition
-                int curr = Utils.get(_emptyPartitions, partition, 0) + 1;
-                _emptyPartitions.put(partition, curr);
-                if (curr > _maxSpoutPending) {
-                    Map<Integer, Boolean> finishedStatuses = getFinishedStatuses();
-                    // will be null in remote mode
-                    if (finishedStatuses != null) {
-                        finishedStatuses.put(partition, true);
-                    }
-                }
-            }
-            return ret;
-        }
-
-        @Override
-        public void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition,
-                                       MemoryTransactionalSpoutMeta partitionMeta) {
-            List<List<Object>> queue = getQueues().get(partition);
-            for (int i = partitionMeta.index; i < partitionMeta.index + partitionMeta.amt; i++) {
-                List<Object> toEmit = new ArrayList<>(queue.get(i));
-                toEmit.add(0, tx);
-                collector.emit(toEmit);
-            }
-        }
-
-        @Override
-        public void close() {
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/testing/MemoryTransactionalSpoutMeta.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/MemoryTransactionalSpoutMeta.java b/storm-client/src/jvm/org/apache/storm/testing/MemoryTransactionalSpoutMeta.java
deleted file mode 100644
index 6c2b2fd..0000000
--- a/storm-client/src/jvm/org/apache/storm/testing/MemoryTransactionalSpoutMeta.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.testing;
-
-public class MemoryTransactionalSpoutMeta {
-    int index;
-    int amt;
-
-    // for kryo compatibility
-    public MemoryTransactionalSpoutMeta() {
-
-    }
-
-    public MemoryTransactionalSpoutMeta(int index, int amt) {
-        this.index = index;
-        this.amt = amt;
-    }
-
-    @Override
-    public String toString() {
-        return "index: " + index + "; amt: " + amt;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/testing/OpaqueMemoryTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/OpaqueMemoryTransactionalSpout.java b/storm-client/src/jvm/org/apache/storm/testing/OpaqueMemoryTransactionalSpout.java
deleted file mode 100644
index 81fd9e4..0000000
--- a/storm-client/src/jvm/org/apache/storm/testing/OpaqueMemoryTransactionalSpout.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.testing;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.storm.Config;
-import org.apache.storm.coordination.BatchOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.transactional.TransactionAttempt;
-import org.apache.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.RegisteredGlobalState;
-import org.apache.storm.utils.Utils;
-
-/**
- * This spout only works in local mode.
- */
-public class OpaqueMemoryTransactionalSpout implements IOpaquePartitionedTransactionalSpout<MemoryTransactionalSpoutMeta> {
-    public static final String TX_FIELD = MemoryTransactionalSpout.class.getName() + "/id";
-
-    private String _id;
-    private String _finishedPartitionsId;
-    private String _disabledId;
-    private int _takeAmt;
-    private Fields _outFields;
-
-    public OpaqueMemoryTransactionalSpout(Map<Integer, List<List<Object>>> partitions, Fields outFields, int takeAmt) {
-        _id = RegisteredGlobalState.registerState(partitions);
-
-        Map<Integer, Boolean> finished = Collections.synchronizedMap(new HashMap<Integer, Boolean>());
-        _finishedPartitionsId = RegisteredGlobalState.registerState(finished);
-
-        Map<Integer, Boolean> disabled = Collections.synchronizedMap(new HashMap<Integer, Boolean>());
-        _disabledId = RegisteredGlobalState.registerState(disabled);
-
-        _takeAmt = takeAmt;
-        _outFields = outFields;
-    }
-
-    public void setDisabled(Integer partition, boolean disabled) {
-        getDisabledStatuses().put(partition, disabled);
-    }
-
-    public boolean isExhaustedTuples() {
-        Map<Integer, Boolean> statuses = getFinishedStatuses();
-        for (Integer partition : getQueues().keySet()) {
-            if (!statuses.containsKey(partition) || !getFinishedStatuses().get(partition)) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    @Override
-    public IOpaquePartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> getEmitter(Map<String, Object> conf,
-                                                                                                 TopologyContext context) {
-        return new Emitter(conf);
-    }
-
-    @Override
-    public IOpaquePartitionedTransactionalSpout.Coordinator getCoordinator(Map<String, Object> conf, TopologyContext context) {
-        return new Coordinator();
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        List<String> toDeclare = new ArrayList<>(_outFields.toList());
-        toDeclare.add(0, TX_FIELD);
-        declarer.declare(new Fields(toDeclare));
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        Config conf = new Config();
-        conf.registerSerialization(MemoryTransactionalSpoutMeta.class);
-        return conf;
-    }
-
-    public void startup() {
-        getFinishedStatuses().clear();
-    }
-
-    public void cleanup() {
-        RegisteredGlobalState.clearState(_id);
-        RegisteredGlobalState.clearState(_finishedPartitionsId);
-    }
-
-    private Map<Integer, List<List<Object>>> getQueues() {
-        return (Map<Integer, List<List<Object>>>) RegisteredGlobalState.getState(_id);
-    }
-
-    private Map<Integer, Boolean> getFinishedStatuses() {
-        return (Map<Integer, Boolean>) RegisteredGlobalState.getState(_finishedPartitionsId);
-    }
-
-    private Map<Integer, Boolean> getDisabledStatuses() {
-        return (Map<Integer, Boolean>) RegisteredGlobalState.getState(_disabledId);
-    }
-
-    private static class Coordinator implements IOpaquePartitionedTransactionalSpout.Coordinator {
-        @Override
-        public boolean isReady() {
-            return true;
-        }
-
-        @Override
-        public void close() {
-        }
-    }
-
-    class Emitter implements IOpaquePartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> {
-
-        Integer _maxSpoutPending;
-        Map<Integer, Integer> _emptyPartitions = new HashMap<>();
-
-        public Emitter(Map<String, Object> conf) {
-            Object c = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
-            if (c == null) {
-                _maxSpoutPending = 1;
-            } else {
-                _maxSpoutPending = ObjectReader.getInt(c);
-            }
-        }
-
-
-        @Override
-        public MemoryTransactionalSpoutMeta emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition,
-                                                               MemoryTransactionalSpoutMeta lastPartitionMeta) {
-            if (!Boolean.FALSE.equals(getDisabledStatuses().get(partition))) {
-                int index;
-                if (lastPartitionMeta == null) {
-                    index = 0;
-                } else {
-                    index = lastPartitionMeta.index + lastPartitionMeta.amt;
-                }
-                List<List<Object>> queue = getQueues().get(partition);
-                int total = queue.size();
-                int left = total - index;
-                int toTake = Math.min(left, _takeAmt);
-
-                MemoryTransactionalSpoutMeta ret = new MemoryTransactionalSpoutMeta(index, toTake);
-                for (int i = ret.index; i < ret.index + ret.amt; i++) {
-                    List<Object> toEmit = new ArrayList<>(queue.get(i));
-                    toEmit.add(0, tx);
-                    collector.emit(toEmit);
-                }
-                if (toTake == 0) {
-                    // this is a pretty hacky way to determine when all the partitions have been committed
-                    // wait until we've emitted max-spout-pending empty partitions for the partition
-                    int curr = Utils.get(_emptyPartitions, partition, 0) + 1;
-                    _emptyPartitions.put(partition, curr);
-                    if (curr > _maxSpoutPending) {
-                        getFinishedStatuses().put(partition, true);
-                    }
-                }
-                return ret;
-            } else {
-                return null;
-            }
-        }
-
-        @Override
-        public void close() {
-        }
-
-        @Override
-        public int numPartitions() {
-            return getQueues().size();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java b/storm-client/src/jvm/org/apache/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java
deleted file mode 100644
index 5b08b32..0000000
--- a/storm-client/src/jvm/org/apache/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.topology.base;
-
-import org.apache.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout;
-
-
-public abstract class BaseOpaquePartitionedTransactionalSpout<T> extends BaseComponent implements IOpaquePartitionedTransactionalSpout<T> {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/topology/base/BasePartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/base/BasePartitionedTransactionalSpout.java b/storm-client/src/jvm/org/apache/storm/topology/base/BasePartitionedTransactionalSpout.java
deleted file mode 100644
index 7e89228..0000000
--- a/storm-client/src/jvm/org/apache/storm/topology/base/BasePartitionedTransactionalSpout.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.topology.base;
-
-import org.apache.storm.transactional.partitioned.IPartitionedTransactionalSpout;
-
-public abstract class BasePartitionedTransactionalSpout<T> extends BaseComponent implements IPartitionedTransactionalSpout<T> {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/topology/base/BaseTransactionalBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/base/BaseTransactionalBolt.java b/storm-client/src/jvm/org/apache/storm/topology/base/BaseTransactionalBolt.java
deleted file mode 100644
index 8137d6d..0000000
--- a/storm-client/src/jvm/org/apache/storm/topology/base/BaseTransactionalBolt.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.topology.base;
-
-import org.apache.storm.transactional.TransactionAttempt;
-
-public abstract class BaseTransactionalBolt extends BaseBatchBolt<TransactionAttempt> {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/topology/base/BaseTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/base/BaseTransactionalSpout.java b/storm-client/src/jvm/org/apache/storm/topology/base/BaseTransactionalSpout.java
deleted file mode 100644
index 6fae726..0000000
--- a/storm-client/src/jvm/org/apache/storm/topology/base/BaseTransactionalSpout.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.topology.base;
-
-import org.apache.storm.transactional.ITransactionalSpout;
-
-public abstract class BaseTransactionalSpout<T> extends BaseComponent implements ITransactionalSpout<T> {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/transactional/ICommitter.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/ICommitter.java b/storm-client/src/jvm/org/apache/storm/transactional/ICommitter.java
deleted file mode 100644
index cd2fdd1..0000000
--- a/storm-client/src/jvm/org/apache/storm/transactional/ICommitter.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.transactional;
-
-/**
- * This marks an IBatchBolt within a transactional topology as a committer. This causes the finishBatch method to be called in order of the
- * transactions.
- */
-public interface ICommitter {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/transactional/ICommitterTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/ICommitterTransactionalSpout.java b/storm-client/src/jvm/org/apache/storm/transactional/ICommitterTransactionalSpout.java
deleted file mode 100644
index 366cd13..0000000
--- a/storm-client/src/jvm/org/apache/storm/transactional/ICommitterTransactionalSpout.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.transactional;
-
-import java.util.Map;
-import org.apache.storm.task.TopologyContext;
-
-
-public interface ICommitterTransactionalSpout<X> extends ITransactionalSpout<X> {
-    @Override
-    public Emitter getEmitter(Map<String, Object> conf, TopologyContext context);
-
-    public interface Emitter extends ITransactionalSpout.Emitter {
-        void commit(TransactionAttempt attempt);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/da0bc6a7/storm-client/src/jvm/org/apache/storm/transactional/ITransactionalSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/ITransactionalSpout.java b/storm-client/src/jvm/org/apache/storm/transactional/ITransactionalSpout.java
deleted file mode 100644
index e873336..0000000
--- a/storm-client/src/jvm/org/apache/storm/transactional/ITransactionalSpout.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.transactional;
-
-import java.math.BigInteger;
-import java.util.Map;
-import org.apache.storm.coordination.BatchOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IComponent;
-
-public interface ITransactionalSpout<T> extends IComponent {
-    /**
-     * The coordinator for a TransactionalSpout runs in a single thread and indicates when batches of tuples should be emitted and when
-     * transactions should commit. The Coordinator that you provide in a TransactionalSpout provides metadata for each transaction so that
-     * the transactions can be replayed.
-     */
-    Coordinator<T> getCoordinator(Map<String, Object> conf, TopologyContext context);
-
-    /**
-     * The emitter for a TransactionalSpout runs as many tasks across the cluster. Emitters are responsible for emitting batches of tuples
-     * for a transaction and must ensure that the same batch of tuples is always emitted for the same transaction id.
-     */
-    Emitter<T> getEmitter(Map<String, Object> conf, TopologyContext context);
-
-    public interface Coordinator<X> {
-        /**
-         * Create metadata for this particular transaction id which has never been emitted before. The metadata should contain whatever is
-         * necessary to be able to replay the exact batch for the transaction at a later point.
-         *
-         * The metadata is stored in Zookeeper.
-         *
-         * Storm uses the Kryo serializations configured in the component configuration for this spout to serialize and deserialize the
-         * metadata.
-         *
-         * @param txid         The id of the transaction.
-         * @param prevMetadata The metadata of the previous transaction
-         * @return the metadata for this new transaction
-         */
-        X initializeTransaction(BigInteger txid, X prevMetadata);
-
-        /**
-         * Returns true if it's ok to start a new transaction, false otherwise (will skip this transaction).
-         *
-         * You should sleep here if you want a delay between asking for the next transaction (this will be called repeatedly in a loop).
-         */
-        boolean isReady();
-
-        /**
-         * Release any resources from this coordinator.
-         */
-        void close();
-    }
-
-    public interface Emitter<X> {
-        /**
-         * Emit a batch for the specified transaction attempt and metadata for the transaction. The metadata was created by the Coordinator
-         * in the initializeTransaction method. This method must always emit the same batch of tuples across all tasks for the same
-         * transaction id.
-         *
-         * The first field of all emitted tuples must contain the provided TransactionAttempt.
-         */
-        void emitBatch(TransactionAttempt tx, X coordinatorMeta, BatchOutputCollector collector);
-
-        /**
-         * Any state for transactions prior to the provided transaction id can be safely cleaned up, so this method should clean up that
-         * state.
-         */
-        void cleanupBefore(BigInteger txid);
-
-        /**
-         * Release any resources held by this emitter.
-         */
-        void close();
-    }
-}