You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/03/17 20:34:47 UTC

[1/5] storm git commit: STORM-1614: backpressure init and cleanup changes for 1.x branch

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 4fe225cd0 -> fc64e158f


STORM-1614: backpressure init and cleanup changes for 1.x branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/28b96d7d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/28b96d7d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/28b96d7d

Branch: refs/heads/1.x-branch
Commit: 28b96d7db2313e4fc5d47729416594928886c696
Parents: 821d4ef
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Fri Mar 11 08:25:58 2016 -0600
Committer: Alessandro Bellina <ab...@yahoo-inc.com>
Committed: Fri Mar 11 09:37:01 2016 -0600

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/cluster.clj |  14 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  15 ++-
 .../src/clj/org/apache/storm/daemon/worker.clj  |   2 +
 .../test/clj/org/apache/storm/nimbus_test.clj   | 129 +++++++++++++++++++
 4 files changed, 151 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/28b96d7d/storm-core/src/clj/org/apache/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj b/storm-core/src/clj/org/apache/storm/cluster.clj
index 5315f1a..760f330 100644
--- a/storm-core/src/clj/org/apache/storm/cluster.clj
+++ b/storm-core/src/clj/org/apache/storm/cluster.clj
@@ -73,6 +73,7 @@
   (teardown-topology-errors! [this storm-id])
   (heartbeat-storms [this])
   (error-topologies [this])
+  (backpressure-topologies [this])
   (set-topology-log-config! [this storm-id log-config])
   (topology-log-config [this storm-id cb])
   (worker-heartbeat! [this storm-id node port info])
@@ -277,7 +278,7 @@
                          ;; this should never happen
                          (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
     (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE BLOBSTORE-SUBTREE NIMBUSES-SUBTREE
-               LOGCONFIG-SUBTREE]]
+               LOGCONFIG-SUBTREE BACKPRESSURE-ROOT]]
       (.mkdirs cluster-state p acls))
     (reify
       StormClusterState
@@ -368,6 +369,10 @@
         [this]
         (.get_children cluster-state ERRORS-SUBTREE false))
 
+      (backpressure-topologies
+        [this]
+        (.get_children cluster-state BACKPRESSURE-SUBTREE false))
+
       (get-worker-heartbeat
         [this storm-id node port]
         (let [worker-hb (.get_worker_hb cluster-state (workerbeat-path storm-id node port) false)]
@@ -505,8 +510,11 @@
 
       (remove-worker-backpressure!
         [this storm-id node port]
-        (.delete_node cluster-state (backpressure-path storm-id node port)))
-
+        (let [path (backpressure-path storm-id node port)
+              existed (.node_exists cluster-state path false)]
+          (if existed
+            (.delete_node cluster-state (backpressure-path storm-id node port)))))
+    
       (teardown-topology-errors!
         [this storm-id]
         (try-cause

http://git-wip-us.apache.org/repos/asf/storm/blob/28b96d7d/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 6145725..e79b94d 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1043,12 +1043,13 @@
                 (filter [this key] (get-id-from-blob-key key)))]
     (set (.filterAndListKeys blob-store to-id))))
 
-(defn cleanup-storm-ids [conf storm-cluster-state blob-store]
+(defn cleanup-storm-ids [storm-cluster-state blob-store]
   (let [heartbeat-ids (set (.heartbeat-storms storm-cluster-state))
         error-ids (set (.error-topologies storm-cluster-state))
         code-ids (code-ids blob-store)
+        backpressure-ids (set (.backpressure-topologies storm-cluster-state))
         assigned-ids (set (.active-storms storm-cluster-state))]
-    (set/difference (set/union heartbeat-ids error-ids code-ids) assigned-ids)
+    (set/difference (set/union heartbeat-ids error-ids backpressure-ids code-ids) assigned-ids)
     ))
 
 (defn extract-status-str [base]
@@ -1120,6 +1121,9 @@
   (blob-rm-key blob-store (master-stormconf-key id) storm-cluster-state)
   (blob-rm-key blob-store (master-stormcode-key id) storm-cluster-state))
 
+(defn force-delete-dir [conf id]
+  (rmr (master-stormdist-root conf id)))
+
 (defn do-cleanup [nimbus]
   (if (is-leader nimbus :throw-exception false)
     (let [storm-cluster-state (:storm-cluster-state nimbus)
@@ -1127,13 +1131,14 @@
           submit-lock (:submit-lock nimbus)
           blob-store (:blob-store nimbus)]
       (let [to-cleanup-ids (locking submit-lock
-                             (cleanup-storm-ids conf storm-cluster-state blob-store))]
+                             (cleanup-storm-ids storm-cluster-state blob-store))]
         (when-not (empty? to-cleanup-ids)
           (doseq [id to-cleanup-ids]
             (log-message "Cleaning up " id)
             (.teardown-heartbeats! storm-cluster-state id)
             (.teardown-topology-errors! storm-cluster-state id)
-            (rmr (master-stormdist-root conf id))
+            (.remove-backpressure! storm-cluster-state id)
+            (force-delete-dir conf id)
             (blob-rm-topology-keys id blob-store storm-cluster-state)
             (swap! (:heartbeats-cache nimbus) dissoc id)))))
     (log-message "not a leader, skipping cleanup")))
