You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/18 18:09:39 UTC

[3/9] storm git commit: replacing clojure with java

replacing clojure with java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4243e4e0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4243e4e0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4243e4e0

Branch: refs/heads/master
Commit: 4243e4e0cf9b9ea4c2aa8efa0a37a78120f2b687
Parents: 7f58252
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Thu Feb 18 10:35:46 2016 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Feb 18 10:35:46 2016 -0600

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/executor.clj    |  47 +++-
 .../clj/org/apache/storm/daemon/logviewer.clj   |  30 ++-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 136 +++++++---
 .../clj/org/apache/storm/daemon/supervisor.clj  | 176 +++++++++----
 .../src/clj/org/apache/storm/daemon/worker.clj  | 185 ++++++++++----
 storm-core/src/clj/org/apache/storm/timer.clj   | 254 +++++++++----------
 .../src/jvm/org/apache/storm/StormTimer.java    | 115 +++++----
 .../test/clj/org/apache/storm/nimbus_test.clj   |   4 +-
 .../clj/org/apache/storm/supervisor_test.clj    |   7 +-
 .../test/jvm/org/apache/storm/TestTimer.java    |  57 +++++
 10 files changed, 682 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 14a2f6e..f46f18b 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -17,7 +17,7 @@
   (:use [org.apache.storm.daemon common])
   (:import [org.apache.storm.generated Grouping Grouping$_Fields]
            [java.io Serializable])
-  (:use [org.apache.storm util config log timer stats])
+  (:use [org.apache.storm util config log stats])
   (:import [java.util List Random HashMap ArrayList LinkedList Map])
   (:import [org.apache.storm ICredentialsListener Thrift])
   (:import [org.apache.storm.hooks ITaskHook])
@@ -39,7 +39,8 @@
   (:import [java.lang Thread Thread$UncaughtExceptionHandler]
            [java.util.concurrent ConcurrentLinkedQueue]
            [org.json.simple JSONValue]
-           [com.lmax.disruptor.dsl ProducerType])
+           [com.lmax.disruptor.dsl ProducerType]
+           [org.apache.storm StormTimer StormTimer$TimerFunc])
   (:require [org.apache.storm [cluster :as cluster] [stats :as stats]])
   (:require [org.apache.storm.daemon [task :as task]])
   (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])
