You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/18 18:09:41 UTC

[5/9] storm git commit: edits based on reviews

edits based on reviews


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2d06efe7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2d06efe7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2d06efe7

Branch: refs/heads/master
Commit: 2d06efe7c62d85a3187c03523bfee7474b963304
Parents: b09d8ca
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Thu Feb 18 10:37:22 2016 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Feb 18 10:37:22 2016 -0600

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/executor.clj    |  22 +--
 .../clj/org/apache/storm/daemon/logviewer.clj   |  23 ++-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  67 +++----
 .../clj/org/apache/storm/daemon/supervisor.clj  | 102 +++++------
 .../src/clj/org/apache/storm/daemon/worker.clj  | 122 +++++--------
 .../src/jvm/org/apache/storm/StormTimer.java    | 175 ++++++++++---------
 6 files changed, 225 insertions(+), 286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2d06efe7/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 2afb853..92cc003 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -40,7 +40,7 @@
            [java.util.concurrent ConcurrentLinkedQueue]
            [org.json.simple JSONValue]
            [com.lmax.disruptor.dsl ProducerType]
-           [org.apache.storm StormTimer StormTimer$TimerFunc])
+           [org.apache.storm StormTimer])
   (:require [org.apache.storm [cluster :as cluster] [stats :as stats]])
   (:require [org.apache.storm.daemon [task :as task]])
   (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])
