You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/18 18:09:40 UTC

[4/9] storm git commit: cleaning up

cleaning up


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b09d8ca1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b09d8ca1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b09d8ca1

Branch: refs/heads/master
Commit: b09d8ca14798cb1b344038be5bea4fb53428d6a2
Parents: 4243e4e
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Thu Feb 11 15:01:43 2016 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Feb 18 10:35:56 2016 -0600

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/executor.clj    |  16 ---
 .../clj/org/apache/storm/daemon/logviewer.clj   |   8 --
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  62 ++-------
 .../clj/org/apache/storm/daemon/supervisor.clj  |  70 ++--------
 .../src/clj/org/apache/storm/daemon/worker.clj  |  45 -------
 storm-core/src/clj/org/apache/storm/timer.clj   | 128 -------------------
 .../src/jvm/org/apache/storm/StormTimer.java    |  39 ++++--
 .../test/clj/org/apache/storm/nimbus_test.clj   |   1 -
 .../clj/org/apache/storm/supervisor_test.clj    |   4 +-
 .../test/jvm/org/apache/storm/TestTimer.java    |  57 ---------
 10 files changed, 43 insertions(+), 387 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index f46f18b..2afb853 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -324,14 +324,6 @@
   (let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data
         distinct-time-bucket-intervals (keys interval->task->metric-registry)]
     (doseq [interval distinct-time-bucket-intervals]
-;      (schedule-recurring
-;       (:user-timer (:worker executor-data))
-;       interval
-;       interval
-;       (fn []
-;         (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
-;           (disruptor/publish receive-queue val))))
-
       (StormTimer/scheduleRecurring
         (:user-timer (:worker executor-data))
         interval
@@ -375,14 +367,6 @@
               (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
                    (= :spout (:type executor-data))))
         (log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
-;        (schedule-recurring
-;          (:user-timer worker)
-;          tick-time-secs
-;          tick-time-secs
-;          (fn []
-;            (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
-;              (disruptor/publish receive-queue val))))
-
         (StormTimer/scheduleRecurring
           (:user-timer worker)
           tick-time-secs

http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
index 932a813..16815f9 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
@@ -264,14 +264,6 @@
   (let [interval-secs (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
     (when interval-secs
       (log-debug "starting log cleanup thread at interval: " interval-secs)
-;      (schedule-recurring (mk-timer :thread-name "logviewer-cleanup"
-;                                    :kill-fn (fn [t]
-;                                               (log-error t "Error when doing logs cleanup")
-;                                               (Utils/exitProcess 20 "Error when doing log cleanup")))
-;                          0 ;; Start immediately.
-;                          interval-secs
-;                          (fn [] (cleanup-fn! log-root-dir)))
-
       (StormTimer/scheduleRecurring
         (StormTimer/mkTimer "logviewer-cleanup"
           (reify StormTimer$TimerFunc

http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index d6413db..0d0b27a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -194,17 +194,12 @@
      :blob-listers (mk-bloblist-cache-map conf)
      :uptime (Utils/makeUptimeComputer)
      :validator (Utils/newInstance (conf NIMBUS-TOPOLOGY-VALIDATOR))
-;     :timer (mk-timer :kill-fn (fn [t]
-;                                 (log-error t "Error when processing event")
-;                                 (Utils/exitProcess 20 "Error when processing an event")
-;                                 ))
      :timer (StormTimer/mkTimer nil
               (reify StormTimer$TimerFunc
                 (^void run
                   [this ^Object t]
                   (log-error t "Error when processing event")
                   (Utils/exitProcess 20 "Error when processing an event"))))
-
      :scheduler (mk-scheduler conf inimbus)
      :leader-elector (Zookeeper/zkLeaderElector conf)
      :id->sched-status (atom {})
@@ -387,12 +382,7 @@
 
 (defn delay-event [nimbus storm-id delay-secs event]
   (log-message "Delaying event " event " for " delay-secs " secs for " storm-id)
-;  (schedule (:timer nimbus)
-;            delay-secs
-;            #(transition! nimbus storm-id event false)
-;            )
-  (StormTimer/schedule
-    (:timer nimbus)
+  (StormTimer/schedule (:timer nimbus)
     delay-secs
     (reify StormTimer$TimerFunc
       (^void run
@@ -1457,16 +1447,8 @@
     (when (is-leader nimbus :throw-exception false)
       (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
         (transition! nimbus storm-id :startup)))
-;    (schedule-recurring (:timer nimbus)
-;                        0
-;                        (conf NIMBUS-MONITOR-FREQ-SECS)
-;                        (fn []
-;                          (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
-;                            (locking (:submit-lock nimbus)
-;                              (mk-assignments nimbus)))
-;                          (do-cleanup nimbus)))
-    (StormTimer/scheduleRecurring
-      (:timer nimbus)
+
+    (StormTimer/scheduleRecurring (:timer nimbus)
       0
       (conf NIMBUS-MONITOR-FREQ-SECS)
       (reify StormTimer$TimerFunc
@@ -1477,14 +1459,7 @@
               (mk-assignments nimbus)))
           (do-cleanup nimbus))))
     ;; Schedule Nimbus inbox cleaner
-;    (schedule-recurring (:timer nimbus)
-;                        0
-;                        (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
-;                        (fn []
-;                          (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
-
-    (StormTimer/scheduleRecurring
-      (:timer nimbus)
+    (StormTimer/scheduleRecurring (:timer nimbus)
       0
       (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
       (reify StormTimer$TimerFunc
@@ -1493,13 +1468,7 @@
           (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS)))))
     ;; Schedule nimbus code sync thread to sync code from other nimbuses.
     (if (instance? LocalFsBlobStore blob-store)
-;      (schedule-recurring (:timer nimbus)
-;                          0
-;                          (conf NIMBUS-CODE-SYNC-FREQ-SECS)
-;                          (fn []
-;                            (blob-sync conf nimbus)))
-      (StormTimer/scheduleRecurring
-        (:timer nimbus)
+      (StormTimer/scheduleRecurring (:timer nimbus)
         0
         (conf NIMBUS-CODE-SYNC-FREQ-SECS)
         (reify StormTimer$TimerFunc
@@ -1508,26 +1477,14 @@
             (blob-sync conf nimbus)))))
     ;; Schedule topology history cleaner
     (when-let [interval (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
-;      (schedule-recurring (:timer nimbus)
-;        0
-;        (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)
-;        (fn []
-;          (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus)))
-      (StormTimer/scheduleRecurring
-        (:timer nimbus)
+      (StormTimer/scheduleRecurring (:timer nimbus)
         0
         (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)
         (reify StormTimer$TimerFunc
           (^void run
             [this ^Object t]
             (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus)))))
-;    (schedule-recurring (:timer nimbus)
-;                        0
-;                        (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
-;                        (fn []
-;                          (renew-credentials nimbus)))
-    (StormTimer/scheduleRecurring
-      (:timer nimbus)
+    (StormTimer/scheduleRecurring (:timer nimbus)
       0
       (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
       (reify StormTimer$TimerFunc
@@ -2265,7 +2222,6 @@
       (shutdown [this]
         (mark! nimbus:num-shutdown-calls)
         (log-message "Shutting down master")
-        ;(cancel-timer (:timer nimbus))
         (StormTimer/cancelTimer (:timer nimbus))
         (.disconnect (:storm-cluster-state nimbus))
         (.cleanup (:downloaders nimbus))
@@ -2276,9 +2232,7 @@
         (log-message "Shut down master"))
       DaemonCommon
       (waiting? [this]
-;        (timer-waiting? (:timer nimbus))
-        (StormTimer/isTimerWaiting (:timer nimbus))
-        ))))
+        (StormTimer/isTimerWaiting (:timer nimbus))))))
 
 (defn validate-port-available[conf]
   (try

http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 56184aa..2b70731 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -336,35 +336,18 @@
    :assignment-id (.getAssignmentId isupervisor)
    :my-hostname (Utils/hostname conf)
    :curr-assignment (atom nil) ;; used for reporting used ports when heartbeating
-;   :heartbeat-timer (mk-timer :kill-fn (fn [t]
-;                               (log-error t "Error when processing event")
-;                               (Utils/exitProcess 20 "Error when processing an event")
-;                               ))
-
    :heartbeat-timer (StormTimer/mkTimer nil
                       (reify StormTimer$TimerFunc
                         (^void run
                           [this ^Object t]
                           (log-error t "Error when processing event")
                           (Utils/exitProcess 20 "Error when processing an event"))))
-;   :event-timer (mk-timer :kill-fn (fn [t]
-;                                         (log-error t "Error when processing event")
-;                                         (Utils/exitProcess 20 "Error when processing an event")
-;                                         ))
-
    :event-timer (StormTimer/mkTimer nil
                   (reify StormTimer$TimerFunc
                     (^void run
                       [this ^Object t]
                       (log-error t "Error when processing event")
                       (Utils/exitProcess 20 "Error when processing an event"))))
-
-;   :blob-update-timer (mk-timer :kill-fn (defn blob-update-timer
-;                                           [t]
-;                                           (log-error t "Error when processing event")
-;                                           (Utils/exitProcess 20 "Error when processing a event"))
-;                                :timer-name "blob-update-timer")
-
    :blob-update-timer (StormTimer/mkTimer "blob-update-timer"
                         (reify StormTimer$TimerFunc
                           (^void run
@@ -838,12 +821,7 @@
     (heartbeat-fn)
 
     ;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
-;    (schedule-recurring (:heartbeat-timer supervisor)
-;                        0
-;                        (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
-;                        heartbeat-fn)
-    (StormTimer/scheduleRecurring
-      (:heartbeat-timer supervisor)
+    (StormTimer/scheduleRecurring (:heartbeat-timer supervisor)
       0
       (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
       (reify StormTimer$TimerFunc
@@ -860,21 +838,14 @@
     (when (conf SUPERVISOR-ENABLE)
       ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
       ;; to date even if callbacks don't all work exactly right
-;      (schedule-recurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
-      (StormTimer/scheduleRecurring
-        (:event-timer supervisor)
+      (StormTimer/scheduleRecurring (:event-timer supervisor)
         0 10
         (reify StormTimer$TimerFunc
           (^void run
             [this ^Object o]
             (.add event-manager synchronize-supervisor))))
-;      (schedule-recurring (:event-timer supervisor)
-;                          0
-;                          (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
-;                          (fn [] (.add processes-event-manager sync-processes)))
 
-      (StormTimer/scheduleRecurring
-        (:event-timer supervisor)
+      (StormTimer/scheduleRecurring (:event-timer supervisor)
         0
         (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
         (reify StormTimer$TimerFunc
@@ -883,31 +854,17 @@
             (.add processes-event-manager sync-processes))))
 
       ;; Blob update thread. Starts with 30 seconds delay, every 30 seconds
-;      (schedule-recurring (:blob-update-timer supervisor)
-;                          30
-;                          30
-;                          (fn [] (.add event-manager synchronize-blobs-fn)))
-      (StormTimer/scheduleRecurring
-        (:blob-update-timer supervisor)
-        30 30
+      (StormTimer/scheduleRecurring (:blob-update-timer supervisor)
+        30
+        30
         (reify StormTimer$TimerFunc
           (^void run
             [this ^Object o]
             (.add event-manager synchronize-blobs-fn))))
 
-;      (schedule-recurring (:event-timer supervisor)
-;                          (* 60 5)
-;                          (* 60 5)
-;                          (fn [] (let [health-code (healthcheck/health-check conf)
-;                                       ids (my-worker-ids conf)]
-;                                   (if (not (= health-code 0))
-;                                     (do
-;                                       (doseq [id ids]
-;                                         (shutdown-worker supervisor id))
-;                                       (throw (RuntimeException. "Supervisor failed health check. Exiting.")))))))
-      (StormTimer/scheduleRecurring
-        (:event-timer supervisor)
-        (* 60 5) (* 60 5)
+      (StormTimer/scheduleRecurring (:event-timer supervisor)
+        (* 60 5)
+        (* 60 5)
         (reify StormTimer$TimerFunc
           (^void run
             [this ^Object o]
@@ -921,10 +878,6 @@
 
 
       ;; Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
-;      (schedule-recurring (:event-timer supervisor)
-;                          30
-;                          30
-;                          (fn [] (.add event-manager run-profiler-actions-fn))))
       (StormTimer/scheduleRecurring
         (:event-timer supervisor)
         30 30
@@ -939,11 +892,8 @@
      (shutdown [this]
                (log-message "Shutting down supervisor " (:supervisor-id supervisor))
                (reset! (:active supervisor) false)
-               ;(cancel-timer (:heartbeat-timer supervisor))
                (StormTimer/cancelTimer (:heartbeat-timer supervisor))
-               ;(cancel-timer (:event-timer supervisor))
                (StormTimer/cancelTimer (:event-timer supervisor))
-               ;(cancel-timer (:blob-update-timer supervisor))
                (StormTimer/cancelTimer (:blob-update-timer supervisor))
                (.shutdown event-manager)
                (.shutdown processes-event-manager)
@@ -963,9 +913,7 @@
      (waiting? [this]
        (or (not @(:active supervisor))
            (and
-            ;(timer-waiting? (:heartbeat-timer supervisor))
             (StormTimer/isTimerWaiting (:heartbeat-timer supervisor))
-            ;(timer-waiting? (:event-timer supervisor))
             (StormTimer/isTimerWaiting (:event-timer supervisor))
             (every? (memfn waiting?) managers)))
            ))))

http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 9212506..e74ffa1 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -239,11 +239,6 @@
   {})
 
 (defn mk-halting-timer [timer-name]
-;  (mk-timer :kill-fn (fn [t]
-;                       (log-error t "Error when processing event")
-;                       (Utils/exitProcess 20 "Error when processing an event")
-;                       )
-;            :timer-name timer-name)
   (StormTimer/mkTimer timer-name
     (reify StormTimer$TimerFunc
       (^void run
@@ -384,7 +379,6 @@
     (fn refresh-connections
       ([]
         (refresh-connections (fn [& ignored]
-;                (schedule (:refresh-connections-timer worker) 0 refresh-connections)
                 (StormTimer/schedule
                   (:refresh-connections-timer worker) 0
                   (reify StormTimer$TimerFunc
@@ -443,7 +437,6 @@
   ([worker]
     (refresh-storm-active
       worker (fn [& ignored]
-;               (schedule (:refresh-active-timer worker) 0 (partial refresh-storm-active worker))
                (StormTimer/schedule
                  (:refresh-active-timer worker) 0
                  (reify StormTimer$TimerFunc
@@ -496,17 +489,6 @@
   (let [timer (:refresh-active-timer worker)
         delay-secs 0
         recur-secs 1]
-;    (schedule timer
-;      delay-secs
-;      (fn this []
-;        (if (all-connections-ready worker)
-;          (do
-;            (log-message "All connections are ready for worker " (:assignment-id worker) ":" (:port worker)
-;              " with id "(:worker-id worker))
-;            (reset! (:worker-active-flag worker) true))
-;          (schedule timer recur-secs this :check-active false)
-;            )))
-
     (StormTimer/schedule timer
       delay-secs
       (reify StormTimer$TimerFunc
@@ -672,7 +654,6 @@
         executors (atom nil)
         ;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
         ;; to the supervisor
-;        _ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
         _ (StormTimer/scheduleRecurring
             (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS)
             (reify StormTimer$TimerFunc
@@ -680,8 +661,6 @@
                 [this ^Object o]
                 (heartbeat-fn))))
 
-;                _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors))
-
         _ (StormTimer/scheduleRecurring
             (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS)
             (reify StormTimer$TimerFunc
@@ -748,21 +727,13 @@
                     (.interrupt backpressure-thread)
                     (.join backpressure-thread)
                     (log-message "Shut down backpressure thread")
-;                    (cancel-timer (:heartbeat-timer worker))
                     (StormTimer/cancelTimer (:heartbeat-timer worker))
-;                    (cancel-timer (:refresh-connections-timer worker))
                     (StormTimer/cancelTimer (:refresh-connections-timer worker))
-;                    (cancel-timer (:refresh-credentials-timer worker))
                     (StormTimer/cancelTimer (:refresh-credentials-timer worker))
-;                    (cancel-timer (:refresh-active-timer worker))
                     (StormTimer/cancelTimer (:refresh-active-timer worker))
-;                    (cancel-timer (:executor-heartbeat-timer worker))
                     (StormTimer/cancelTimer (:executor-heartbeat-timer worker))
-;                    (cancel-timer (:user-timer worker))
                     (StormTimer/cancelTimer (:user-timer worker))
-;                    (cancel-timer (:refresh-load-timer worker))
                     (StormTimer/cancelTimer (:refresh-load-timer worker))
-
                     (StormTimer/cancelTimer (:reset-log-levels-timer worker))
                     (close-resources worker)
 
@@ -782,13 +753,6 @@
              DaemonCommon
              (waiting? [this]
                (and
-;                 (timer-waiting? (:heartbeat-timer worker))
-;                 (timer-waiting? (:refresh-connections-timer worker))
-;                 (timer-waiting? (:refresh-load-timer worker))
-;                 (timer-waiting? (:refresh-credentials-timer worker))
-;                 (timer-waiting? (:refresh-active-timer worker))
-;                 (timer-waiting? (:executor-heartbeat-timer worker))
-;                 (timer-waiting? (:user-timer worker))
                  (StormTimer/isTimerWaiting (:heartbeat-timer worker))
                  (StormTimer/isTimerWaiting (:refresh-connections-timer worker))
                  (StormTimer/isTimerWaiting (:refresh-load-timer worker))
@@ -823,11 +787,6 @@
 
     (establish-log-setting-callback)
     (.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-changed)))
-;    (schedule-recurring (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
-;                        (fn [& args]
-;                          (check-credentials-changed)
-;                          (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
-;                            (check-throttle-changed))))
 
     (StormTimer/scheduleRecurring
       (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
@@ -839,28 +798,24 @@
             (check-throttle-changed)))))
     ;; The jitter allows the clients to get the data at different times, and avoids thundering herd
     (when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
-;      (schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 refresh-load)
       (StormTimer/scheduleRecurringWithJitter
         (:refresh-load-timer worker) 0 1 500
         (reify StormTimer$TimerFunc
           (^void run
             [this ^Object o]
             (refresh-load)))))
-;    (schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections)
     (StormTimer/scheduleRecurring
       (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS)
       (reify StormTimer$TimerFunc
         (^void run
           [this ^Object o]
           (refresh-connections))))
-;    (schedule-recurring (:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS) (fn [] (reset-log-levels latest-log-config)))
     (StormTimer/scheduleRecurring
       (:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS)
       (reify StormTimer$TimerFunc
         (^void run
           [this ^Object o]
           (reset-log-levels latest-log-config))))
-;     (schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker))
     (StormTimer/scheduleRecurring
       (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS)
       (reify StormTimer$TimerFunc

http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/src/clj/org/apache/storm/timer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/timer.clj b/storm-core/src/clj/org/apache/storm/timer.clj
deleted file mode 100644
index 27853c2..0000000
--- a/storm-core/src/clj/org/apache/storm/timer.clj
+++ /dev/null
@@ -1,128 +0,0 @@
-; Licensed to the Apache Software Foundation (ASF) under one
-; or more contributor license agreements.  See the NOTICE file
-; distributed with this work for additional information
-; regarding copyright ownership.  The ASF licenses this file
-; to you under the Apache License, Version 2.0 (the
-; "License"); you may not use this file except in compliance
-; with the License.  You may obtain a copy of the License at
-;
-; http://www.apache.org/licenses/LICENSE-2.0
-;
-; Unless required by applicable law or agreed to in writing, software
-; distributed under the License is distributed on an "AS IS" BASIS,
-; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-; See the License for the specific language governing permissions and
-; limitations under the License.
-
-;(ns org.apache.storm.timer
-;  (:import [org.apache.storm.utils Utils Time])
-;  (:import [java.util PriorityQueue Comparator Random])
-;  (:import [java.util.concurrent Semaphore])
-;  (:use [org.apache.storm util log]))
-;
-;;; The timer defined in this file is very similar to java.util.Timer, except
-;;; it integrates with Storm's time simulation capabilities. This lets us test
-;;; code that does asynchronous work on the timer thread
-;
-;(defnk mk-timer [:kill-fn (fn [& _] ) :timer-name nil]
-;  (let [queue (PriorityQueue. 10 (reify Comparator
-;                                   (compare
-;                                     [this o1 o2]
-;                                     (- (first o1) (first o2)))
-;                                   (equals
-;                                     [this obj]
-;                                     true)))
-;        active (atom true)
-;        lock (Object.)
-;        notifier (Semaphore. 0)
-;        thread-name (if timer-name timer-name "timer")
-;        timer-thread (Thread.
-;                       (fn []
-;                         (while @active
-;                           (try
-;                             (let [[time-millis _ _ :as elem] (locking lock (.peek queue))]
-;                               (if (and elem (>= (Time/currentTimeMillis) time-millis))
-;                                 ;; It is imperative to not run the function
-;                                 ;; inside the timer lock. Otherwise, it is
-;                                 ;; possible to deadlock if the fn deals with
-;                                 ;; other locks, like the submit lock.
-;                                 (let [afn (locking lock (second (.poll queue)))]
-;                                   (afn))
-;                                 (if time-millis
-;                                   ;; If any events are scheduled, sleep until
-;                                   ;; event generation. If any recurring events
-;                                   ;; are scheduled then we will always go
-;                                   ;; through this branch, sleeping only the
-;                                   ;; exact necessary amount of time. We give
-;                                   ;; an upper bound, e.g. 1000 millis, to the
-;                                   ;; sleeping time, to limit the response time
-;                                   ;; for detecting any new event within 1 secs.
-;                                   (Time/sleep (min 1000 (- time-millis (Time/currentTimeMillis))))
-;                                   ;; Otherwise poll to see if any new event
-;                                   ;; was scheduled. This is, in essence, the
-;                                   ;; response time for detecting any new event
-;                                   ;; schedulings when there are no scheduled
-;                                   ;; events.
-;                                   (Time/sleep 1000))))
-;                             (catch Throwable t
-;                               ;; Because the interrupted exception can be
-;                               ;; wrapped in a RuntimeException.
-;                               (when-not (Utils/exceptionCauseIsInstanceOf InterruptedException t)
-;                                 (kill-fn t)
-;                                 (reset! active false)
-;                                 (throw t)))))
-;                         (.release notifier)) thread-name)]
-;    (.setDaemon timer-thread true)
-;    (.setPriority timer-thread Thread/MAX_PRIORITY)
-;    (.start timer-thread)
-;    {:timer-thread timer-thread
-;     :queue queue
-;     :active active
-;     :lock lock
-;     :random (Random.)
-;     :cancel-notifier notifier}))
-;
-;(defn- check-active!
-;  [timer]
-;  (when-not @(:active timer)
-;    (throw (IllegalStateException. "Timer is not active"))))
-;
-;(defnk schedule
-;  [timer delay-secs afn :check-active true :jitter-ms 0]
-;  (when check-active (check-active! timer))
-;  (let [id (Utils/uuid)
-;        ^PriorityQueue queue (:queue timer)
-;        end-time-ms (+ (Time/currentTimeMillis) (Time/secsToMillisLong delay-secs))
-;        end-time-ms (if (< 0 jitter-ms) (+ (.nextInt (:random timer) jitter-ms) end-time-ms) end-time-ms)]
-;    (locking (:lock timer)
-;      (.add queue [end-time-ms afn id]))))
-;
-;(defn schedule-recurring
-;  [timer delay-secs recur-secs afn]
-;  (schedule timer
-;            delay-secs
-;            (fn this []
-;              (afn)
-;              ; This avoids a race condition with cancel-timer.
-;              (schedule timer recur-secs this :check-active false))))
-;
-;(defn schedule-recurring-with-jitter
-;  [timer delay-secs recur-secs jitter-ms afn]
-;  (schedule timer
-;            delay-secs
-;            (fn this []
-;              (afn)
-;              ; This avoids a race condition with cancel-timer.
-;              (schedule timer recur-secs this :check-active false :jitter-ms jitter-ms))))
-;
-;(defn cancel-timer
-;  [timer]
-;  (check-active! timer)
-;  (locking (:lock timer)
-;    (reset! (:active timer) false)
-;    (.interrupt (:timer-thread timer)))
-;  (.acquire (:cancel-notifier timer)))
-;
-;(defn timer-waiting?
-;  [timer]
-;  (Time/isThreadWaiting (:timer-thread timer)))

http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/src/jvm/org/apache/storm/StormTimer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/StormTimer.java b/storm-core/src/jvm/org/apache/storm/StormTimer.java
index 36878e4..df89dc6 100644
--- a/storm-core/src/jvm/org/apache/storm/StormTimer.java
+++ b/storm-core/src/jvm/org/apache/storm/StormTimer.java
@@ -25,11 +25,16 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Comparator;
 import java.util.Random;
-import java.util.UUID;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+/**
+ * The timer defined in this file is very similar to java.util.Timer, except
+ * it integrates with Storm's time simulation capabilities. This lets us test
+ * code that does asynchronous work on the timer thread
+ */
+
 public class StormTimer {
     private static final Logger LOG = LoggerFactory.getLogger(StormTimer.class);
 
@@ -75,28 +80,41 @@ public class StormTimer {
 
         @Override
         public void run() {
-            LOG.info("in run...{}", this.getName());
             while (this.active.get()) {
                 QueueEntry queueEntry = null;
                 try {
                     synchronized (this.lock) {
                         queueEntry = this.queue.peek();
                     }
-                    LOG.info("event: {} -- {}", this.getName(), queueEntry);
-
                     if ((queueEntry != null) && (Time.currentTimeMillis() >= queueEntry.endTimeMs)) {
+                        // It is imperative to not run the function
+                        // inside the timer lock. Otherwise, it is
+                        // possible to deadlock if the fn deals with
+                        // other locks, like the submit lock.
                         synchronized (this.lock) {
                             this.queue.poll();
                         }
                         queueEntry.afn.run(null);
                     } else if (queueEntry != null) {
+                        //  If any events are scheduled, sleep until
+                        // event generation. If any recurring events
+                        // are scheduled then we will always go
+                        // through this branch, sleeping only the
+                        // exact necessary amount of time. We give
+                        // an upper bound, e.g. 1000 millis, to the
+                        // sleeping time, to limit the response time
+                        // for detecting any new event within 1 secs.
                         Time.sleep(Math.min(1000, (queueEntry.endTimeMs - Time.currentTimeMillis())));
                     } else {
+                        // Otherwise poll to see if any new event
+                        // was scheduled. This is, in essence, the
+                        // response time for detecting any new event
+                        // schedulings when there are no scheduled
+                        // events.
                         Time.sleep(1000);
                     }
                 } catch (Throwable t) {
                     if (!(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t))) {
-                        LOG.info("Exception throw for event: {} --- {}", queueEntry, t);
                         this.onKill.run(t);
                         this.setActive(false);
                         throw new RuntimeException(t);
@@ -153,7 +171,6 @@ public class StormTimer {
         if (jitterMs > 0) {
             endTimeMs = task.random.nextInt(jitterMs) + endTimeMs;
         }
-        LOG.info("add event: {}-{}-{}", id, endTimeMs, afn);
         synchronized (task.lock) {
             task.add(new QueueEntry(endTimeMs, afn, id));
         }
@@ -166,10 +183,8 @@ public class StormTimer {
         schedule(task, delaySecs, new TimerFunc() {
             @Override
             public void run(Object o) {
-                LOG.info("scheduleRecurring running...");
                 afn.run(null);
-                LOG.info("scheduleRecurring schedule again...");
-
+                // This avoids a race condition with cancel-timer.
                 schedule(task, recurSecs, this, false, 0);
             }
         });
@@ -179,10 +194,8 @@ public class StormTimer {
         schedule(task, delaySecs, new TimerFunc() {
             @Override
             public void run(Object o) {
-                LOG.info("scheduleRecurringWithJitter running...");
                 afn.run(null);
-                LOG.info("scheduleRecurringWithJitter schedule again...");
-
+                // This avoids a race condition with cancel-timer.
                 schedule(task, recurSecs, this, false, jitterMs);
             }
         });
@@ -201,8 +214,6 @@ public class StormTimer {
         if (task == null) {
             throw new RuntimeException("task is null!");
         }
-        LOG.info("cancel task: {} - {} - {}", task.getName(), task.getId(), task.queue);
-
         checkActive(task);
         synchronized (task.lock) {
             task.setActive(false);

http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index e527d60..ce58f42 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1483,7 +1483,6 @@
                    nimbus/file-cache-map nil
                    nimbus/mk-blob-cache-map nil
                    nimbus/mk-bloblist-cache-map nil
-                  ; mk-timer nil
                    nimbus/mk-scheduler nil]
                   (nimbus/nimbus-data auth-conf fake-inimbus)
                   (verify-call-times-for cluster/mk-storm-cluster-state 1)

http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 71aaf85..ef40c4a 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -645,9 +645,7 @@
                                                 (upTime [] 0))))]
       (with-open [_ (ConfigUtilsInstaller. fake-cu)
                   _ (UtilsInstaller. fake-utils)]
-        (stubbing [cluster/mk-storm-cluster-state nil
-;                   mk-timer nil
-                   ]
+        (stubbing [cluster/mk-storm-cluster-state nil]
           (supervisor/supervisor-data auth-conf nil fake-isupervisor)
           (verify-call-times-for cluster/mk-storm-cluster-state 1)
           (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]

http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/test/jvm/org/apache/storm/TestTimer.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/TestTimer.java b/storm-core/test/jvm/org/apache/storm/TestTimer.java
deleted file mode 100644
index c798c1f..0000000
--- a/storm-core/test/jvm/org/apache/storm/TestTimer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.storm;
-
-import org.apache.storm.utils.Time;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by jerrypeng on 2/9/16.
- */
-public class TestTimer {
-    //public static StormTimerTask mkTimer(TimerFunc onKill, String name) {
-    private static final Logger LOG = LoggerFactory.getLogger(TestTimer.class);
-
-    @Test
-    public void testTimer() throws InterruptedException {
-//        StormTimer.StormTimerTask task1 = StormTimer.mkTimer("timer", new StormTimer.TimerFunc() {
-//            @Override
-//            public void run(Object o) {
-//                LOG.info("task1 onKill at {}", Time.currentTimeSecs());
-//            }
-//        });
-//        StormTimer.scheduleRecurring(task1, 10, 5, new StormTimer.TimerFunc(){
-//            @Override
-//            public void run(Object o) {
-//                LOG.info("task1-1 scheduleRecurring func at {}", Time.currentTimeSecs());
-//            }
-//        });
-//        StormTimer.scheduleRecurring(task1, 5, 10, new StormTimer.TimerFunc(){
-//            @Override
-//            public void run(Object o) {
-//                LOG.info("task1-2 scheduleRecurring func at {}", Time.currentTimeSecs());
-//            }
-//        });
-//
-//        StormTimer.StormTimerTask task2 = StormTimer.mkTimer("timer", new StormTimer.TimerFunc() {
-//            @Override
-//            public void run(Object o) {
-//                LOG.info("task2 onKill at {}", Time.currentTimeSecs());
-//            }
-//        });
-//        StormTimer.scheduleRecurringWithJitter(task2, 10, 5, 2000, new StormTimer.TimerFunc(){
-//            @Override
-//            public void run(Object o) {
-//                LOG.info("task2 scheduleRecurringWithJitter func at {}", Time.currentTimeSecs());
-//            }
-//        });
-//
-//        LOG.info("sleeping...");
-//        Time.sleep(30000);
-//
-//        LOG.info("canceling task");
-//        StormTimer.cancelTimer(task1);
-//        StormTimer.cancelTimer(task2);
-
-    }
-}