@@ -323,13 +324,23 @@
   (let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data
         distinct-time-bucket-intervals (keys interval->task->metric-registry)]
     (doseq [interval distinct-time-bucket-intervals]
-      (schedule-recurring 
-       (:user-timer (:worker executor-data)) 
-       interval
-       interval
-       (fn []
-         (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
-           (.publish ^DisruptorQueue receive-queue val)))))))
+;      (schedule-recurring
+;       (:user-timer (:worker executor-data))
+;       interval
+;       interval
+;       (fn []
+;         (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
+;           (disruptor/publish receive-queue val))))
+
+      (StormTimer/scheduleRecurring
+        (:user-timer (:worker executor-data))
+        interval
+        interval
+        (reify StormTimer$TimerFunc
+          (^void run
+            [this ^Object o]
+            (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
+              (.publish ^DisruptorQueue receive-queue val))))))))
 
 (defn metrics-tick
   [executor-data task-data ^TupleImpl tuple]
@@ -364,13 +375,23 @@
               (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
                    (= :spout (:type executor-data))))
         (log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
-        (schedule-recurring
+;        (schedule-recurring
+;          (:user-timer worker)
+;          tick-time-secs
+;          tick-time-secs
+;          (fn []
+;            (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
+;              (disruptor/publish receive-queue val))))
+
+        (StormTimer/scheduleRecurring
           (:user-timer worker)
           tick-time-secs
           tick-time-secs
-          (fn []
-            (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
-              (.publish ^DisruptorQueue receive-queue val))))))))
+          (reify StormTimer$TimerFunc
+            (^void run
+              [this ^Object o]
+              (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
+                (.publish ^DisruptorQueue receive-queue val)))))))))
 
 (defn mk-executor [worker executor-id initial-credentials]
   (let [executor-data (mk-executor-data worker executor-id)

http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
index 6ca1759..932a813 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
@@ -18,8 +18,9 @@
   (:use [clojure.set :only [difference intersection]])
   (:use [clojure.string :only [blank? split]])
   (:use [hiccup core page-helpers form-helpers])
-  (:use [org.apache.storm config util log timer])
+  (:use [org.apache.storm config util log])
   (:use [org.apache.storm.ui helpers])
+  (:import [org.apache.storm StormTimer StormTimer$TimerFunc])
   (:import [org.apache.storm.utils Utils Time VersionInfo ConfigUtils])
   (:import [org.slf4j LoggerFactory])
   (:import [java.util Arrays ArrayList HashSet])
@@ -263,13 +264,26 @@
   (let [interval-secs (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
     (when interval-secs
       (log-debug "starting log cleanup thread at interval: " interval-secs)
-      (schedule-recurring (mk-timer :thread-name "logviewer-cleanup"
-                                    :kill-fn (fn [t]
-                                               (log-error t "Error when doing logs cleanup")
-                                               (Utils/exitProcess 20 "Error when doing log cleanup")))
-                          0 ;; Start immediately.
-                          interval-secs
-                          (fn [] (cleanup-fn! log-root-dir))))))
+;      (schedule-recurring (mk-timer :thread-name "logviewer-cleanup"
+;                                    :kill-fn (fn [t]
+;                                               (log-error t "Error when doing logs cleanup")
+;                                               (Utils/exitProcess 20 "Error when doing log cleanup")))
+;                          0 ;; Start immediately.
+;                          interval-secs
+;                          (fn [] (cleanup-fn! log-root-dir)))
+
+      (StormTimer/scheduleRecurring
+        (StormTimer/mkTimer "logviewer-cleanup"
+          (reify StormTimer$TimerFunc
+            (^void run
+              [this ^Object t]
+              (log-error t "Error when doing logs cleanup")
+              (Utils/exitProcess 20 "Error when doing log cleanup"))))
+        0 interval-secs
+        (reify StormTimer$TimerFunc
+          (^void run
+            [this ^Object o]
+            (cleanup-fn! log-root-dir)))))))
 
 (defn- skip-bytes
   "FileInputStream#skip may not work the first time, so ensure it successfully

http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 710cd83..d6413db 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -50,7 +50,7 @@
   (:import [org.apache.storm.daemon Shutdownable])
   (:import [org.apache.storm.validation ConfigValidation])
   (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
-  (:use [org.apache.storm util config log timer zookeeper local-state])
+  (:use [org.apache.storm util config log zookeeper local-state])
   (:require [org.apache.storm [cluster :as cluster]
                             [converter :as converter]
                             [stats :as stats]])
@@ -66,6 +66,7 @@
   (:require [clj-time.coerce :as coerce])
   (:require [metrics.meters :refer [defmeter mark!]])
   (:require [metrics.gauges :refer [defgauge]])
+  (:import [org.apache.storm StormTimer StormTimer$TimerFunc])
   (:gen-class
     :methods [^{:static true} [launch [org.apache.storm.scheduler.INimbus] void]]))
 
@@ -193,10 +194,17 @@
      :blob-listers (mk-bloblist-cache-map conf)
      :uptime (Utils/makeUptimeComputer)
      :validator (Utils/newInstance (conf NIMBUS-TOPOLOGY-VALIDATOR))
-     :timer (mk-timer :kill-fn (fn [t]
-                                 (log-error t "Error when processing event")
-                                 (Utils/exitProcess 20 "Error when processing an event")
-                                 ))
+;     :timer (mk-timer :kill-fn (fn [t]
+;                                 (log-error t "Error when processing event")
+;                                 (Utils/exitProcess 20 "Error when processing an event")
+;                                 ))
+     :timer (StormTimer/mkTimer nil
+              (reify StormTimer$TimerFunc
+                (^void run
+                  [this ^Object t]
+                  (log-error t "Error when processing event")
+                  (Utils/exitProcess 20 "Error when processing an event"))))
+
      :scheduler (mk-scheduler conf inimbus)
      :leader-elector (Zookeeper/zkLeaderElector conf)
      :id->sched-status (atom {})
@@ -379,10 +387,17 @@
 
 (defn delay-event [nimbus storm-id delay-secs event]
   (log-message "Delaying event " event " for " delay-secs " secs for " storm-id)
-  (schedule (:timer nimbus)
-            delay-secs
-            #(transition! nimbus storm-id event false)
-            ))
+;  (schedule (:timer nimbus)
+;            delay-secs
+;            #(transition! nimbus storm-id event false)
+;            )
+  (StormTimer/schedule
+    (:timer nimbus)
+    delay-secs
+    (reify StormTimer$TimerFunc
+      (^void run
+        [this ^Object o]
+        (transition! nimbus storm-id event false)))))
 
 ;; active -> reassign in X secs
 
@@ -1442,39 +1457,83 @@
     (when (is-leader nimbus :throw-exception false)
       (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
         (transition! nimbus storm-id :startup)))
-    (schedule-recurring (:timer nimbus)
-                        0
-                        (conf NIMBUS-MONITOR-FREQ-SECS)
-                        (fn []
-                          (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
-                            (locking (:submit-lock nimbus)
-                              (mk-assignments nimbus)))
-                          (do-cleanup nimbus)))
+;    (schedule-recurring (:timer nimbus)
+;                        0
+;                        (conf NIMBUS-MONITOR-FREQ-SECS)
+;                        (fn []
+;                          (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
+;                            (locking (:submit-lock nimbus)
+;                              (mk-assignments nimbus)))
+;                          (do-cleanup nimbus)))
+    (StormTimer/scheduleRecurring
+      (:timer nimbus)
+      0
+      (conf NIMBUS-MONITOR-FREQ-SECS)
+      (reify StormTimer$TimerFunc
+        (^void run
+          [this ^Object o]
+          (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
+            (locking (:submit-lock nimbus)
+              (mk-assignments nimbus)))
+          (do-cleanup nimbus))))
     ;; Schedule Nimbus inbox cleaner
-    (schedule-recurring (:timer nimbus)
-                        0
-                        (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
-                        (fn []
-                          (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
+;    (schedule-recurring (:timer nimbus)
+;                        0
+;                        (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
+;                        (fn []
+;                          (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
+
+    (StormTimer/scheduleRecurring
+      (:timer nimbus)
+      0
+      (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
+      (reify StormTimer$TimerFunc
+        (^void run
+          [this ^Object o]
+          (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS)))))
     ;; Schedule nimbus code sync thread to sync code from other nimbuses.
     (if (instance? LocalFsBlobStore blob-store)
-      (schedule-recurring (:timer nimbus)
-                          0
-                          (conf NIMBUS-CODE-SYNC-FREQ-SECS)
-                          (fn []
-                            (blob-sync conf nimbus))))
+;      (schedule-recurring (:timer nimbus)
+;                          0
+;                          (conf NIMBUS-CODE-SYNC-FREQ-SECS)
+;                          (fn []
+;                            (blob-sync conf nimbus)))
+      (StormTimer/scheduleRecurring
+        (:timer nimbus)
+        0
+        (conf NIMBUS-CODE-SYNC-FREQ-SECS)
+        (reify StormTimer$TimerFunc
+          (^void run
+            [this ^Object t]
+            (blob-sync conf nimbus)))))
     ;; Schedule topology history cleaner
     (when-let [interval (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
-      (schedule-recurring (:timer nimbus)
+;      (schedule-recurring (:timer nimbus)
+;        0
+;        (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)
+;        (fn []
+;          (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus)))
+      (StormTimer/scheduleRecurring
+        (:timer nimbus)
         0
         (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)
-        (fn []
-          (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus))))
-    (schedule-recurring (:timer nimbus)
-                        0
-                        (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
-                        (fn []
-                          (renew-credentials nimbus)))
+        (reify StormTimer$TimerFunc
+          (^void run
+            [this ^Object t]
+            (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus)))))
+;    (schedule-recurring (:timer nimbus)
+;                        0
+;                        (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
+;                        (fn []
+;                          (renew-credentials nimbus)))
+    (StormTimer/scheduleRecurring
+      (:timer nimbus)
+      0
+      (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
+      (reify StormTimer$TimerFunc
+        (^void run
+          [this ^Object t]
+          (renew-credentials nimbus))))
 
     (defgauge nimbus:num-supervisors
       (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil))))
@@ -2206,7 +2265,8 @@
       (shutdown [this]
         (mark! nimbus:num-shutdown-calls)
         (log-message "Shutting down master")
-        (cancel-timer (:timer nimbus))
+        ;(cancel-timer (:timer nimbus))
+        (StormTimer/cancelTimer (:timer nimbus))
         (.disconnect (:storm-cluster-state nimbus))
         (.cleanup (:downloaders nimbus))
         (.cleanup (:uploaders nimbus))
@@ -2216,7 +2276,9 @@
         (log-message "Shut down master"))
       DaemonCommon
       (waiting? [this]
-        (timer-waiting? (:timer nimbus))))))
+;        (timer-waiting? (:timer nimbus))
+        (StormTimer/isTimerWaiting (:timer nimbus))
+        ))))
 
 (defn validate-port-available[conf]
   (try

http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 4b4bac3..56184aa 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -24,7 +24,7 @@
            [java.net JarURLConnection]
            [java.net URI URLDecoder]
            [org.apache.commons.io FileUtils])
-  (:use [org.apache.storm config util log timer local-state])
+  (:use [org.apache.storm config util log local-state])
   (:import [org.apache.storm.generated AuthorizationException KeyNotFoundException WorkerResources])
   (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo])
   (:import [java.nio.file Files StandardCopyOption])
@@ -42,6 +42,7 @@
            [org.yaml.snakeyaml.constructor SafeConstructor])
   (:require [metrics.gauges :refer [defgauge]])
   (:require [metrics.meters :refer [defmeter mark!]])
+  (:import [org.apache.storm StormTimer StormTimer$TimerFunc])
   (:gen-class
     :methods [^{:static true} [launch [org.apache.storm.scheduler.ISupervisor] void]])
   (:require [clojure.string :as str]))
@@ -335,19 +336,41 @@
    :assignment-id (.getAssignmentId isupervisor)
    :my-hostname (Utils/hostname conf)
    :curr-assignment (atom nil) ;; used for reporting used ports when heartbeating
-   :heartbeat-timer (mk-timer :kill-fn (fn [t]
-                               (log-error t "Error when processing event")
-                               (Utils/exitProcess 20 "Error when processing an event")
-                               ))
-   :event-timer (mk-timer :kill-fn (fn [t]
-                                         (log-error t "Error when processing event")
-                                         (Utils/exitProcess 20 "Error when processing an event")
-                                         ))
-   :blob-update-timer (mk-timer :kill-fn (defn blob-update-timer
-                                           [t]
-                                           (log-error t "Error when processing event")
-                                           (Utils/exitProcess 20 "Error when processing a event"))
-                                :timer-name "blob-update-timer")
+;   :heartbeat-timer (mk-timer :kill-fn (fn [t]
+;                               (log-error t "Error when processing event")
+;                               (Utils/exitProcess 20 "Error when processing an event")
+;                               ))
+
+   :heartbeat-timer (StormTimer/mkTimer nil
+                      (reify StormTimer$TimerFunc
+                        (^void run
+                          [this ^Object t]
+                          (log-error t "Error when processing event")
+                          (Utils/exitProcess 20 "Error when processing an event"))))
+;   :event-timer (mk-timer :kill-fn (fn [t]
+;                                         (log-error t "Error when processing event")
+;                                         (Utils/exitProcess 20 "Error when processing an event")
+;                                         ))
+
+   :event-timer (StormTimer/mkTimer nil
+                  (reify StormTimer$TimerFunc
+                    (^void run
+                      [this ^Object t]
+                      (log-error t "Error when processing event")
+                      (Utils/exitProcess 20 "Error when processing an event"))))
+
+;   :blob-update-timer (mk-timer :kill-fn (defn blob-update-timer
+;                                           [t]
+;                                           (log-error t "Error when processing event")
+;                                           (Utils/exitProcess 20 "Error when processing a event"))
+;                                :timer-name "blob-update-timer")
+
+   :blob-update-timer (StormTimer/mkTimer "blob-update-timer"
+                        (reify StormTimer$TimerFunc
+                          (^void run
+                            [this ^Object t]
+                            (log-error t "Error when processing event")
+                            (Utils/exitProcess 20 "Error when processing an event"))))
    :localizer (Utils/createLocalizer conf (ConfigUtils/supervisorLocalDir conf))
    :assignment-versions (atom {})
    :sync-retry (atom 0)
@@ -815,10 +838,19 @@
     (heartbeat-fn)
 
     ;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
-    (schedule-recurring (:heartbeat-timer supervisor)
-                        0
-                        (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
-                        heartbeat-fn)
+;    (schedule-recurring (:heartbeat-timer supervisor)
+;                        0
+;                        (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
+;                        heartbeat-fn)
+    (StormTimer/scheduleRecurring
+      (:heartbeat-timer supervisor)
+      0
+      (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
+      (reify StormTimer$TimerFunc
+        (^void run
+          [this ^Object o]
+          (heartbeat-fn))))
+
     (doseq [storm-id downloaded-storm-ids]
       (add-blob-references (:localizer supervisor) storm-id
         conf))
@@ -828,43 +860,91 @@
     (when (conf SUPERVISOR-ENABLE)
       ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
       ;; to date even if callbacks don't all work exactly right
-      (schedule-recurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
-      (schedule-recurring (:event-timer supervisor)
-                          0
-                          (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
-                          (fn [] (.add processes-event-manager sync-processes)))
+;      (schedule-recurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
+      (StormTimer/scheduleRecurring
+        (:event-timer supervisor)
+        0 10
+        (reify StormTimer$TimerFunc
+          (^void run
+            [this ^Object o]
+            (.add event-manager synchronize-supervisor))))
+;      (schedule-recurring (:event-timer supervisor)
+;                          0
+;                          (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
+;                          (fn [] (.add processes-event-manager sync-processes)))
+
+      (StormTimer/scheduleRecurring
+        (:event-timer supervisor)
+        0
+        (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
+        (reify StormTimer$TimerFunc
+          (^void run
+            [this ^Object o]
+            (.add processes-event-manager sync-processes))))
 
       ;; Blob update thread. Starts with 30 seconds delay, every 30 seconds
-      (schedule-recurring (:blob-update-timer supervisor)
-                          30
-                          30
-                          (fn [] (.add event-manager synchronize-blobs-fn)))
-
-      (schedule-recurring (:event-timer supervisor)
-                          (* 60 5)
-                          (* 60 5)
-                          (fn [] (let [health-code (healthcheck/health-check conf)
-                                       ids (my-worker-ids conf)]
-                                   (if (not (= health-code 0))
-                                     (do
-                                       (doseq [id ids]
-                                         (shutdown-worker supervisor id))
-                                       (throw (RuntimeException. "Supervisor failed health check. Exiting.")))))))
+;      (schedule-recurring (:blob-update-timer supervisor)
+;                          30
+;                          30
+;                          (fn [] (.add event-manager synchronize-blobs-fn)))
+      (StormTimer/scheduleRecurring
+        (:blob-update-timer supervisor)
+        30 30
+        (reify StormTimer$TimerFunc
+          (^void run
+            [this ^Object o]
+            (.add event-manager synchronize-blobs-fn))))
+
+;      (schedule-recurring (:event-timer supervisor)
+;                          (* 60 5)
+;                          (* 60 5)
+;                          (fn [] (let [health-code (healthcheck/health-check conf)
+;                                       ids (my-worker-ids conf)]
+;                                   (if (not (= health-code 0))
+;                                     (do
+;                                       (doseq [id ids]
+;                                         (shutdown-worker supervisor id))
+;                                       (throw (RuntimeException. "Supervisor failed health check. Exiting.")))))))
+      (StormTimer/scheduleRecurring
+        (:event-timer supervisor)
+        (* 60 5) (* 60 5)
+        (reify StormTimer$TimerFunc
+          (^void run
+            [this ^Object o]
+            (let [health-code (healthcheck/health-check conf)
+                  ids (my-worker-ids conf)]
+              (if (not (= health-code 0))
+                (do
+                  (doseq [id ids]
+                    (shutdown-worker supervisor id))
+                  (throw (RuntimeException. "Supervisor failed health check. Exiting."))))))))
+
 
       ;; Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
-      (schedule-recurring (:event-timer supervisor)
-                          30
-                          30
-                          (fn [] (.add event-manager run-profiler-actions-fn))))
+;      (schedule-recurring (:event-timer supervisor)
+;                          30
+;                          30
+;                          (fn [] (.add event-manager run-profiler-actions-fn))))
+      (StormTimer/scheduleRecurring
+        (:event-timer supervisor)
+        30 30
+        (reify StormTimer$TimerFunc
+          (^void run
+            [this ^Object o]
+            (.add event-manager run-profiler-actions-fn)))))
+
     (log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor))
     (reify
      Shutdownable
      (shutdown [this]
                (log-message "Shutting down supervisor " (:supervisor-id supervisor))
                (reset! (:active supervisor) false)
-               (cancel-timer (:heartbeat-timer supervisor))
-               (cancel-timer (:event-timer supervisor))
-               (cancel-timer (:blob-update-timer supervisor))
+               ;(cancel-timer (:heartbeat-timer supervisor))
+               (StormTimer/cancelTimer (:heartbeat-timer supervisor))
+               ;(cancel-timer (:event-timer supervisor))
+               (StormTimer/cancelTimer (:event-timer supervisor))
+               ;(cancel-timer (:blob-update-timer supervisor))
+               (StormTimer/cancelTimer (:blob-update-timer supervisor))
                (.shutdown event-manager)
                (.shutdown processes-event-manager)
                (.shutdown (:localizer supervisor))
@@ -883,8 +963,10 @@
      (waiting? [this]
        (or (not @(:active supervisor))
            (and
-            (timer-waiting? (:heartbeat-timer supervisor))
-            (timer-waiting? (:event-timer supervisor))
+            ;(timer-waiting? (:heartbeat-timer supervisor))
+            (StormTimer/isTimerWaiting (:heartbeat-timer supervisor))
+            ;(timer-waiting? (:event-timer supervisor))
+            (StormTimer/isTimerWaiting (:event-timer supervisor))
             (every? (memfn waiting?) managers)))
            ))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 83ae9be..9212506 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -15,7 +15,7 @@
 ;; limitations under the License.
 (ns org.apache.storm.daemon.worker
   (:use [org.apache.storm.daemon common])
-  (:use [org.apache.storm config log util timer local-state])
+  (:use [org.apache.storm config log util local-state])
   (:require [clj-time.core :as time])
   (:require [clj-time.coerce :as coerce])
   (:require [org.apache.storm.daemon [executor :as executor]])
@@ -45,6 +45,7 @@
   (:import [org.apache.logging.log4j Level])
   (:import [org.apache.logging.log4j.core.config LoggerConfig])
   (:import [org.apache.storm.generated LogConfig LogLevelAction])
+  (:import [org.apache.storm StormTimer StormTimer$TimerFunc])
   (:gen-class))
 
 (defmulti mk-suicide-fn cluster-mode)
@@ -238,11 +239,17 @@
   {})
 
 (defn mk-halting-timer [timer-name]
-  (mk-timer :kill-fn (fn [t]
-                       (log-error t "Error when processing event")
-                       (Utils/exitProcess 20 "Error when processing an event")
-                       )
-            :timer-name timer-name))
+;  (mk-timer :kill-fn (fn [t]
+;                       (log-error t "Error when processing event")
+;                       (Utils/exitProcess 20 "Error when processing an event")
+;                       )
+;            :timer-name timer-name)
+  (StormTimer/mkTimer timer-name
+    (reify StormTimer$TimerFunc
+      (^void run
+        [this ^Object t]
+        (log-error t "Error when processing event")
+        (Utils/exitProcess 20 "Error when processing an event")))))
 
 (defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state]
   (let [assignment-versions (atom {})
@@ -374,9 +381,16 @@
         conf (:conf worker)
         storm-cluster-state (:storm-cluster-state worker)
         storm-id (:storm-id worker)]
-    (fn this
+    (fn refresh-connections
       ([]
-        (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this))))
+        (refresh-connections (fn [& ignored]
+;                (schedule (:refresh-connections-timer worker) 0 refresh-connections)
+                (StormTimer/schedule
+                  (:refresh-connections-timer worker) 0
+                  (reify StormTimer$TimerFunc
+                    (^void run
+                      [this ^Object o]
+                      (refresh-connections)))))))
       ([callback]
          (let [version (.assignment-version storm-cluster-state storm-id callback)
                assignment (if (= version (:version (get @(:assignment-versions worker) storm-id)))
@@ -427,7 +441,15 @@
 
 (defn refresh-storm-active
   ([worker]
-    (refresh-storm-active worker (fn [& ignored] (schedule (:refresh-active-timer worker) 0 (partial refresh-storm-active worker)))))
+    (refresh-storm-active
+      worker (fn [& ignored]
+;               (schedule (:refresh-active-timer worker) 0 (partial refresh-storm-active worker))
+               (StormTimer/schedule
+                 (:refresh-active-timer worker) 0
+                 (reify StormTimer$TimerFunc
+                   (^void run
+                     [this ^Object o]
+                     ((partial refresh-storm-active worker))))))))
   ([worker callback]
     (let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) callback)]
       (reset!
@@ -474,16 +496,28 @@
   (let [timer (:refresh-active-timer worker)
         delay-secs 0
         recur-secs 1]
-    (schedule timer
+;    (schedule timer
+;      delay-secs
+;      (fn this []
+;        (if (all-connections-ready worker)
+;          (do
+;            (log-message "All connections are ready for worker " (:assignment-id worker) ":" (:port worker)
+;              " with id "(:worker-id worker))
+;            (reset! (:worker-active-flag worker) true))
+;          (schedule timer recur-secs this :check-active false)
+;            )))
+
+    (StormTimer/schedule timer
       delay-secs
-      (fn this []
-        (if (all-connections-ready worker)
-          (do
-            (log-message "All connections are ready for worker " (:assignment-id worker) ":" (:port worker)
-              " with id "(:worker-id worker))
-            (reset! (:worker-active-flag worker) true))
-          (schedule timer recur-secs this :check-active false)
-            )))))
+      (reify StormTimer$TimerFunc
+        (^void run
+          [this ^Object o]
+          (if (all-connections-ready worker)
+            (do
+              (log-message "All connections are ready for worker " (:assignment-id worker) ":" (:port worker)
+                " with id " (:worker-id worker))
+              (reset! (:worker-active-flag worker) true))
+            (StormTimer/schedule timer recur-secs this false 0)))))))
 
 (defn register-callbacks [worker]
   (let [transfer-local-fn (:transfer-local-fn worker)
@@ -638,8 +672,22 @@
         executors (atom nil)
         ;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
         ;; to the supervisor
-        _ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
-        _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors))
+;        _ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
+        _ (StormTimer/scheduleRecurring
+            (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS)
+            (reify StormTimer$TimerFunc
+              (^void run
+                [this ^Object o]
+                (heartbeat-fn))))
+
+;                _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors))
+
+        _ (StormTimer/scheduleRecurring
+            (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS)
+            (reify StormTimer$TimerFunc
+              (^void run
+                [this ^Object o]
+                (do-executor-heartbeats worker :executors @executors))))
 
         _ (register-callbacks worker)
 
@@ -700,14 +748,22 @@
                     (.interrupt backpressure-thread)
                     (.join backpressure-thread)
                     (log-message "Shut down backpressure thread")
-                    (cancel-timer (:heartbeat-timer worker))
-                    (cancel-timer (:refresh-connections-timer worker))
-                    (cancel-timer (:refresh-credentials-timer worker))
-                    (cancel-timer (:refresh-active-timer worker))
-                    (cancel-timer (:executor-heartbeat-timer worker))
-                    (cancel-timer (:user-timer worker))
-                    (cancel-timer (:refresh-load-timer worker))
-
+;                    (cancel-timer (:heartbeat-timer worker))
+                    (StormTimer/cancelTimer (:heartbeat-timer worker))
+;                    (cancel-timer (:refresh-connections-timer worker))
+                    (StormTimer/cancelTimer (:refresh-connections-timer worker))
+;                    (cancel-timer (:refresh-credentials-timer worker))
+                    (StormTimer/cancelTimer (:refresh-credentials-timer worker))
+;                    (cancel-timer (:refresh-active-timer worker))
+                    (StormTimer/cancelTimer (:refresh-active-timer worker))
+;                    (cancel-timer (:executor-heartbeat-timer worker))
+                    (StormTimer/cancelTimer (:executor-heartbeat-timer worker))
+;                    (cancel-timer (:user-timer worker))
+                    (StormTimer/cancelTimer (:user-timer worker))
+;                    (cancel-timer (:refresh-load-timer worker))
+                    (StormTimer/cancelTimer (:refresh-load-timer worker))
+
+                    (StormTimer/cancelTimer (:reset-log-levels-timer worker))
                     (close-resources worker)
 
                     (log-message "Trigger any worker shutdown hooks")
@@ -726,13 +782,20 @@
              DaemonCommon
              (waiting? [this]
                (and
-                 (timer-waiting? (:heartbeat-timer worker))
-                 (timer-waiting? (:refresh-connections-timer worker))
-                 (timer-waiting? (:refresh-load-timer worker))
-                 (timer-waiting? (:refresh-credentials-timer worker))
-                 (timer-waiting? (:refresh-active-timer worker))
-                 (timer-waiting? (:executor-heartbeat-timer worker))
-                 (timer-waiting? (:user-timer worker))
+;                 (timer-waiting? (:heartbeat-timer worker))
+;                 (timer-waiting? (:refresh-connections-timer worker))
+;                 (timer-waiting? (:refresh-load-timer worker))
+;                 (timer-waiting? (:refresh-credentials-timer worker))
+;                 (timer-waiting? (:refresh-active-timer worker))
+;                 (timer-waiting? (:executor-heartbeat-timer worker))
+;                 (timer-waiting? (:user-timer worker))
+                 (StormTimer/isTimerWaiting (:heartbeat-timer worker))
+                 (StormTimer/isTimerWaiting (:refresh-connections-timer worker))
+                 (StormTimer/isTimerWaiting (:refresh-load-timer worker))
+                 (StormTimer/isTimerWaiting (:refresh-credentials-timer worker))
+                 (StormTimer/isTimerWaiting (:refresh-active-timer worker))
+                 (StormTimer/isTimerWaiting (:executor-heartbeat-timer worker))
+                 (StormTimer/isTimerWaiting (:user-timer worker))
                  ))
              )
         credentials (atom initial-credentials)
@@ -760,18 +823,50 @@
 
     (establish-log-setting-callback)
     (.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-changed)))
-    (schedule-recurring (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
-                        (fn [& args]
-                          (check-credentials-changed)
-                          (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
-                            (check-throttle-changed))))
+;    (schedule-recurring (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
+;                        (fn [& args]
+;                          (check-credentials-changed)
+;                          (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+;                            (check-throttle-changed))))
+
+    (StormTimer/scheduleRecurring
+      (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
+      (reify StormTimer$TimerFunc
+        (^void run
+          [this ^Object o]
+          (check-credentials-changed)
+          (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+            (check-throttle-changed)))))
     ;; The jitter allows the clients to get the data at different times, and avoids thundering herd
     (when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
-      (schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 refresh-load))
-    (schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections)
-    (schedule-recurring (:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS) (fn [] (reset-log-levels latest-log-config)))
-    (schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker))
-
+;      (schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 refresh-load)
+      (StormTimer/scheduleRecurringWithJitter
+        (:refresh-load-timer worker) 0 1 500
+        (reify StormTimer$TimerFunc
+          (^void run
+            [this ^Object o]
+            (refresh-load)))))
+;    (schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections)
+    (StormTimer/scheduleRecurring
+      (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS)
+      (reify StormTimer$TimerFunc
+        (^void run
+          [this ^Object o]
+          (refresh-connections))))
+;    (schedule-recurring (:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS) (fn [] (reset-log-levels latest-log-config)))
+    (StormTimer/scheduleRecurring
+      (:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS)
+      (reify StormTimer$TimerFunc
+        (^void run
+          [this ^Object o]
+          (reset-log-levels latest-log-config))))
+;     (schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker))
+    (StormTimer/scheduleRecurring
+      (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS)
+      (reify StormTimer$TimerFunc
+        (^void run
+          [this ^Object o]
+          ((partial refresh-storm-active worker)))))
     (log-message "Worker has topology config " (Utils/redactValue (:storm-conf worker) STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
     (log-message "Worker " worker-id " for storm " storm-id " on " assignment-id ":" port " has finished loading")
     ret

http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/src/clj/org/apache/storm/timer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/timer.clj b/storm-core/src/clj/org/apache/storm/timer.clj
index 5f31032..27853c2 100644
--- a/storm-core/src/clj/org/apache/storm/timer.clj
+++ b/storm-core/src/clj/org/apache/storm/timer.clj
@@ -1,128 +1,128 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements.  See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership.  The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License.  You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
 
-(ns org.apache.storm.timer
-  (:import [org.apache.storm.utils Utils Time])
-  (:import [java.util PriorityQueue Comparator Random])
-  (:import [java.util.concurrent Semaphore])
-  (:use [org.apache.storm util log]))
-
-;; The timer defined in this file is very similar to java.util.Timer, except
-;; it integrates with Storm's time simulation capabilities. This lets us test
-;; code that does asynchronous work on the timer thread
-
-(defnk mk-timer [:kill-fn (fn [& _] ) :timer-name nil]
-  (let [queue (PriorityQueue. 10 (reify Comparator
-                                   (compare
-                                     [this o1 o2]
-                                     (- (first o1) (first o2)))
-                                   (equals
-                                     [this obj]
-                                     true)))
-        active (atom true)
-        lock (Object.)
-        notifier (Semaphore. 0)
-        thread-name (if timer-name timer-name "timer")
-        timer-thread (Thread.
-                       (fn []
-                         (while @active
-                           (try
-                             (let [[time-millis _ _ :as elem] (locking lock (.peek queue))]
-                               (if (and elem (>= (Time/currentTimeMillis) time-millis))
-                                 ;; It is imperative to not run the function
-                                 ;; inside the timer lock. Otherwise, it is
-                                 ;; possible to deadlock if the fn deals with
-                                 ;; other locks, like the submit lock.
-                                 (let [afn (locking lock (second (.poll queue)))]
-                                   (afn))
-                                 (if time-millis
-                                   ;; If any events are scheduled, sleep until
-                                   ;; event generation. If any recurring events
-                                   ;; are scheduled then we will always go
-                                   ;; through this branch, sleeping only the
-                                   ;; exact necessary amount of time. We give
-                                   ;; an upper bound, e.g. 1000 millis, to the
-                                   ;; sleeping time, to limit the response time
-                                   ;; for detecting any new event within 1 secs.
-                                   (Time/sleep (min 1000 (- time-millis (Time/currentTimeMillis))))
-                                   ;; Otherwise poll to see if any new event
-                                   ;; was scheduled. This is, in essence, the
-                                   ;; response time for detecting any new event
-                                   ;; schedulings when there are no scheduled
-                                   ;; events.
-                                   (Time/sleep 1000))))
-                             (catch Throwable t
-                               ;; Because the interrupted exception can be
-                               ;; wrapped in a RuntimeException.
-                               (when-not (Utils/exceptionCauseIsInstanceOf InterruptedException t)
-                                 (kill-fn t)
-                                 (reset! active false)
-                                 (throw t)))))
-                         (.release notifier)) thread-name)]
-    (.setDaemon timer-thread true)
-    (.setPriority timer-thread Thread/MAX_PRIORITY)
-    (.start timer-thread)
-    {:timer-thread timer-thread
-     :queue queue
-     :active active
-     :lock lock
-     :random (Random.)
-     :cancel-notifier notifier}))
-
-(defn- check-active!
-  [timer]
-  (when-not @(:active timer)
-    (throw (IllegalStateException. "Timer is not active"))))
-
-(defnk schedule
-  [timer delay-secs afn :check-active true :jitter-ms 0]
-  (when check-active (check-active! timer))
-  (let [id (Utils/uuid)
-        ^PriorityQueue queue (:queue timer)
-        end-time-ms (+ (Time/currentTimeMillis) (Time/secsToMillisLong delay-secs))
-        end-time-ms (if (< 0 jitter-ms) (+ (.nextInt (:random timer) jitter-ms) end-time-ms) end-time-ms)]
-    (locking (:lock timer)
-      (.add queue [end-time-ms afn id]))))
-
-(defn schedule-recurring
-  [timer delay-secs recur-secs afn]
-  (schedule timer
-            delay-secs
-            (fn this []
-              (afn)
-              ; This avoids a race condition with cancel-timer.
-              (schedule timer recur-secs this :check-active false))))
-
-(defn schedule-recurring-with-jitter
-  [timer delay-secs recur-secs jitter-ms afn]
-  (schedule timer
-            delay-secs
-            (fn this []
-              (afn)
-              ; This avoids a race condition with cancel-timer.
-              (schedule timer recur-secs this :check-active false :jitter-ms jitter-ms))))
-
-(defn cancel-timer
-  [timer]
-  (check-active! timer)
-  (locking (:lock timer)
-    (reset! (:active timer) false)
-    (.interrupt (:timer-thread timer)))
-  (.acquire (:cancel-notifier timer)))
-
-(defn timer-waiting?
-  [timer]
-  (Time/isThreadWaiting (:timer-thread timer)))
+;(ns org.apache.storm.timer
+;  (:import [org.apache.storm.utils Utils Time])
+;  (:import [java.util PriorityQueue Comparator Random])
+;  (:import [java.util.concurrent Semaphore])
+;  (:use [org.apache.storm util log]))
+;
+;;; The timer defined in this file is very similar to java.util.Timer, except
+;;; it integrates with Storm's time simulation capabilities. This lets us test
+;;; code that does asynchronous work on the timer thread
+;
+;(defnk mk-timer [:kill-fn (fn [& _] ) :timer-name nil]
+;  (let [queue (PriorityQueue. 10 (reify Comparator
+;                                   (compare
+;                                     [this o1 o2]
+;                                     (- (first o1) (first o2)))
+;                                   (equals
+;                                     [this obj]
+;                                     true)))
+;        active (atom true)
+;        lock (Object.)
+;        notifier (Semaphore. 0)
+;        thread-name (if timer-name timer-name "timer")
+;        timer-thread (Thread.
+;                       (fn []
+;                         (while @active
+;                           (try
+;                             (let [[time-millis _ _ :as elem] (locking lock (.peek queue))]
+;                               (if (and elem (>= (Time/currentTimeMillis) time-millis))
+;                                 ;; It is imperative to not run the function
+;                                 ;; inside the timer lock. Otherwise, it is
+;                                 ;; possible to deadlock if the fn deals with
+;                                 ;; other locks, like the submit lock.
+;                                 (let [afn (locking lock (second (.poll queue)))]
+;                                   (afn))
+;                                 (if time-millis
+;                                   ;; If any events are scheduled, sleep until
+;                                   ;; event generation. If any recurring events
+;                                   ;; are scheduled then we will always go
+;                                   ;; through this branch, sleeping only the
+;                                   ;; exact necessary amount of time. We give
+;                                   ;; an upper bound, e.g. 1000 millis, to the
+;                                   ;; sleeping time, to limit the response time
+;                                   ;; for detecting any new event within 1 secs.
+;                                   (Time/sleep (min 1000 (- time-millis (Time/currentTimeMillis))))
+;                                   ;; Otherwise poll to see if any new event
+;                                   ;; was scheduled. This is, in essence, the
+;                                   ;; response time for detecting any new event
+;                                   ;; schedulings when there are no scheduled
+;                                   ;; events.
+;                                   (Time/sleep 1000))))
+;                             (catch Throwable t
+;                               ;; Because the interrupted exception can be
+;                               ;; wrapped in a RuntimeException.
+;                               (when-not (Utils/exceptionCauseIsInstanceOf InterruptedException t)
+;                                 (kill-fn t)
+;                                 (reset! active false)
+;                                 (throw t)))))
+;                         (.release notifier)) thread-name)]
+;    (.setDaemon timer-thread true)
+;    (.setPriority timer-thread Thread/MAX_PRIORITY)
+;    (.start timer-thread)
+;    {:timer-thread timer-thread
+;     :queue queue
+;     :active active
+;     :lock lock
+;     :random (Random.)
+;     :cancel-notifier notifier}))
+;
+;(defn- check-active!
+;  [timer]
+;  (when-not @(:active timer)
+;    (throw (IllegalStateException. "Timer is not active"))))
+;
+;(defnk schedule
+;  [timer delay-secs afn :check-active true :jitter-ms 0]
+;  (when check-active (check-active! timer))
+;  (let [id (Utils/uuid)
+;        ^PriorityQueue queue (:queue timer)
+;        end-time-ms (+ (Time/currentTimeMillis) (Time/secsToMillisLong delay-secs))
+;        end-time-ms (if (< 0 jitter-ms) (+ (.nextInt (:random timer) jitter-ms) end-time-ms) end-time-ms)]
+;    (locking (:lock timer)
+;      (.add queue [end-time-ms afn id]))))
+;
+;(defn schedule-recurring
+;  [timer delay-secs recur-secs afn]
+;  (schedule timer
+;            delay-secs
+;            (fn this []
+;              (afn)
+;              ; This avoids a race condition with cancel-timer.
+;              (schedule timer recur-secs this :check-active false))))
+;
+;(defn schedule-recurring-with-jitter
+;  [timer delay-secs recur-secs jitter-ms afn]
+;  (schedule timer
+;            delay-secs
+;            (fn this []
+;              (afn)
+;              ; This avoids a race condition with cancel-timer.
+;              (schedule timer recur-secs this :check-active false :jitter-ms jitter-ms))))
+;
+;(defn cancel-timer
+;  [timer]
+;  (check-active! timer)
+;  (locking (:lock timer)
+;    (reset! (:active timer) false)
+;    (.interrupt (:timer-thread timer)))
+;  (.acquire (:cancel-notifier timer)))
+;
+;(defn timer-waiting?
+;  [timer]
+;  (Time/isThreadWaiting (:timer-thread timer)))

http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/src/jvm/org/apache/storm/StormTimer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/StormTimer.java b/storm-core/src/jvm/org/apache/storm/StormTimer.java
index 5267335..36878e4 100644
--- a/storm-core/src/jvm/org/apache/storm/StormTimer.java
+++ b/storm-core/src/jvm/org/apache/storm/StormTimer.java
@@ -19,11 +19,13 @@
 package org.apache.storm;
 
 import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Comparator;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,12 +37,29 @@ public class StormTimer {
         public void run(Object o);
     }
 
+    public static class QueueEntry {
+        public final Long endTimeMs;
+        public final TimerFunc afn;
+        public final String id;
+
+        public QueueEntry(Long endTimeMs, TimerFunc afn, String id) {
+            this.endTimeMs = endTimeMs;
+            this.afn = afn;
+            this.id = id;
+        }
+
+        @Override
+        public String toString() {
+            return this.id + " " + this.endTimeMs + " " + this.afn;
+        }
+    }
+
     public static class StormTimerTask extends Thread {
 
-        private PriorityBlockingQueue<Long> queue = new PriorityBlockingQueue<Long>(10, new Comparator() {
+        private PriorityBlockingQueue<QueueEntry> queue = new PriorityBlockingQueue<QueueEntry>(10, new Comparator() {
             @Override
             public int compare(Object o1, Object o2) {
-                return 0;
+                return ((QueueEntry)o1).endTimeMs.intValue() - ((QueueEntry)o2).endTimeMs.intValue();
             }
         });
 
@@ -48,8 +67,6 @@ public class StormTimer {
 
         private TimerFunc onKill;
 
-        private TimerFunc afn;
-
         private Random random = new Random();
 
         private Semaphore cancelNotifier = new Semaphore(0);
@@ -58,26 +75,32 @@ public class StormTimer {
 
         @Override
         public void run() {
-            LOG.info("in run...");
+            LOG.info("in run...{}", this.getName());
             while (this.active.get()) {
+                QueueEntry queueEntry = null;
                 try {
-                    Long endTimeMillis;
                     synchronized (this.lock) {
-                        endTimeMillis = this.queue.peek();
+                        queueEntry = this.queue.peek();
                     }
-                    if ((endTimeMillis != null) && (currentTimeMillis() >= endTimeMillis)) {
+                    LOG.info("event: {} -- {}", this.getName(), queueEntry);
+
+                    if ((queueEntry != null) && (Time.currentTimeMillis() >= queueEntry.endTimeMs)) {
                         synchronized (this.lock) {
                             this.queue.poll();
                         }
-                        LOG.info("About to run function...");
-                        this.afn.run(null);
-                    } else if (endTimeMillis != null) {
-                        Time.sleep(Math.min(1000, (endTimeMillis - currentTimeMillis())));
+                        queueEntry.afn.run(null);
+                    } else if (queueEntry != null) {
+                        Time.sleep(Math.min(1000, (queueEntry.endTimeMs - Time.currentTimeMillis())));
                     } else {
                         Time.sleep(1000);
                     }
                 } catch (Throwable t) {
-                    this.onKill.run(t);
+                    if (!(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t))) {
+                        LOG.info("Exception throw for event: {} --- {}", queueEntry, t);
+                        this.onKill.run(t);
+                        this.setActive(false);
+                        throw new RuntimeException(t);
+                    }
                 }
             }
             this.cancelNotifier.release();
@@ -87,10 +110,6 @@ public class StormTimer {
             this.onKill = onKill;
         }
 
-        public void setFunc(TimerFunc func) {
-            this.afn = func;
-        }
-
         public void setActive(boolean flag) {
             this.active.set(flag);
         }
@@ -99,14 +118,21 @@ public class StormTimer {
             return this.active.get();
         }
 
-        public void add(long endTime) {
-            this.queue.add(endTime);
+        public void add(QueueEntry queueEntry) {
+            this.queue.add(queueEntry);
         }
     }
 
-    public static StormTimerTask mkTimer(TimerFunc onKill, String name) {
-        LOG.info("making Timer...");
+    public static StormTimerTask mkTimer(String name, TimerFunc onKill) {
+        if (onKill == null) {
+            throw new RuntimeException("onKill func is null!");
+        }
         StormTimerTask task  = new StormTimerTask();
+        if (name == null) {
+            task.setName("timer");
+        } else {
+            task.setName(name);
+        }
         task.setOnKillFunc(onKill);
         task.setActive(true);
 
@@ -116,13 +142,20 @@ public class StormTimer {
         return task;
     }
     public static void schedule(StormTimerTask task, int delaySecs, TimerFunc afn, boolean checkActive, int jitterMs) {
-        long endTimeMs = currentTimeMillis() + secsToMillisLong(delaySecs);
+        if (task == null) {
+            throw new RuntimeException("task is null!");
+        }
+        if (afn == null) {
+            throw new RuntimeException("function to schedule is null!");
+        }
+        String id = Utils.uuid();
+        long endTimeMs = Time.currentTimeMillis() + Time.secsToMillisLong(delaySecs);
         if (jitterMs > 0) {
             endTimeMs = task.random.nextInt(jitterMs) + endTimeMs;
         }
-        task.setFunc(afn);
+        LOG.info("add event: {}-{}-{}", id, endTimeMs, afn);
         synchronized (task.lock) {
-            task.add(endTimeMs);
+            task.add(new QueueEntry(endTimeMs, afn, id));
         }
     }
     public static void schedule(StormTimerTask task, int delaySecs, TimerFunc afn) {
@@ -156,12 +189,20 @@ public class StormTimer {
     }
 
     public static void checkActive(StormTimerTask task) {
+        if (task == null) {
+            throw new RuntimeException("task is null!");
+        }
         if (!task.isActive()) {
             throw new IllegalStateException("Timer is not active");
         }
     }
 
     public static void cancelTimer(StormTimerTask task) throws InterruptedException {
+        if (task == null) {
+            throw new RuntimeException("task is null!");
+        }
+        LOG.info("cancel task: {} - {} - {}", task.getName(), task.getId(), task.queue);
+
         checkActive(task);
         synchronized (task.lock) {
             task.setActive(false);
@@ -171,29 +212,9 @@ public class StormTimer {
     }
 
     public static boolean isTimerWaiting(StormTimerTask task) {
+        if (task == null) {
+            throw new RuntimeException("task is null!");
+        }
         return Time.isThreadWaiting(task);
     }
-
-    /**
-     * function in util that haven't be translated to java
-     */
-
-    public static long secsToMillisLong(long secs) {
-        return secs * 1000;
-    }
-
-    public static long currentTimeMillis() {
-        return Time.currentTimeMillis();
-    }
-
-
-    public static void main(String[] argv) {
-        mkTimer(new TimerFunc() {
-            @Override
-            public void run(Object o) {
-
-            }
-        }, "erer");
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 42a0374..e527d60 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -37,7 +37,7 @@
   (:import [org.apache.storm.zookeeper Zookeeper])
   (:import [org.apache.commons.io FileUtils]
            [org.json.simple JSONValue])
-  (:use [org.apache.storm testing MockAutoCred util config log timer zookeeper])
+  (:use [org.apache.storm testing MockAutoCred util config log zookeeper])
   (:use [org.apache.storm.daemon common])
   (:require [conjure.core])
   (:require [org.apache.storm [cluster :as cluster]])
@@ -1483,7 +1483,7 @@
                    nimbus/file-cache-map nil
                    nimbus/mk-blob-cache-map nil
                    nimbus/mk-bloblist-cache-map nil
-                   mk-timer nil
+                  ; mk-timer nil
                    nimbus/mk-scheduler nil]
                   (nimbus/nimbus-data auth-conf fake-inimbus)
                   (verify-call-times-for cluster/mk-storm-cluster-state 1)

http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index b25bd7c..71aaf85 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -33,7 +33,7 @@
   (:import [java.nio.file.attribute FileAttribute])
   (:import [org.apache.storm Thrift])
   (:import [org.apache.storm.utils Utils])
-  (:use [org.apache.storm config testing util timer log])
+  (:use [org.apache.storm config testing util log])
   (:use [org.apache.storm.daemon common])
   (:require [org.apache.storm.daemon [worker :as worker] [supervisor :as supervisor]]
             [org.apache.storm [cluster :as cluster]])
@@ -646,7 +646,8 @@
       (with-open [_ (ConfigUtilsInstaller. fake-cu)
                   _ (UtilsInstaller. fake-utils)]
         (stubbing [cluster/mk-storm-cluster-state nil
-                   mk-timer nil]
+;                   mk-timer nil
+                   ]
           (supervisor/supervisor-data auth-conf nil fake-isupervisor)
           (verify-call-times-for cluster/mk-storm-cluster-state 1)
           (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
@@ -837,4 +838,4 @@
      (validate-launched-once (:launched changed)
                              {"sup1" [3 4]}
                              (get-storm-id (:storm-cluster-state cluster) "topology2"))
-     )))
\ No newline at end of file
+     )))

http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/test/jvm/org/apache/storm/TestTimer.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/TestTimer.java b/storm-core/test/jvm/org/apache/storm/TestTimer.java
new file mode 100644
index 0000000..c798c1f
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/TestTimer.java
@@ -0,0 +1,57 @@
+package org.apache.storm;
+
+import org.apache.storm.utils.Time;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by jerrypeng on 2/9/16.
+ */
+public class TestTimer {
+    //public static StormTimerTask mkTimer(TimerFunc onKill, String name) {
+    private static final Logger LOG = LoggerFactory.getLogger(TestTimer.class);
+
+    @Test
+    public void testTimer() throws InterruptedException {
+//        StormTimer.StormTimerTask task1 = StormTimer.mkTimer("timer", new StormTimer.TimerFunc() {
+//            @Override
+//            public void run(Object o) {
+//                LOG.info("task1 onKill at {}", Time.currentTimeSecs());
+//            }
+//        });
+//        StormTimer.scheduleRecurring(task1, 10, 5, new StormTimer.TimerFunc(){
+//            @Override
+//            public void run(Object o) {
+//                LOG.info("task1-1 scheduleRecurring func at {}", Time.currentTimeSecs());
+//            }
+//        });
+//        StormTimer.scheduleRecurring(task1, 5, 10, new StormTimer.TimerFunc(){
+//            @Override
+//            public void run(Object o) {
+//                LOG.info("task1-2 scheduleRecurring func at {}", Time.currentTimeSecs());
+//            }
+//        });
+//
+//        StormTimer.StormTimerTask task2 = StormTimer.mkTimer("timer", new StormTimer.TimerFunc() {
+//            @Override
+//            public void run(Object o) {
+//                LOG.info("task2 onKill at {}", Time.currentTimeSecs());
+//            }
+//        });
+//        StormTimer.scheduleRecurringWithJitter(task2, 10, 5, 2000, new StormTimer.TimerFunc(){
+//            @Override
+//            public void run(Object o) {
+//                LOG.info("task2 scheduleRecurringWithJitter func at {}", Time.currentTimeSecs());
+//            }
+//        });
+//
+//        LOG.info("sleeping...");
+//        Time.sleep(30000);
+//
+//        LOG.info("canceling task");
+//        StormTimer.cancelTimer(task1);
+//        StormTimer.cancelTimer(task2);
+
+    }
+}