@@ -324,15 +324,13 @@
   (let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data
         distinct-time-bucket-intervals (keys interval->task->metric-registry)]
     (doseq [interval distinct-time-bucket-intervals]
-      (StormTimer/scheduleRecurring
+      (.scheduleRecurring
         (:user-timer (:worker executor-data))
         interval
         interval
-        (reify StormTimer$TimerFunc
-          (^void run
-            [this ^Object o]
-            (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
-              (.publish ^DisruptorQueue receive-queue val))))))))
+        (fn []
+          (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
+            (.publish ^DisruptorQueue receive-queue val)))))))
 
 (defn metrics-tick
   [executor-data task-data ^TupleImpl tuple]
@@ -367,15 +365,13 @@
               (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
                    (= :spout (:type executor-data))))
         (log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
-        (StormTimer/scheduleRecurring
+        (.scheduleRecurring
           (:user-timer worker)
           tick-time-secs
           tick-time-secs
-          (reify StormTimer$TimerFunc
-            (^void run
-              [this ^Object o]
-              (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
-                (.publish ^DisruptorQueue receive-queue val)))))))))
+          (fn []
+            (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
+              (.publish ^DisruptorQueue receive-queue val))))))))
 
 (defn mk-executor [worker executor-id initial-credentials]
   (let [executor-data (mk-executor-data worker executor-id)

http://git-wip-us.apache.org/repos/asf/storm/blob/2d06efe7/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
index 16815f9..9502196 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
@@ -20,7 +20,7 @@
   (:use [hiccup core page-helpers form-helpers])
   (:use [org.apache.storm config util log])
   (:use [org.apache.storm.ui helpers])
-  (:import [org.apache.storm StormTimer StormTimer$TimerFunc])
+  (:import [org.apache.storm StormTimer])
   (:import [org.apache.storm.utils Utils Time VersionInfo ConfigUtils])
   (:import [org.slf4j LoggerFactory])
   (:import [java.util Arrays ArrayList HashSet])
@@ -264,18 +264,15 @@
   (let [interval-secs (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
     (when interval-secs
       (log-debug "starting log cleanup thread at interval: " interval-secs)
-      (StormTimer/scheduleRecurring
-        (StormTimer/mkTimer "logviewer-cleanup"
-          (reify StormTimer$TimerFunc
-            (^void run
-              [this ^Object t]
-              (log-error t "Error when doing logs cleanup")
-              (Utils/exitProcess 20 "Error when doing log cleanup"))))
-        0 interval-secs
-        (reify StormTimer$TimerFunc
-          (^void run
-            [this ^Object o]
-            (cleanup-fn! log-root-dir)))))))
+
+      (let [timer (StormTimer. "logviewer-cleanup"
+                    (reify Thread$UncaughtExceptionHandler
+                      (^void uncaughtException
+                        [this ^Thread t ^Throwable e]
+                        (log-error t "Error when doing logs cleanup")
+                        (Utils/exitProcess 20 "Error when doing log cleanup"))))]
+        (.scheduleRecurring timer 0 interval-secs
+          (fn [] (cleanup-fn! log-root-dir)))))))
 
 (defn- skip-bytes
   "FileInputStream#skip may not work the first time, so ensure it successfully

http://git-wip-us.apache.org/repos/asf/storm/blob/2d06efe7/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 0d0b27a..a3497d6 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -66,7 +66,7 @@
   (:require [clj-time.coerce :as coerce])
   (:require [metrics.meters :refer [defmeter mark!]])
   (:require [metrics.gauges :refer [defgauge]])
-  (:import [org.apache.storm StormTimer StormTimer$TimerFunc])
+  (:import [org.apache.storm StormTimer])
   (:gen-class
     :methods [^{:static true} [launch [org.apache.storm.scheduler.INimbus] void]]))
 
@@ -194,12 +194,13 @@
      :blob-listers (mk-bloblist-cache-map conf)
      :uptime (Utils/makeUptimeComputer)
      :validator (Utils/newInstance (conf NIMBUS-TOPOLOGY-VALIDATOR))
-     :timer (StormTimer/mkTimer nil
-              (reify StormTimer$TimerFunc
-                (^void run
-                  [this ^Object t]
-                  (log-error t "Error when processing event")
+     :timer (StormTimer. nil
+              (reify Thread$UncaughtExceptionHandler
+                (^void uncaughtException
+                  [this ^Thread t ^Throwable e]
+                  (log-error e "Error when processing event")
                   (Utils/exitProcess 20 "Error when processing an event"))))
+
      :scheduler (mk-scheduler conf inimbus)
      :leader-elector (Zookeeper/zkLeaderElector conf)
      :id->sched-status (atom {})
@@ -382,12 +383,9 @@
 
 (defn delay-event [nimbus storm-id delay-secs event]
   (log-message "Delaying event " event " for " delay-secs " secs for " storm-id)
-  (StormTimer/schedule (:timer nimbus)
+  (.schedule (:timer nimbus)
     delay-secs
-    (reify StormTimer$TimerFunc
-      (^void run
-        [this ^Object o]
-        (transition! nimbus storm-id event false)))))
+    (fn [] (transition! nimbus storm-id event false))))
 
 ;; active -> reassign in X secs
 
@@ -1448,49 +1446,36 @@
       (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
         (transition! nimbus storm-id :startup)))
 
-    (StormTimer/scheduleRecurring (:timer nimbus)
+    (.scheduleRecurring (:timer nimbus)
       0
       (conf NIMBUS-MONITOR-FREQ-SECS)
-      (reify StormTimer$TimerFunc
-        (^void run
-          [this ^Object o]
-          (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
-            (locking (:submit-lock nimbus)
-              (mk-assignments nimbus)))
-          (do-cleanup nimbus))))
+      (fn []
+        (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
+          (locking (:submit-lock nimbus)
+            (mk-assignments nimbus)))
+        (do-cleanup nimbus)))
     ;; Schedule Nimbus inbox cleaner
-    (StormTimer/scheduleRecurring (:timer nimbus)
+    (.scheduleRecurring (:timer nimbus)
       0
       (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
-      (reify StormTimer$TimerFunc
-        (^void run
-          [this ^Object o]
-          (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS)))))
+      (fn [] (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
     ;; Schedule nimbus code sync thread to sync code from other nimbuses.
     (if (instance? LocalFsBlobStore blob-store)
-      (StormTimer/scheduleRecurring (:timer nimbus)
+      (.scheduleRecurring (:timer nimbus)
         0
         (conf NIMBUS-CODE-SYNC-FREQ-SECS)
-        (reify StormTimer$TimerFunc
-          (^void run
-            [this ^Object t]
-            (blob-sync conf nimbus)))))
+        (fn [] (blob-sync conf nimbus))))
     ;; Schedule topology history cleaner
     (when-let [interval (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
-      (StormTimer/scheduleRecurring (:timer nimbus)
+      (.scheduleRecurring (:timer nimbus)
         0
         (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)
-        (reify StormTimer$TimerFunc
-          (^void run
-            [this ^Object t]
-            (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus)))))
-    (StormTimer/scheduleRecurring (:timer nimbus)
+        (fn [] (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus))))
+    (.scheduleRecurring (:timer nimbus)
       0
       (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
-      (reify StormTimer$TimerFunc
-        (^void run
-          [this ^Object t]
-          (renew-credentials nimbus))))
+      (fn []
+        (renew-credentials nimbus)))
 
     (defgauge nimbus:num-supervisors
       (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil))))
@@ -2222,7 +2207,7 @@
       (shutdown [this]
         (mark! nimbus:num-shutdown-calls)
         (log-message "Shutting down master")
-        (StormTimer/cancelTimer (:timer nimbus))
+        (.close (:timer nimbus))
         (.disconnect (:storm-cluster-state nimbus))
         (.cleanup (:downloaders nimbus))
         (.cleanup (:uploaders nimbus))
@@ -2232,7 +2217,7 @@
         (log-message "Shut down master"))
       DaemonCommon
       (waiting? [this]
-        (StormTimer/isTimerWaiting (:timer nimbus))))))
+        (.isTimerWaiting (:timer nimbus))))))
 
 (defn validate-port-available[conf]
   (try

http://git-wip-us.apache.org/repos/asf/storm/blob/2d06efe7/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 2b70731..ad9db76 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -42,7 +42,7 @@
            [org.yaml.snakeyaml.constructor SafeConstructor])
   (:require [metrics.gauges :refer [defgauge]])
   (:require [metrics.meters :refer [defmeter mark!]])
-  (:import [org.apache.storm StormTimer StormTimer$TimerFunc])
+  (:import [org.apache.storm StormTimer])
   (:gen-class
     :methods [^{:static true} [launch [org.apache.storm.scheduler.ISupervisor] void]])
   (:require [clojure.string :as str]))