@@ -1555,8 +1560,6 @@
                            )]
             (transition-name! nimbus storm-name [:kill wait-amt] true)
             (notify-topology-action-listener nimbus storm-name operation))
-          (if (topology-conf TOPOLOGY-BACKPRESSURE-ENABLE)
-            (.remove-backpressure! (:storm-cluster-state nimbus) storm-id))
           (add-topology-to-history-log (get-storm-id (:storm-cluster-state nimbus) storm-name)
             nimbus topology-conf)))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/28b96d7d/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 9d9c482..778e83d 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -688,6 +688,8 @@
                     (run-worker-shutdown-hooks worker)
 
                     (.remove-worker-heartbeat! (:storm-cluster-state worker) storm-id assignment-id port)
+                    (.remove-worker-backpressure! (:storm-cluster-state worker) storm-id assignment-id port)
+
                     (log-message "Disconnecting from storm cluster state context")
                     (.disconnect (:storm-cluster-state worker))
                     (.close (:cluster-state worker))

http://git-wip-us.apache.org/repos/asf/storm/blob/28b96d7d/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 6b51d3c..0e6e4b6 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1514,3 +1514,132 @@
 
            (is (= (.get_action (.get levels "other-test")) LogLevelAction/UNCHANGED))
            (is (= (.get_target_log_level (.get levels "other-test")) "DEBUG")))))))
