You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/18 18:09:41 UTC
[5/9] storm git commit: edits based on reviews
edits based on reviews
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2d06efe7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2d06efe7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2d06efe7
Branch: refs/heads/master
Commit: 2d06efe7c62d85a3187c03523bfee7474b963304
Parents: b09d8ca
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Thu Feb 18 10:37:22 2016 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Feb 18 10:37:22 2016 -0600
----------------------------------------------------------------------
.../clj/org/apache/storm/daemon/executor.clj | 22 +--
.../clj/org/apache/storm/daemon/logviewer.clj | 23 ++-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 67 +++----
.../clj/org/apache/storm/daemon/supervisor.clj | 102 +++++------
.../src/clj/org/apache/storm/daemon/worker.clj | 122 +++++--------
.../src/jvm/org/apache/storm/StormTimer.java | 175 ++++++++++---------
6 files changed, 225 insertions(+), 286 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2d06efe7/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 2afb853..92cc003 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -40,7 +40,7 @@
[java.util.concurrent ConcurrentLinkedQueue]
[org.json.simple JSONValue]
[com.lmax.disruptor.dsl ProducerType]
- [org.apache.storm StormTimer StormTimer$TimerFunc])
+ [org.apache.storm StormTimer])
(:require [org.apache.storm [cluster :as cluster] [stats :as stats]])
(:require [org.apache.storm.daemon [task :as task]])
(:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])
@@ -324,15 +324,13 @@
(let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data
distinct-time-bucket-intervals (keys interval->task->metric-registry)]
(doseq [interval distinct-time-bucket-intervals]
- (StormTimer/scheduleRecurring
+ (.scheduleRecurring
(:user-timer (:worker executor-data))
interval
interval
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
- (.publish ^DisruptorQueue receive-queue val))))))))
+ (fn []
+ (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
+ (.publish ^DisruptorQueue receive-queue val)))))))
(defn metrics-tick
[executor-data task-data ^TupleImpl tuple]
@@ -367,15 +365,13 @@
(and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
(= :spout (:type executor-data))))
(log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
- (StormTimer/scheduleRecurring
+ (.scheduleRecurring
(:user-timer worker)
tick-time-secs
tick-time-secs
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
- (.publish ^DisruptorQueue receive-queue val)))))))))
+ (fn []
+ (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
+ (.publish ^DisruptorQueue receive-queue val))))))))
(defn mk-executor [worker executor-id initial-credentials]
(let [executor-data (mk-executor-data worker executor-id)
http://git-wip-us.apache.org/repos/asf/storm/blob/2d06efe7/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
index 16815f9..9502196 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
@@ -20,7 +20,7 @@
(:use [hiccup core page-helpers form-helpers])
(:use [org.apache.storm config util log])
(:use [org.apache.storm.ui helpers])
- (:import [org.apache.storm StormTimer StormTimer$TimerFunc])
+ (:import [org.apache.storm StormTimer])
(:import [org.apache.storm.utils Utils Time VersionInfo ConfigUtils])
(:import [org.slf4j LoggerFactory])
(:import [java.util Arrays ArrayList HashSet])
@@ -264,18 +264,15 @@
(let [interval-secs (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
(when interval-secs
(log-debug "starting log cleanup thread at interval: " interval-secs)
- (StormTimer/scheduleRecurring
- (StormTimer/mkTimer "logviewer-cleanup"
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object t]
- (log-error t "Error when doing logs cleanup")
- (Utils/exitProcess 20 "Error when doing log cleanup"))))
- 0 interval-secs
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (cleanup-fn! log-root-dir)))))))
+
+ (let [timer (StormTimer. "logviewer-cleanup"
+ (reify Thread$UncaughtExceptionHandler
+ (^void uncaughtException
+ [this ^Thread t ^Throwable e]
+ (log-error t "Error when doing logs cleanup")
+ (Utils/exitProcess 20 "Error when doing log cleanup"))))]
+ (.scheduleRecurring timer 0 interval-secs
+ (fn [] (cleanup-fn! log-root-dir)))))))
(defn- skip-bytes
"FileInputStream#skip may not work the first time, so ensure it successfully
http://git-wip-us.apache.org/repos/asf/storm/blob/2d06efe7/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 0d0b27a..a3497d6 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -66,7 +66,7 @@
(:require [clj-time.coerce :as coerce])
(:require [metrics.meters :refer [defmeter mark!]])
(:require [metrics.gauges :refer [defgauge]])
- (:import [org.apache.storm StormTimer StormTimer$TimerFunc])
+ (:import [org.apache.storm StormTimer])
(:gen-class
:methods [^{:static true} [launch [org.apache.storm.scheduler.INimbus] void]]))
@@ -194,12 +194,13 @@
:blob-listers (mk-bloblist-cache-map conf)
:uptime (Utils/makeUptimeComputer)
:validator (Utils/newInstance (conf NIMBUS-TOPOLOGY-VALIDATOR))
- :timer (StormTimer/mkTimer nil
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object t]
- (log-error t "Error when processing event")
+ :timer (StormTimer. nil
+ (reify Thread$UncaughtExceptionHandler
+ (^void uncaughtException
+ [this ^Thread t ^Throwable e]
+ (log-error e "Error when processing event")
(Utils/exitProcess 20 "Error when processing an event"))))
+
:scheduler (mk-scheduler conf inimbus)
:leader-elector (Zookeeper/zkLeaderElector conf)
:id->sched-status (atom {})
@@ -382,12 +383,9 @@
(defn delay-event [nimbus storm-id delay-secs event]
(log-message "Delaying event " event " for " delay-secs " secs for " storm-id)
- (StormTimer/schedule (:timer nimbus)
+ (.schedule (:timer nimbus)
delay-secs
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (transition! nimbus storm-id event false)))))
+ (fn [] (transition! nimbus storm-id event false))))
;; active -> reassign in X secs
@@ -1448,49 +1446,36 @@
(doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
(transition! nimbus storm-id :startup)))
- (StormTimer/scheduleRecurring (:timer nimbus)
+ (.scheduleRecurring (:timer nimbus)
0
(conf NIMBUS-MONITOR-FREQ-SECS)
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
- (locking (:submit-lock nimbus)
- (mk-assignments nimbus)))
- (do-cleanup nimbus))))
+ (fn []
+ (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
+ (locking (:submit-lock nimbus)
+ (mk-assignments nimbus)))
+ (do-cleanup nimbus)))
;; Schedule Nimbus inbox cleaner
- (StormTimer/scheduleRecurring (:timer nimbus)
+ (.scheduleRecurring (:timer nimbus)
0
(conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS)))))
+ (fn [] (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
;; Schedule nimbus code sync thread to sync code from other nimbuses.
(if (instance? LocalFsBlobStore blob-store)
- (StormTimer/scheduleRecurring (:timer nimbus)
+ (.scheduleRecurring (:timer nimbus)
0
(conf NIMBUS-CODE-SYNC-FREQ-SECS)
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object t]
- (blob-sync conf nimbus)))))
+ (fn [] (blob-sync conf nimbus))))
;; Schedule topology history cleaner
(when-let [interval (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
- (StormTimer/scheduleRecurring (:timer nimbus)
+ (.scheduleRecurring (:timer nimbus)
0
(conf LOGVIEWER-CLEANUP-INTERVAL-SECS)
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object t]
- (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus)))))
- (StormTimer/scheduleRecurring (:timer nimbus)
+ (fn [] (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus))))
+ (.scheduleRecurring (:timer nimbus)
0
(conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object t]
- (renew-credentials nimbus))))
+ (fn []
+ (renew-credentials nimbus)))
(defgauge nimbus:num-supervisors
(fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil))))
@@ -2222,7 +2207,7 @@
(shutdown [this]
(mark! nimbus:num-shutdown-calls)
(log-message "Shutting down master")
- (StormTimer/cancelTimer (:timer nimbus))
+ (.close (:timer nimbus))
(.disconnect (:storm-cluster-state nimbus))
(.cleanup (:downloaders nimbus))
(.cleanup (:uploaders nimbus))
@@ -2232,7 +2217,7 @@
(log-message "Shut down master"))
DaemonCommon
(waiting? [this]
- (StormTimer/isTimerWaiting (:timer nimbus))))))
+ (.isTimerWaiting (:timer nimbus))))))
(defn validate-port-available[conf]
(try
http://git-wip-us.apache.org/repos/asf/storm/blob/2d06efe7/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 2b70731..ad9db76 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -42,7 +42,7 @@
[org.yaml.snakeyaml.constructor SafeConstructor])
(:require [metrics.gauges :refer [defgauge]])
(:require [metrics.meters :refer [defmeter mark!]])
- (:import [org.apache.storm StormTimer StormTimer$TimerFunc])
+ (:import [org.apache.storm StormTimer])
(:gen-class
:methods [^{:static true} [launch [org.apache.storm.scheduler.ISupervisor] void]])
(:require [clojure.string :as str]))
@@ -336,23 +336,23 @@
:assignment-id (.getAssignmentId isupervisor)
:my-hostname (Utils/hostname conf)
:curr-assignment (atom nil) ;; used for reporting used ports when heartbeating
- :heartbeat-timer (StormTimer/mkTimer nil
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object t]
- (log-error t "Error when processing event")
+ :heartbeat-timer (StormTimer. nil
+ (reify Thread$UncaughtExceptionHandler
+ (^void uncaughtException
+ [this ^Thread t ^Throwable e]
+ (log-error e "Error when processing event")
(Utils/exitProcess 20 "Error when processing an event"))))
- :event-timer (StormTimer/mkTimer nil
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object t]
- (log-error t "Error when processing event")
+ :event-timer (StormTimer. nil
+ (reify Thread$UncaughtExceptionHandler
+ (^void uncaughtException
+ [this ^Thread t ^Throwable e]
+ (log-error e "Error when processing event")
(Utils/exitProcess 20 "Error when processing an event"))))
- :blob-update-timer (StormTimer/mkTimer "blob-update-timer"
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object t]
- (log-error t "Error when processing event")
+ :blob-update-timer (StormTimer. "blob-update-timer"
+ (reify Thread$UncaughtExceptionHandler
+ (^void uncaughtException
+ [this ^Thread t ^Throwable e]
+ (log-error e "Error when processing event")
(Utils/exitProcess 20 "Error when processing an event"))))
:localizer (Utils/createLocalizer conf (ConfigUtils/supervisorLocalDir conf))
:assignment-versions (atom {})
@@ -821,13 +821,10 @@
(heartbeat-fn)
;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
- (StormTimer/scheduleRecurring (:heartbeat-timer supervisor)
+ (.scheduleRecurring (:heartbeat-timer supervisor)
0
(conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (heartbeat-fn))))
+ heartbeat-fn)
(doseq [storm-id downloaded-storm-ids]
(add-blob-references (:localizer supervisor) storm-id
@@ -838,53 +835,38 @@
(when (conf SUPERVISOR-ENABLE)
;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
;; to date even if callbacks don't all work exactly right
- (StormTimer/scheduleRecurring (:event-timer supervisor)
- 0 10
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (.add event-manager synchronize-supervisor))))
-
- (StormTimer/scheduleRecurring (:event-timer supervisor)
+ (.scheduleRecurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
+
+ (.scheduleRecurring (:event-timer supervisor)
0
(conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (.add processes-event-manager sync-processes))))
+ (fn [] (.add processes-event-manager sync-processes)))
;; Blob update thread. Starts with 30 seconds delay, every 30 seconds
- (StormTimer/scheduleRecurring (:blob-update-timer supervisor)
+ (.scheduleRecurring (:blob-update-timer supervisor)
30
30
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (.add event-manager synchronize-blobs-fn))))
+ (fn [] (.add event-manager synchronize-blobs-fn)))
- (StormTimer/scheduleRecurring (:event-timer supervisor)
+ (.scheduleRecurring (:event-timer supervisor)
(* 60 5)
(* 60 5)
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (let [health-code (healthcheck/health-check conf)
- ids (my-worker-ids conf)]
- (if (not (= health-code 0))
- (do
- (doseq [id ids]
- (shutdown-worker supervisor id))
- (throw (RuntimeException. "Supervisor failed health check. Exiting."))))))))
+ (fn []
+ (let [health-code (healthcheck/health-check conf)
+ ids (my-worker-ids conf)]
+ (if (not (= health-code 0))
+ (do
+ (doseq [id ids]
+ (shutdown-worker supervisor id))
+ (throw (RuntimeException. "Supervisor failed health check. Exiting.")))))))
;; Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
- (StormTimer/scheduleRecurring
+ (.scheduleRecurring
(:event-timer supervisor)
- 30 30
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (.add event-manager run-profiler-actions-fn)))))
+ 30
+ 30
+ (fn [] (.add event-manager run-profiler-actions-fn))))
(log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor))
(reify
@@ -892,9 +874,9 @@
(shutdown [this]
(log-message "Shutting down supervisor " (:supervisor-id supervisor))
(reset! (:active supervisor) false)
- (StormTimer/cancelTimer (:heartbeat-timer supervisor))
- (StormTimer/cancelTimer (:event-timer supervisor))
- (StormTimer/cancelTimer (:blob-update-timer supervisor))
+ (.close (:heartbeat-timer supervisor))
+ (.close (:event-timer supervisor))
+ (.close (:blob-update-timer supervisor))
(.shutdown event-manager)
(.shutdown processes-event-manager)
(.shutdown (:localizer supervisor))
@@ -913,8 +895,8 @@
(waiting? [this]
(or (not @(:active supervisor))
(and
- (StormTimer/isTimerWaiting (:heartbeat-timer supervisor))
- (StormTimer/isTimerWaiting (:event-timer supervisor))
+ (.isTimerWaiting (:heartbeat-timer supervisor))
+ (.isTimerWaiting (:event-timer supervisor))
(every? (memfn waiting?) managers)))
))))
http://git-wip-us.apache.org/repos/asf/storm/blob/2d06efe7/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index e74ffa1..c2a767a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -45,7 +45,7 @@
(:import [org.apache.logging.log4j Level])
(:import [org.apache.logging.log4j.core.config LoggerConfig])
(:import [org.apache.storm.generated LogConfig LogLevelAction])
- (:import [org.apache.storm StormTimer StormTimer$TimerFunc])
+ (:import [org.apache.storm StormTimer])
(:gen-class))
(defmulti mk-suicide-fn cluster-mode)
@@ -239,11 +239,11 @@
{})
(defn mk-halting-timer [timer-name]
- (StormTimer/mkTimer timer-name
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object t]
- (log-error t "Error when processing event")
+ (StormTimer. timer-name
+ (reify Thread$UncaughtExceptionHandler
+ (^void uncaughtException
+ [this ^Thread t ^Throwable e]
+ (log-error e "Error when processing event")
(Utils/exitProcess 20 "Error when processing an event")))))
(defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state]
@@ -379,12 +379,8 @@
(fn refresh-connections
([]
(refresh-connections (fn [& ignored]
- (StormTimer/schedule
- (:refresh-connections-timer worker) 0
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (refresh-connections)))))))
+ (.schedule
+ (:refresh-connections-timer worker) 0 refresh-connections))))
([callback]
(let [version (.assignment-version storm-cluster-state storm-id callback)
assignment (if (= version (:version (get @(:assignment-versions worker) storm-id)))
@@ -437,12 +433,8 @@
([worker]
(refresh-storm-active
worker (fn [& ignored]
- (StormTimer/schedule
- (:refresh-active-timer worker) 0
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- ((partial refresh-storm-active worker))))))))
+ (.schedule
+ (:refresh-active-timer worker) 0 (partial refresh-storm-active worker)))))
([worker callback]
(let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) callback)]
(reset!
@@ -489,17 +481,15 @@
(let [timer (:refresh-active-timer worker)
delay-secs 0
recur-secs 1]
- (StormTimer/schedule timer
+ (.schedule timer
delay-secs
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
+ (fn this []
(if (all-connections-ready worker)
(do
(log-message "All connections are ready for worker " (:assignment-id worker) ":" (:port worker)
" with id " (:worker-id worker))
(reset! (:worker-active-flag worker) true))
- (StormTimer/schedule timer recur-secs this false 0)))))))
+ (.schedule timer recur-secs this false 0))))))
(defn register-callbacks [worker]
(let [transfer-local-fn (:transfer-local-fn worker)
@@ -654,19 +644,10 @@
executors (atom nil)
;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
;; to the supervisor
- _ (StormTimer/scheduleRecurring
- (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS)
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (heartbeat-fn))))
-
- _ (StormTimer/scheduleRecurring
- (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS)
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (do-executor-heartbeats worker :executors @executors))))
+ _ (.scheduleRecurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
+
+ _ (.scheduleRecurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS)
+ (fn [] (do-executor-heartbeats worker :executors @executors)))
_ (register-callbacks worker)
@@ -727,14 +708,14 @@
(.interrupt backpressure-thread)
(.join backpressure-thread)
(log-message "Shut down backpressure thread")
- (StormTimer/cancelTimer (:heartbeat-timer worker))
- (StormTimer/cancelTimer (:refresh-connections-timer worker))
- (StormTimer/cancelTimer (:refresh-credentials-timer worker))
- (StormTimer/cancelTimer (:refresh-active-timer worker))
- (StormTimer/cancelTimer (:executor-heartbeat-timer worker))
- (StormTimer/cancelTimer (:user-timer worker))
- (StormTimer/cancelTimer (:refresh-load-timer worker))
- (StormTimer/cancelTimer (:reset-log-levels-timer worker))
+ (.close (:heartbeat-timer worker))
+ (.close (:refresh-connections-timer worker))
+ (.close (:refresh-credentials-timer worker))
+ (.close (:refresh-active-timer worker))
+ (.close (:executor-heartbeat-timer worker))
+ (.close (:user-timer worker))
+ (.close (:refresh-load-timer worker))
+ (.close (:reset-log-levels-timer worker))
(close-resources worker)
(log-message "Trigger any worker shutdown hooks")
@@ -753,13 +734,13 @@
DaemonCommon
(waiting? [this]
(and
- (StormTimer/isTimerWaiting (:heartbeat-timer worker))
- (StormTimer/isTimerWaiting (:refresh-connections-timer worker))
- (StormTimer/isTimerWaiting (:refresh-load-timer worker))
- (StormTimer/isTimerWaiting (:refresh-credentials-timer worker))
- (StormTimer/isTimerWaiting (:refresh-active-timer worker))
- (StormTimer/isTimerWaiting (:executor-heartbeat-timer worker))
- (StormTimer/isTimerWaiting (:user-timer worker))
+ (.isTimerWaiting (:heartbeat-timer worker))
+ (.isTimerWaiting (:refresh-connections-timer worker))
+ (.isTimerWaiting (:refresh-load-timer worker))
+ (.isTimerWaiting (:refresh-credentials-timer worker))
+ (.isTimerWaiting (:refresh-active-timer worker))
+ (.isTimerWaiting (:executor-heartbeat-timer worker))
+ (.isTimerWaiting (:user-timer worker))
))
)
credentials (atom initial-credentials)
@@ -788,40 +769,23 @@
(establish-log-setting-callback)
(.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-changed)))
- (StormTimer/scheduleRecurring
+ (.scheduleRecurring
(:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
+ (fn []
(check-credentials-changed)
(if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
- (check-throttle-changed)))))
+ (check-throttle-changed))))
;; The jitter allows the clients to get the data at different times, and avoids thundering herd
(when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
- (StormTimer/scheduleRecurringWithJitter
- (:refresh-load-timer worker) 0 1 500
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (refresh-load)))))
- (StormTimer/scheduleRecurring
- (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS)
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (refresh-connections))))
- (StormTimer/scheduleRecurring
+ (.scheduleRecurringWithJitter
+ (:refresh-load-timer worker) 0 1 500 refresh-load))
+ (.scheduleRecurring
+ (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections)
+ (.scheduleRecurring
(:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS)
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- (reset-log-levels latest-log-config))))
- (StormTimer/scheduleRecurring
- (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS)
- (reify StormTimer$TimerFunc
- (^void run
- [this ^Object o]
- ((partial refresh-storm-active worker)))))
+ (fn [] (reset-log-levels latest-log-config)))
+ (.scheduleRecurring
+ (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker))
(log-message "Worker has topology config " (Utils/redactValue (:storm-conf worker) STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
(log-message "Worker " worker-id " for storm " storm-id " on " assignment-id ":" port " has finished loading")
ret
http://git-wip-us.apache.org/repos/asf/storm/blob/2d06efe7/storm-core/src/jvm/org/apache/storm/StormTimer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/StormTimer.java b/storm-core/src/jvm/org/apache/storm/StormTimer.java
index df89dc6..a2d0145 100644
--- a/storm-core/src/jvm/org/apache/storm/StormTimer.java
+++ b/storm-core/src/jvm/org/apache/storm/StormTimer.java
@@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory;
import java.util.Comparator;
import java.util.Random;
import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -35,66 +34,52 @@ import java.util.concurrent.atomic.AtomicBoolean;
* code that does asynchronous work on the timer thread
*/
-public class StormTimer {
+public class StormTimer implements AutoCloseable{
private static final Logger LOG = LoggerFactory.getLogger(StormTimer.class);
- public interface TimerFunc {
- public void run(Object o);
- }
-
public static class QueueEntry {
public final Long endTimeMs;
- public final TimerFunc afn;
+ public final Runnable func;
public final String id;
- public QueueEntry(Long endTimeMs, TimerFunc afn, String id) {
+ public QueueEntry(Long endTimeMs, Runnable func, String id) {
this.endTimeMs = endTimeMs;
- this.afn = afn;
+ this.func = func;
this.id = id;
}
-
- @Override
- public String toString() {
- return this.id + " " + this.endTimeMs + " " + this.afn;
- }
}
public static class StormTimerTask extends Thread {
- private PriorityBlockingQueue<QueueEntry> queue = new PriorityBlockingQueue<QueueEntry>(10, new Comparator() {
+ private PriorityBlockingQueue<QueueEntry> queue = new PriorityBlockingQueue<QueueEntry>(10, new Comparator<QueueEntry>() {
@Override
- public int compare(Object o1, Object o2) {
- return ((QueueEntry)o1).endTimeMs.intValue() - ((QueueEntry)o2).endTimeMs.intValue();
+ public int compare(QueueEntry o1, QueueEntry o2) {
+ return o1.endTimeMs.intValue() - o2.endTimeMs.intValue();
}
});
+ // boolean to indicate whether timer is active
private AtomicBoolean active = new AtomicBoolean(false);
- private TimerFunc onKill;
+ // function to call when timer is killed
+ private Thread.UncaughtExceptionHandler onKill;
+ //random number generator
private Random random = new Random();
- private Semaphore cancelNotifier = new Semaphore(0);
-
- private Object lock = new Object();
-
@Override
public void run() {
while (this.active.get()) {
QueueEntry queueEntry = null;
try {
- synchronized (this.lock) {
- queueEntry = this.queue.peek();
- }
+ queueEntry = this.queue.peek();
if ((queueEntry != null) && (Time.currentTimeMillis() >= queueEntry.endTimeMs)) {
// It is imperative to not run the function
// inside the timer lock. Otherwise, it is
// possible to deadlock if the fn deals with
// other locks, like the submit lock.
- synchronized (this.lock) {
- this.queue.poll();
- }
- queueEntry.afn.run(null);
+ this.queue.remove(queueEntry);
+ queueEntry.func.run();
} else if (queueEntry != null) {
// If any events are scheduled, sleep until
// event generation. If any recurring events
@@ -113,18 +98,16 @@ public class StormTimer {
// events.
Time.sleep(1000);
}
- } catch (Throwable t) {
- if (!(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t))) {
- this.onKill.run(t);
+ } catch (Throwable e) {
+ if (!(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e))) {
+ this.onKill.uncaughtException(this, e);
this.setActive(false);
- throw new RuntimeException(t);
}
}
}
- this.cancelNotifier.release();
}
- public void setOnKillFunc(TimerFunc onKill) {
+ public void setOnKillFunc(Thread.UncaughtExceptionHandler onKill) {
this.onKill = onKill;
}
@@ -141,88 +124,120 @@ public class StormTimer {
}
}
- public static StormTimerTask mkTimer(String name, TimerFunc onKill) {
+ //task to run
+ StormTimerTask task = new StormTimerTask();
+
+ /**
+ * Creates a StormTimer and starts its underlying StormTimerTask thread
+ * @param name name of the timer thread; "timer" is used when null
+ * @param onKill handler to call when the timer is killed unexpectedly
+ * @throws RuntimeException if onKill is null
+ */
+ public StormTimer (String name, Thread.UncaughtExceptionHandler onKill) {
if (onKill == null) {
throw new RuntimeException("onKill func is null!");
}
- StormTimerTask task = new StormTimerTask();
if (name == null) {
- task.setName("timer");
+ this.task.setName("timer");
} else {
- task.setName(name);
+ this.task.setName(name);
}
- task.setOnKillFunc(onKill);
- task.setActive(true);
+ this.task.setOnKillFunc(onKill);
+ this.task.setActive(true);
- task.setDaemon(true);
- task.setPriority(Thread.MAX_PRIORITY);
- task.start();
- return task;
+ this.task.setDaemon(true);
+ this.task.setPriority(Thread.MAX_PRIORITY);
+ this.task.start();
}
- public static void schedule(StormTimerTask task, int delaySecs, TimerFunc afn, boolean checkActive, int jitterMs) {
- if (task == null) {
+
+ /**
+ * Schedule a function to be executed in the timer
+ * @param delaySecs the number of seconds to delay before running the function
+ * @param func the function to run
+ * @param checkActive whether to check if the timer is active
+ * @param jitterMs add jitter to the run
+ */
+ public void schedule(int delaySecs, Runnable func, boolean checkActive, int jitterMs) {
+ if (this.task == null) {
throw new RuntimeException("task is null!");
}
- if (afn == null) {
+ if (func == null) {
throw new RuntimeException("function to schedule is null!");
}
+ if (checkActive) {
+ checkActive();
+ }
String id = Utils.uuid();
long endTimeMs = Time.currentTimeMillis() + Time.secsToMillisLong(delaySecs);
if (jitterMs > 0) {
- endTimeMs = task.random.nextInt(jitterMs) + endTimeMs;
- }
- synchronized (task.lock) {
- task.add(new QueueEntry(endTimeMs, afn, id));
+ endTimeMs = this.task.random.nextInt(jitterMs) + endTimeMs;
}
+ task.add(new QueueEntry(endTimeMs, func, id));
}
- public static void schedule(StormTimerTask task, int delaySecs, TimerFunc afn) {
- schedule(task, delaySecs, afn, true, 0);
+
+ public void schedule(int delaySecs, Runnable func) {
+ schedule(delaySecs, func, true, 0);
}
- public static void scheduleRecurring(final StormTimerTask task, int delaySecs, final int recurSecs, final TimerFunc afn) {
- schedule(task, delaySecs, new TimerFunc() {
+ /**
+ * Schedule a function to run recurrently
+ * @param delaySecs the number of seconds to delay before running the function
+ * @param recurSecs the time between each invocation
+ * @param func the function to run
+ */
+ public void scheduleRecurring(int delaySecs, final int recurSecs, final Runnable func) {
+ schedule(delaySecs, new Runnable() {
@Override
- public void run(Object o) {
- afn.run(null);
+ public void run() {
+ func.run();
// This avoids a race condition with cancel-timer.
- schedule(task, recurSecs, this, false, 0);
+ schedule(recurSecs, this, false, 0);
}
});
}
- public static void scheduleRecurringWithJitter(final StormTimerTask task, int delaySecs, final int recurSecs, final int jitterMs, final TimerFunc afn) {
- schedule(task, delaySecs, new TimerFunc() {
+ /**
+ * Schedule a function to run recurrently with jitter
+ * @param delaySecs the number of seconds to delay before running the function
+ * @param recurSecs the time between each invocation
+ * @param jitterMs jitter added to the run
+ * @param func the function to run
+ */
+ public void scheduleRecurringWithJitter(int delaySecs, final int recurSecs, final int jitterMs, final Runnable func) {
+ schedule(delaySecs, new Runnable() {
@Override
- public void run(Object o) {
- afn.run(null);
+ public void run() {
+ func.run();
// This avoids a race condition with cancel-timer.
- schedule(task, recurSecs, this, false, jitterMs);
+ schedule(recurSecs, this, false, jitterMs);
}
});
}
- public static void checkActive(StormTimerTask task) {
- if (task == null) {
- throw new RuntimeException("task is null!");
- }
- if (!task.isActive()) {
+ /**
+ * check if timer is active
+ */
+ public void checkActive() {
+ if (!this.task.isActive()) {
throw new IllegalStateException("Timer is not active");
}
}
- public static void cancelTimer(StormTimerTask task) throws InterruptedException {
- if (task == null) {
- throw new RuntimeException("task is null!");
- }
- checkActive(task);
- synchronized (task.lock) {
- task.setActive(false);
- task.interrupt();
- }
- task.cancelNotifier.acquire();
+ /**
+ * cancel timer
+ */
+
+ @Override
+ public void close() throws Exception {
+ checkActive();
+ this.task.setActive(false);
+ this.task.interrupt();
}
- public static boolean isTimerWaiting(StormTimerTask task) {
+ /**
+ * Checks whether the timer is waiting. Used in timer simulation
+ */
+ public boolean isTimerWaiting() {
if (task == null) {
throw new RuntimeException("task is null!");
}