@@ -336,23 +336,23 @@
    :assignment-id (.getAssignmentId isupervisor)
    :my-hostname (Utils/hostname conf)
    :curr-assignment (atom nil) ;; used for reporting used ports when heartbeating
-   :heartbeat-timer (StormTimer/mkTimer nil
-                      (reify StormTimer$TimerFunc
-                        (^void run
-                          [this ^Object t]
-                          (log-error t "Error when processing event")
+   :heartbeat-timer (StormTimer. nil
+                      (reify Thread$UncaughtExceptionHandler
+                        (^void uncaughtException
+                          [this ^Thread t ^Throwable e]
+                          (log-error e "Error when processing event")
                           (Utils/exitProcess 20 "Error when processing an event"))))
-   :event-timer (StormTimer/mkTimer nil
-                  (reify StormTimer$TimerFunc
-                    (^void run
-                      [this ^Object t]
-                      (log-error t "Error when processing event")
+   :event-timer (StormTimer. nil
+                  (reify Thread$UncaughtExceptionHandler
+                    (^void uncaughtException
+                      [this ^Thread t ^Throwable e]
+                      (log-error e "Error when processing event")
                       (Utils/exitProcess 20 "Error when processing an event"))))
-   :blob-update-timer (StormTimer/mkTimer "blob-update-timer"
-                        (reify StormTimer$TimerFunc
-                          (^void run
-                            [this ^Object t]
-                            (log-error t "Error when processing event")
+   :blob-update-timer (StormTimer. "blob-update-timer"
+                        (reify Thread$UncaughtExceptionHandler
+                          (^void uncaughtException
+                            [this ^Thread t ^Throwable e]
+                            (log-error e "Error when processing event")
                             (Utils/exitProcess 20 "Error when processing an event"))))
    :localizer (Utils/createLocalizer conf (ConfigUtils/supervisorLocalDir conf))
    :assignment-versions (atom {})
@@ -821,13 +821,10 @@
     (heartbeat-fn)
 
     ;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
-    (StormTimer/scheduleRecurring (:heartbeat-timer supervisor)
+    (.scheduleRecurring (:heartbeat-timer supervisor)
       0
       (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
-      (reify StormTimer$TimerFunc
-        (^void run
-          [this ^Object o]
-          (heartbeat-fn))))
+      heartbeat-fn)
 
     (doseq [storm-id downloaded-storm-ids]
       (add-blob-references (:localizer supervisor) storm-id
@@ -838,53 +835,38 @@
     (when (conf SUPERVISOR-ENABLE)
       ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
       ;; to date even if callbacks don't all work exactly right
-      (StormTimer/scheduleRecurring (:event-timer supervisor)
-        0 10
-        (reify StormTimer$TimerFunc
-          (^void run
-            [this ^Object o]
-            (.add event-manager synchronize-supervisor))))
-
-      (StormTimer/scheduleRecurring (:event-timer supervisor)
+      (.scheduleRecurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
+
+      (.scheduleRecurring (:event-timer supervisor)
         0
         (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
-        (reify StormTimer$TimerFunc
-          (^void run
-            [this ^Object o]
-            (.add processes-event-manager sync-processes))))
+        (fn [] (.add processes-event-manager sync-processes)))
 
       ;; Blob update thread. Starts with 30 seconds delay, every 30 seconds
-      (StormTimer/scheduleRecurring (:blob-update-timer supervisor)
+      (.scheduleRecurring (:blob-update-timer supervisor)
         30
         30
-        (reify StormTimer$TimerFunc
-          (^void run
-            [this ^Object o]
-            (.add event-manager synchronize-blobs-fn))))
+        (fn [] (.add event-manager synchronize-blobs-fn)))
 
-      (StormTimer/scheduleRecurring (:event-timer supervisor)
+      (.scheduleRecurring (:event-timer supervisor)
         (* 60 5)
         (* 60 5)
-        (reify StormTimer$TimerFunc
-          (^void run
-            [this ^Object o]
-            (let [health-code (healthcheck/health-check conf)
-                  ids (my-worker-ids conf)]
-              (if (not (= health-code 0))
-                (do
-                  (doseq [id ids]
-                    (shutdown-worker supervisor id))
-                  (throw (RuntimeException. "Supervisor failed health check. Exiting."))))))))
+        (fn []
+          (let [health-code (healthcheck/health-check conf)
+                ids (my-worker-ids conf)]
+            (if (not (= health-code 0))
+              (do
+                (doseq [id ids]
+                  (shutdown-worker supervisor id))
+                (throw (RuntimeException. "Supervisor failed health check. Exiting.")))))))
 
 
       ;; Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
-      (StormTimer/scheduleRecurring
+      (.scheduleRecurring
         (:event-timer supervisor)
-        30 30
-        (reify StormTimer$TimerFunc
-          (^void run
-            [this ^Object o]
-            (.add event-manager run-profiler-actions-fn)))))
+        30
+        30
+        (fn [] (.add event-manager run-profiler-actions-fn))))
 
     (log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor))
     (reify
@@ -892,9 +874,9 @@
      (shutdown [this]
                (log-message "Shutting down supervisor " (:supervisor-id supervisor))
                (reset! (:active supervisor) false)
-               (StormTimer/cancelTimer (:heartbeat-timer supervisor))
-               (StormTimer/cancelTimer (:event-timer supervisor))
-               (StormTimer/cancelTimer (:blob-update-timer supervisor))
+               (.close (:heartbeat-timer supervisor))
+               (.close (:event-timer supervisor))
+               (.close (:blob-update-timer supervisor))
                (.shutdown event-manager)
                (.shutdown processes-event-manager)
                (.shutdown (:localizer supervisor))
@@ -913,8 +895,8 @@
      (waiting? [this]
        (or (not @(:active supervisor))
            (and
-            (StormTimer/isTimerWaiting (:heartbeat-timer supervisor))
-            (StormTimer/isTimerWaiting (:event-timer supervisor))
+            (.isTimerWaiting (:heartbeat-timer supervisor))
+            (.isTimerWaiting (:event-timer supervisor))
             (every? (memfn waiting?) managers)))
            ))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/2d06efe7/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index e74ffa1..c2a767a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -45,7 +45,7 @@
   (:import [org.apache.logging.log4j Level])
   (:import [org.apache.logging.log4j.core.config LoggerConfig])
   (:import [org.apache.storm.generated LogConfig LogLevelAction])
-  (:import [org.apache.storm StormTimer StormTimer$TimerFunc])
+  (:import [org.apache.storm StormTimer])
   (:gen-class))
 
 (defmulti mk-suicide-fn cluster-mode)
@@ -239,11 +239,11 @@
   {})
 
 (defn mk-halting-timer [timer-name]
-  (StormTimer/mkTimer timer-name
-    (reify StormTimer$TimerFunc
-      (^void run
-        [this ^Object t]
-        (log-error t "Error when processing event")
+  (StormTimer. timer-name
+    (reify Thread$UncaughtExceptionHandler
+      (^void uncaughtException
+        [this ^Thread t ^Throwable e]
+        (log-error e "Error when processing event")
         (Utils/exitProcess 20 "Error when processing an event")))))
 
 (defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state]
@@ -379,12 +379,8 @@
     (fn refresh-connections
       ([]
         (refresh-connections (fn [& ignored]
-                (StormTimer/schedule
-                  (:refresh-connections-timer worker) 0
-                  (reify StormTimer$TimerFunc
-                    (^void run
-                      [this ^Object o]
-                      (refresh-connections)))))))
+                (.schedule
+                  (:refresh-connections-timer worker) 0 refresh-connections))))
       ([callback]
          (let [version (.assignment-version storm-cluster-state storm-id callback)
                assignment (if (= version (:version (get @(:assignment-versions worker) storm-id)))
@@ -437,12 +433,8 @@
   ([worker]
     (refresh-storm-active
       worker (fn [& ignored]
-               (StormTimer/schedule
-                 (:refresh-active-timer worker) 0
-                 (reify StormTimer$TimerFunc
-                   (^void run
-                     [this ^Object o]
-                     ((partial refresh-storm-active worker))))))))
+               (.schedule
+                 (:refresh-active-timer worker) 0 (partial refresh-storm-active worker)))))
   ([worker callback]
     (let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) callback)]
       (reset!
@@ -489,17 +481,15 @@
   (let [timer (:refresh-active-timer worker)
         delay-secs 0
         recur-secs 1]
-    (StormTimer/schedule timer
+    (.schedule timer
       delay-secs
-      (reify StormTimer$TimerFunc
-        (^void run
-          [this ^Object o]
+      (fn this []
           (if (all-connections-ready worker)
             (do
               (log-message "All connections are ready for worker " (:assignment-id worker) ":" (:port worker)
                 " with id " (:worker-id worker))
               (reset! (:worker-active-flag worker) true))
-            (StormTimer/schedule timer recur-secs this false 0)))))))
+            (.schedule timer recur-secs this false 0))))))
 
 (defn register-callbacks [worker]
   (let [transfer-local-fn (:transfer-local-fn worker)
@@ -654,19 +644,10 @@
         executors (atom nil)
         ;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
         ;; to the supervisor
-        _ (StormTimer/scheduleRecurring
-            (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS)
-            (reify StormTimer$TimerFunc
-              (^void run
-                [this ^Object o]
-                (heartbeat-fn))))
-
-        _ (StormTimer/scheduleRecurring
-            (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS)
-            (reify StormTimer$TimerFunc
-              (^void run
-                [this ^Object o]
-                (do-executor-heartbeats worker :executors @executors))))
+        _ (.scheduleRecurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
+
+        _ (.scheduleRecurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS)
+            (fn [] (do-executor-heartbeats worker :executors @executors)))
 
         _ (register-callbacks worker)
 
@@ -727,14 +708,14 @@
                     (.interrupt backpressure-thread)
                     (.join backpressure-thread)
                     (log-message "Shut down backpressure thread")
-                    (StormTimer/cancelTimer (:heartbeat-timer worker))
-                    (StormTimer/cancelTimer (:refresh-connections-timer worker))
-                    (StormTimer/cancelTimer (:refresh-credentials-timer worker))
-                    (StormTimer/cancelTimer (:refresh-active-timer worker))
-                    (StormTimer/cancelTimer (:executor-heartbeat-timer worker))
-                    (StormTimer/cancelTimer (:user-timer worker))
-                    (StormTimer/cancelTimer (:refresh-load-timer worker))
-                    (StormTimer/cancelTimer (:reset-log-levels-timer worker))
+                    (.close (:heartbeat-timer worker))
+                    (.close (:refresh-connections-timer worker))
+                    (.close (:refresh-credentials-timer worker))
+                    (.close (:refresh-active-timer worker))
+                    (.close (:executor-heartbeat-timer worker))
+                    (.close (:user-timer worker))
+                    (.close (:refresh-load-timer worker))
+                    (.close (:reset-log-levels-timer worker))
                     (close-resources worker)
 
                     (log-message "Trigger any worker shutdown hooks")
@@ -753,13 +734,13 @@
              DaemonCommon
              (waiting? [this]
                (and
-                 (StormTimer/isTimerWaiting (:heartbeat-timer worker))
-                 (StormTimer/isTimerWaiting (:refresh-connections-timer worker))
-                 (StormTimer/isTimerWaiting (:refresh-load-timer worker))
-                 (StormTimer/isTimerWaiting (:refresh-credentials-timer worker))
-                 (StormTimer/isTimerWaiting (:refresh-active-timer worker))
-                 (StormTimer/isTimerWaiting (:executor-heartbeat-timer worker))
-                 (StormTimer/isTimerWaiting (:user-timer worker))
+                 (.isTimerWaiting (:heartbeat-timer worker))
+                 (.isTimerWaiting (:refresh-connections-timer worker))
+                 (.isTimerWaiting (:refresh-load-timer worker))
+                 (.isTimerWaiting (:refresh-credentials-timer worker))
+                 (.isTimerWaiting (:refresh-active-timer worker))
+                 (.isTimerWaiting (:executor-heartbeat-timer worker))
+                 (.isTimerWaiting (:user-timer worker))
                  ))
              )
         credentials (atom initial-credentials)
@@ -788,40 +769,23 @@
     (establish-log-setting-callback)
     (.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-changed)))
 
-    (StormTimer/scheduleRecurring
+    (.scheduleRecurring
       (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
-      (reify StormTimer$TimerFunc
-        (^void run
-          [this ^Object o]
+        (fn []
           (check-credentials-changed)
           (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
-            (check-throttle-changed)))))
+            (check-throttle-changed))))
     ;; The jitter allows the clients to get the data at different times, and avoids thundering herd
     (when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
-      (StormTimer/scheduleRecurringWithJitter
-        (:refresh-load-timer worker) 0 1 500
-        (reify StormTimer$TimerFunc
-          (^void run
-            [this ^Object o]
-            (refresh-load)))))
-    (StormTimer/scheduleRecurring
-      (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS)
-      (reify StormTimer$TimerFunc
-        (^void run
-          [this ^Object o]
-          (refresh-connections))))
-    (StormTimer/scheduleRecurring
+      (.scheduleRecurringWithJitter
+        (:refresh-load-timer worker) 0 1 500 refresh-load))
+    (.scheduleRecurring
+      (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections)
+    (.scheduleRecurring
       (:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS)
-      (reify StormTimer$TimerFunc
-        (^void run
-          [this ^Object o]
-          (reset-log-levels latest-log-config))))
-    (StormTimer/scheduleRecurring
-      (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS)
-      (reify StormTimer$TimerFunc
-        (^void run
-          [this ^Object o]
-          ((partial refresh-storm-active worker)))))
+        (fn [] (reset-log-levels latest-log-config)))
+    (.scheduleRecurring
+      (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker))
     (log-message "Worker has topology config " (Utils/redactValue (:storm-conf worker) STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
     (log-message "Worker " worker-id " for storm " storm-id " on " assignment-id ":" port " has finished loading")
     ret

http://git-wip-us.apache.org/repos/asf/storm/blob/2d06efe7/storm-core/src/jvm/org/apache/storm/StormTimer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/StormTimer.java b/storm-core/src/jvm/org/apache/storm/StormTimer.java
index df89dc6..a2d0145 100644
--- a/storm-core/src/jvm/org/apache/storm/StormTimer.java
+++ b/storm-core/src/jvm/org/apache/storm/StormTimer.java
@@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Comparator;
 import java.util.Random;
 import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -35,66 +34,52 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * code that does asynchronous work on the timer thread
  */
 
-public class StormTimer {
+public class StormTimer implements AutoCloseable{
     private static final Logger LOG = LoggerFactory.getLogger(StormTimer.class);
 
-    public interface TimerFunc {
-        public void run(Object o);
-    }
-
     public static class QueueEntry {
         public final Long endTimeMs;
-        public final TimerFunc afn;
+        public final Runnable func;
         public final String id;
 
-        public QueueEntry(Long endTimeMs, TimerFunc afn, String id) {
+        public QueueEntry(Long endTimeMs, Runnable func, String id) {
             this.endTimeMs = endTimeMs;
-            this.afn = afn;
+            this.func = func;
             this.id = id;
         }
-
-        @Override
-        public String toString() {
-            return this.id + " " + this.endTimeMs + " " + this.afn;
-        }
     }
 
     public static class StormTimerTask extends Thread {
 
-        private PriorityBlockingQueue<QueueEntry> queue = new PriorityBlockingQueue<QueueEntry>(10, new Comparator() {
+        private PriorityBlockingQueue<QueueEntry> queue = new PriorityBlockingQueue<QueueEntry>(10, new Comparator<QueueEntry>() {
             @Override
-            public int compare(Object o1, Object o2) {
-                return ((QueueEntry)o1).endTimeMs.intValue() - ((QueueEntry)o2).endTimeMs.intValue();
+            public int compare(QueueEntry o1, QueueEntry o2) {
+                return o1.endTimeMs.intValue() - o2.endTimeMs.intValue();
             }
         });
 
+        // boolean to indicate whether timer is active
         private AtomicBoolean active = new AtomicBoolean(false);
 
-        private TimerFunc onKill;
+        // function to call when timer is killed
+        private Thread.UncaughtExceptionHandler onKill;
 
+        //random number generator
         private Random random = new Random();
 
-        private Semaphore cancelNotifier = new Semaphore(0);
-
-        private Object lock = new Object();
-
         @Override
         public void run() {
             while (this.active.get()) {
                 QueueEntry queueEntry = null;
                 try {
-                    synchronized (this.lock) {
-                        queueEntry = this.queue.peek();
-                    }
+                    queueEntry = this.queue.peek();
                     if ((queueEntry != null) && (Time.currentTimeMillis() >= queueEntry.endTimeMs)) {
                         // It is imperative to not run the function
                         // inside the timer lock. Otherwise, it is
                         // possible to deadlock if the fn deals with
                         // other locks, like the submit lock.
-                        synchronized (this.lock) {
-                            this.queue.poll();
-                        }
-                        queueEntry.afn.run(null);
+                        this.queue.remove(queueEntry);
+                        queueEntry.func.run();
                     } else if (queueEntry != null) {
                         //  If any events are scheduled, sleep until
                         // event generation. If any recurring events
@@ -113,18 +98,16 @@ public class StormTimer {
                         // events.
                         Time.sleep(1000);
                     }
-                } catch (Throwable t) {
-                    if (!(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t))) {
-                        this.onKill.run(t);
+                } catch (Throwable e) {
+                    if (!(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e))) {
+                        this.onKill.uncaughtException(this, e);
                         this.setActive(false);
-                        throw new RuntimeException(t);
                     }
                 }
             }
-            this.cancelNotifier.release();
         }
 
-        public void setOnKillFunc(TimerFunc onKill) {
+        public void setOnKillFunc(Thread.UncaughtExceptionHandler onKill) {
             this.onKill = onKill;
         }
 
@@ -141,88 +124,120 @@ public class StormTimer {
         }
     }
 
-    public static StormTimerTask mkTimer(String name, TimerFunc onKill) {
+    //task to run
+    StormTimerTask task = new StormTimerTask();
+
+    /**
+     * Makes a Timer in the form of a StormTimerTask Object
+     * @param name name of the timer
+     * @param onKill function to call when timer is killed unexpectedly
+     * @return StormTimerTask object that was initialized
+     */
+    public StormTimer (String name, Thread.UncaughtExceptionHandler onKill) {
         if (onKill == null) {
             throw new RuntimeException("onKill func is null!");
         }
-        StormTimerTask task  = new StormTimerTask();
         if (name == null) {
-            task.setName("timer");
+            this.task.setName("timer");
         } else {
-            task.setName(name);
+            this.task.setName(name);
         }
-        task.setOnKillFunc(onKill);
-        task.setActive(true);
+        this.task.setOnKillFunc(onKill);
+        this.task.setActive(true);
 
-        task.setDaemon(true);
-        task.setPriority(Thread.MAX_PRIORITY);
-        task.start();
-        return task;
+        this.task.setDaemon(true);
+        this.task.setPriority(Thread.MAX_PRIORITY);
+        this.task.start();
     }
-    public static void schedule(StormTimerTask task, int delaySecs, TimerFunc afn, boolean checkActive, int jitterMs) {
-        if (task == null) {
+
+    /**
+     * Schedule a function to be executed in the timer
+     * @param delaySecs the number of seconds to delay before running the function
+     * @param func the function to run
+     * @param checkActive whether to check is the timer is active
+     * @param jitterMs add jitter to the run
+     */
+    public void schedule(int delaySecs, Runnable func, boolean checkActive, int jitterMs) {
+        if (this.task == null) {
             throw new RuntimeException("task is null!");
         }
-        if (afn == null) {
+        if (func == null) {
             throw new RuntimeException("function to schedule is null!");
         }
+        if (checkActive) {
+            checkActive();
+        }
         String id = Utils.uuid();
         long endTimeMs = Time.currentTimeMillis() + Time.secsToMillisLong(delaySecs);
         if (jitterMs > 0) {
-            endTimeMs = task.random.nextInt(jitterMs) + endTimeMs;
-        }
-        synchronized (task.lock) {
-            task.add(new QueueEntry(endTimeMs, afn, id));
+            endTimeMs = this.task.random.nextInt(jitterMs) + endTimeMs;
         }
+        task.add(new QueueEntry(endTimeMs, func, id));
     }
-    public static void schedule(StormTimerTask task, int delaySecs, TimerFunc afn) {
-        schedule(task, delaySecs, afn, true, 0);
+
+    public void schedule(int delaySecs, Runnable func) {
+        schedule(delaySecs, func, true, 0);
     }
 
-    public static void scheduleRecurring(final StormTimerTask task, int delaySecs, final int recurSecs, final TimerFunc afn) {
-        schedule(task, delaySecs, new TimerFunc() {
+    /**
+     * Schedule a function to run recurrently
+     * @param delaySecs the number of seconds to delay before running the function
+     * @param recurSecs the time between each invocation
+     * @param func the function to run
+     */
+    public void scheduleRecurring(int delaySecs, final int recurSecs, final Runnable func) {
+        schedule(delaySecs, new Runnable() {
             @Override
-            public void run(Object o) {
-                afn.run(null);
+            public void run() {
+                func.run();
                 // This avoids a race condition with cancel-timer.
-                schedule(task, recurSecs, this, false, 0);
+                schedule(recurSecs, this, false, 0);
             }
         });
     }
 
-    public static void scheduleRecurringWithJitter(final StormTimerTask task, int delaySecs, final int recurSecs, final int jitterMs, final TimerFunc afn) {
-        schedule(task, delaySecs, new TimerFunc() {
+    /**
+     * schedule a function to run recurrently with jitter
+     * @param delaySecs the number of seconds to delay before running the function
+     * @param recurSecs the time between each invocation
+     * @param jitterMs jitter added to the run
+     * @param func the function to run
+     */
+    public void scheduleRecurringWithJitter(int delaySecs, final int recurSecs, final int jitterMs, final Runnable func) {
+        schedule(delaySecs, new Runnable() {
             @Override
-            public void run(Object o) {
-                afn.run(null);
+            public void run() {
+                func.run();
                 // This avoids a race condition with cancel-timer.
-                schedule(task, recurSecs, this, false, jitterMs);
+                schedule(recurSecs, this, false, jitterMs);
             }
         });
     }
 
-    public static void checkActive(StormTimerTask task) {
-        if (task == null) {
-            throw new RuntimeException("task is null!");
-        }
-        if (!task.isActive()) {
+    /**
+     * check if timer is active
+     */
+    public void checkActive() {
+        if (!this.task.isActive()) {
             throw new IllegalStateException("Timer is not active");
         }
     }
 
-    public static void cancelTimer(StormTimerTask task) throws InterruptedException {
-        if (task == null) {
-            throw new RuntimeException("task is null!");
-        }
-        checkActive(task);
-        synchronized (task.lock) {
-            task.setActive(false);
-            task.interrupt();
-        }
-        task.cancelNotifier.acquire();
+    /**
+     * cancel timer
+     */
+
+    @Override
+    public void close() throws Exception {
+        checkActive();
+        this.task.setActive(false);
+        this.task.interrupt();
     }
 
-    public static boolean isTimerWaiting(StormTimerTask task) {
+    /**
+     * is timer waiting. Used in timer simulation
+     */
+    public boolean isTimerWaiting() {
         if (task == null) {
             throw new RuntimeException("task is null!");
         }