+
+(defn teardown-heartbeats [id])
+(defn teardown-topo-errors [id])
+(defn teardown-backpressure-dirs [id])
+
+(defn mock-cluster-state 
+  ([] 
+    (mock-cluster-state nil nil))
+  ([active-topos inactive-topos]
+    (mock-cluster-state active-topos inactive-topos inactive-topos inactive-topos)) 
+  ([active-topos hb-topos error-topos bp-topos]
+    (reify cluster/StormClusterState
+      (teardown-heartbeats! [this id] (teardown-heartbeats id))
+      (teardown-topology-errors! [this id] (teardown-topo-errors id))
+      (remove-backpressure! [this id] (teardown-backpressure-dirs id))
+      (active-storms [this] active-topos)
+      (heartbeat-storms [this] hb-topos)
+      (error-topologies [this] error-topos)
+      (backpressure-topologies [this] bp-topos))))
+
+(deftest cleanup-storm-ids-returns-inactive-topos
+  (let [mock-state (mock-cluster-state (list "topo1") (list "topo1" "topo2" "topo3"))]
+    (stubbing [nimbus/is-leader true
+               nimbus/code-ids {}] 
+    (is (= (nimbus/cleanup-storm-ids mock-state nil) #{"topo2" "topo3"})))))
+
+(deftest cleanup-storm-ids-performs-union-of-storm-ids-with-active-znodes
+  (let [active-topos (list "hb1" "e2" "bp3")
+        hb-topos (list "hb1" "hb2" "hb3")
+        error-topos (list "e1" "e2" "e3")
+        bp-topos (list "bp1" "bp2" "bp3")
+        mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos)]
+    (stubbing [nimbus/is-leader true
+               nimbus/code-ids {}] 
+    (is (= (nimbus/cleanup-storm-ids mock-state nil) 
+           #{"hb2" "hb3" "e1" "e3" "bp1" "bp2"})))))
+
+(deftest cleanup-storm-ids-returns-empty-set-when-all-topos-are-active
+  (let [active-topos (list "hb1" "hb2" "hb3" "e1" "e2" "e3" "bp1" "bp2" "bp3")
+        hb-topos (list "hb1" "hb2" "hb3")
+        error-topos (list "e1" "e2" "e3")
+        bp-topos (list "bp1" "bp2" "bp3")
+        mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos)]
+    (stubbing [nimbus/is-leader true
+               nimbus/code-ids {}] 
+    (is (= (nimbus/cleanup-storm-ids mock-state nil) 
+           #{})))))
+
+(deftest do-cleanup-removes-inactive-znodes
+  (let [inactive-topos (list "topo2" "topo3")
+        hb-cache (atom (into {}(map vector inactive-topos '(nil nil))))
+        mock-state (mock-cluster-state)
+        mock-blob-store {}
+        conf {}
+        nimbus {:conf conf
+                :submit-lock mock-blob-store 
+                :blob-store {}
+                :storm-cluster-state mock-state
+                :heartbeats-cache hb-cache}]
+
+    (stubbing [nimbus/is-leader true
+               nimbus/blob-rm-topology-keys nil
+               nimbus/cleanup-storm-ids inactive-topos]
+      (mocking
+        [teardown-heartbeats 
+         teardown-topo-errors 
+         teardown-backpressure-dirs
+         nimbus/force-delete-dir
+         nimbus/blob-rm-topology-keys] 
+
+        (nimbus/do-cleanup nimbus)
+
+        ;; removed heartbeats znode
+        (verify-nth-call-args-for 1 teardown-heartbeats "topo2")
+        (verify-nth-call-args-for 2 teardown-heartbeats "topo3")
+
+        ;; removed topo errors znode
+        (verify-nth-call-args-for 1 teardown-topo-errors "topo2")
+        (verify-nth-call-args-for 2 teardown-topo-errors "topo3")
+
+        ;; removed backpressure znodes
+        (verify-nth-call-args-for 1 teardown-backpressure-dirs "topo2")
+        (verify-nth-call-args-for 2 teardown-backpressure-dirs "topo3")
+
+        ;; removed topo directories
+        (verify-nth-call-args-for 1 nimbus/force-delete-dir conf "topo2")
+        (verify-nth-call-args-for 2 nimbus/force-delete-dir conf "topo3")
+
+        ;; removed blob store topo keys
+        (verify-nth-call-args-for 1 nimbus/blob-rm-topology-keys "topo2" mock-blob-store mock-state)
+        (verify-nth-call-args-for 2 nimbus/blob-rm-topology-keys "topo3" mock-blob-store mock-state)
+
+        ;; remove topos from heartbeat cache
+        (is (= (count @hb-cache) 0))))))
+
+(deftest do-cleanup-does-not-teardown-active-topos
+  (let [inactive-topos ()
+        hb-cache (atom {"topo1" nil "topo2" nil})
+        mock-state (mock-cluster-state)
+        mock-blob-store {}
+        conf {}
+        nimbus {:conf conf
+                :submit-lock mock-blob-store 
+                :blob-store {}
+                :storm-cluster-state mock-state
+                :heartbeats-cache hb-cache}]
+
+    (stubbing [nimbus/is-leader true
+               nimbus/blob-rm-topology-keys nil
+               nimbus/cleanup-storm-ids inactive-topos]
+      (mocking
+        [teardown-heartbeats 
+         teardown-topo-errors 
+         teardown-backpressure-dirs
+         nimbus/force-delete-dir
+         nimbus/blob-rm-topology-keys] 
+
+        (nimbus/do-cleanup nimbus)
+
+        (verify-call-times-for teardown-heartbeats 0)
+        (verify-call-times-for teardown-topo-errors 0)
+        (verify-call-times-for teardown-backpressure-dirs 0)
+        (verify-call-times-for nimbus/force-delete-dir 0)
+        (verify-call-times-for nimbus/blob-rm-topology-keys 0)
+
+        ;; hb-cache goes down to 1 because only one topo was inactive
+        (is (= (count @hb-cache) 2))
+        (is (contains? @hb-cache "topo1"))
+        (is (contains? @hb-cache "topo2"))))))


[4/5] storm git commit: add STORM-1614 to changelog

Posted by pt...@apache.org.
add STORM-1614 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/00280de7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/00280de7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/00280de7

Branch: refs/heads/1.x-branch
Commit: 00280de7321f762b175005cc964c688d3f256d0b
Parents: 2529e9f
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu Mar 17 15:32:29 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu Mar 17 15:32:29 2016 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/00280de7/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a959dbe..6697ed0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.0
+ * STORM-1614: backpressure init and cleanup changes
  * STORM-971: Metric for messages lost due to kafka retention
  * STORM-1608: Fix stateful topology acking behavior
  * STORM-1609: Netty Client is not best effort delivery on failed Connection


[5/5] storm git commit: Merge branch '1.x-branch' of https://git-wip-us.apache.org/repos/asf/storm into 1.x-branch

Posted by pt...@apache.org.
Merge branch '1.x-branch' of https://git-wip-us.apache.org/repos/asf/storm into 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fc64e158
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fc64e158
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fc64e158

Branch: refs/heads/1.x-branch
Commit: fc64e158f0bfcdccb0a278f81159a2dbe88affea
Parents: 00280de 4fe225c
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu Mar 17 15:34:30 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu Mar 17 15:34:30 2016 -0400

----------------------------------------------------------------------
 CHANGELOG.md                                    |   2 +
 external/storm-mongodb/README.md                | 195 +++++++++++++++++++
 external/storm-mongodb/pom.xml                  |  74 +++++++
 .../storm/mongodb/bolt/AbstractMongoBolt.java   |  56 ++++++
 .../storm/mongodb/bolt/MongoInsertBolt.java     |  62 ++++++
 .../storm/mongodb/bolt/MongoUpdateBolt.java     |  75 +++++++
 .../storm/mongodb/common/MongoDBClient.java     |  91 +++++++++
 .../mongodb/common/QueryFilterCreator.java      |  38 ++++
 .../common/SimpleQueryFilterCreator.java        |  39 ++++
 .../mongodb/common/mapper/MongoMapper.java      |  38 ++++
 .../common/mapper/SimpleMongoMapper.java        |  40 ++++
 .../common/mapper/SimpleMongoUpdateMapper.java  |  41 ++++
 .../storm/mongodb/trident/state/MongoState.java |  97 +++++++++
 .../trident/state/MongoStateFactory.java        |  42 ++++
 .../trident/state/MongoStateUpdater.java        |  34 ++++
 .../storm/mongodb/topology/InsertWordCount.java |  81 ++++++++
 .../storm/mongodb/topology/UpdateWordCount.java |  91 +++++++++
 .../storm/mongodb/topology/WordCounter.java     |  67 +++++++
 .../storm/mongodb/topology/WordSpout.java       |  88 +++++++++
 .../storm/mongodb/trident/WordCountTrident.java |  85 ++++++++
 pom.xml                                         |   1 +
 storm-core/src/clj/org/apache/storm/clojure.clj |   3 +
 .../src/clj/org/apache/storm/daemon/acker.clj   |  10 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |   9 +-
 .../clj/org/apache/storm/daemon/executor.clj    |  10 +
 .../storm/coordination/CoordinatedBolt.java     |   4 +
 .../org/apache/storm/task/IOutputCollector.java |   1 +
 .../org/apache/storm/task/OutputCollector.java  |  11 ++
 .../storm/topology/BasicOutputCollector.java    |  10 +
 .../storm/topology/IBasicOutputCollector.java   |   2 +
 .../trident/topology/TridentBoltExecutor.java   |   4 +
 .../org/apache/storm/integration_test.clj       |  51 ++++-
 storm-dist/binary/src/main/assembly/binary.xml  |  14 ++
 33 files changed, 1461 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fc64e158/CHANGELOG.md
----------------------------------------------------------------------
diff --cc CHANGELOG.md
index 6697ed0,f765315..bb139d3
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@@ -1,5 -1,6 +1,7 @@@
  ## 1.0.0
+  * STORM-1483: add storm-mongodb connector
 + * STORM-1614: backpressure init and cleanup changes
+  * STORM-1549: Add support for resetting tuple timeout from bolts via the OutputCollector
   * STORM-971: Metric for messages lost due to kafka retention
   * STORM-1608: Fix stateful topology acking behavior
   * STORM-1609: Netty Client is not best effort delivery on failed Connection


[3/5] storm git commit: Merge branch 'STORM-1614_backpressure_init_and_cleanup_changes_1x' of github.com:abellina/storm into 1.x-branch

Posted by pt...@apache.org.
Merge branch 'STORM-1614_backpressure_init_and_cleanup_changes_1x' of github.com:abellina/storm into 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2529e9f8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2529e9f8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2529e9f8

Branch: refs/heads/1.x-branch
Commit: 2529e9f862d0e62b17afec9d54ba9945334c0efd
Parents: f0abfff 9322283
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu Mar 17 15:30:35 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu Mar 17 15:30:35 2016 -0400

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/cluster.clj |  14 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  15 ++-
 .../src/clj/org/apache/storm/daemon/worker.clj  |   2 +
 .../test/clj/org/apache/storm/nimbus_test.clj   | 129 +++++++++++++++++++
 4 files changed, 151 insertions(+), 9 deletions(-)
----------------------------------------------------------------------



[2/5] storm git commit: STORM-1614: force-delete-dir -> force-delete-topo-dist-dir

Posted by pt...@apache.org.
STORM-1614: force-delete-dir -> force-delete-topo-dist-dir


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9322283a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9322283a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9322283a

Branch: refs/heads/1.x-branch
Commit: 9322283a6c205697a50d1fadc3345e0af798d26b
Parents: 28b96d7
Author: Alessandro Bellina <ab...@yahoo-inc.com>
Authored: Tue Mar 15 10:07:25 2016 -0500
Committer: Alessandro Bellina <ab...@yahoo-inc.com>
Committed: Tue Mar 15 10:07:25 2016 -0500

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/daemon/nimbus.clj |  4 ++--
 storm-core/test/clj/org/apache/storm/nimbus_test.clj  | 10 +++++-----
 2 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9322283a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index e79b94d..d34ddb1 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1121,7 +1121,7 @@
   (blob-rm-key blob-store (master-stormconf-key id) storm-cluster-state)
   (blob-rm-key blob-store (master-stormcode-key id) storm-cluster-state))
 
-(defn force-delete-dir [conf id]
+(defn force-delete-topo-dist-dir [conf id]
   (rmr (master-stormdist-root conf id)))
 
 (defn do-cleanup [nimbus]
@@ -1138,7 +1138,7 @@
             (.teardown-heartbeats! storm-cluster-state id)
             (.teardown-topology-errors! storm-cluster-state id)
             (.remove-backpressure! storm-cluster-state id)
-            (force-delete-dir conf id)
+            (force-delete-topo-dist-dir conf id)
             (blob-rm-topology-keys id blob-store storm-cluster-state)
             (swap! (:heartbeats-cache nimbus) dissoc id)))))
     (log-message "not a leader, skipping cleanup")))

http://git-wip-us.apache.org/repos/asf/storm/blob/9322283a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 0e6e4b6..0788fce 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1581,7 +1581,7 @@
         [teardown-heartbeats 
          teardown-topo-errors 
          teardown-backpressure-dirs
-         nimbus/force-delete-dir
+         nimbus/force-delete-topo-dist-dir
          nimbus/blob-rm-topology-keys] 
 
         (nimbus/do-cleanup nimbus)
@@ -1599,8 +1599,8 @@
         (verify-nth-call-args-for 2 teardown-backpressure-dirs "topo3")
 
         ;; removed topo directories
-        (verify-nth-call-args-for 1 nimbus/force-delete-dir conf "topo2")
-        (verify-nth-call-args-for 2 nimbus/force-delete-dir conf "topo3")
+        (verify-nth-call-args-for 1 nimbus/force-delete-topo-dist-dir conf "topo2")
+        (verify-nth-call-args-for 2 nimbus/force-delete-topo-dist-dir conf "topo3")
 
         ;; removed blob store topo keys
         (verify-nth-call-args-for 1 nimbus/blob-rm-topology-keys "topo2" mock-blob-store mock-state)
@@ -1628,7 +1628,7 @@
         [teardown-heartbeats 
          teardown-topo-errors 
          teardown-backpressure-dirs
-         nimbus/force-delete-dir
+         nimbus/force-delete-topo-dist-dir
          nimbus/blob-rm-topology-keys] 
 
         (nimbus/do-cleanup nimbus)
@@ -1636,7 +1636,7 @@
         (verify-call-times-for teardown-heartbeats 0)
         (verify-call-times-for teardown-topo-errors 0)
         (verify-call-times-for teardown-backpressure-dirs 0)
-        (verify-call-times-for nimbus/force-delete-dir 0)
+        (verify-call-times-for nimbus/force-delete-topo-dist-dir 0)
         (verify-call-times-for nimbus/blob-rm-topology-keys 0)
 
         ;; hb-cache goes down to 1 because only one topo was inactive