You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/18 18:09:39 UTC
[3/9] storm git commit: replacing clojure with java
replacing clojure with java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4243e4e0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4243e4e0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4243e4e0
Branch: refs/heads/master
Commit: 4243e4e0cf9b9ea4c2aa8efa0a37a78120f2b687
Parents: 7f58252
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Thu Feb 18 10:35:46 2016 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Feb 18 10:35:46 2016 -0600
----------------------------------------------------------------------
.../clj/org/apache/storm/daemon/executor.clj | 47 +++-
.../clj/org/apache/storm/daemon/logviewer.clj | 30 ++-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 136 +++++++---
.../clj/org/apache/storm/daemon/supervisor.clj | 176 +++++++++----
.../src/clj/org/apache/storm/daemon/worker.clj | 185 ++++++++++----
storm-core/src/clj/org/apache/storm/timer.clj | 254 +++++++++----------
.../src/jvm/org/apache/storm/StormTimer.java | 115 +++++----
.../test/clj/org/apache/storm/nimbus_test.clj | 4 +-
.../clj/org/apache/storm/supervisor_test.clj | 7 +-
.../test/jvm/org/apache/storm/TestTimer.java | 57 +++++
10 files changed, 682 insertions(+), 329 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 14a2f6e..f46f18b 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -17,7 +17,7 @@
(:use [org.apache.storm.daemon common])
(:import [org.apache.storm.generated Grouping Grouping$_Fields]
[java.io Serializable])
- (:use [org.apache.storm util config log timer stats])
+ (:use [org.apache.storm util config log stats])
(:import [java.util List Random HashMap ArrayList LinkedList Map])
(:import [org.apache.storm ICredentialsListener Thrift])
(:import [org.apache.storm.hooks ITaskHook])
@@ -39,7 +39,8 @@
(:import [java.lang Thread Thread$UncaughtExceptionHandler]
[java.util.concurrent ConcurrentLinkedQueue]
[org.json.simple JSONValue]
- [com.lmax.disruptor.dsl ProducerType])
+ [com.lmax.disruptor.dsl ProducerType]
+ [org.apache.storm StormTimer StormTimer$TimerFunc])
(:require [org.apache.storm [cluster :as cluster] [stats :as stats]])
(:require [org.apache.storm.daemon [task :as task]])
(:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])
@@ -323,13 +324,23 @@
(let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data
distinct-time-bucket-intervals (keys interval->task->metric-registry)]
(doseq [interval distinct-time-bucket-intervals]
- (schedule-recurring
- (:user-timer (:worker executor-data))
- interval
- interval
- (fn []
- (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
- (.publish ^DisruptorQueue receive-queue val)))))))
+; (schedule-recurring
+; (:user-timer (:worker executor-data))
+; interval
+; interval
+; (fn []
+; (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
+; (disruptor/publish receive-queue val))))
+
+ (StormTimer/scheduleRecurring
+ (:user-timer (:worker executor-data))
+ interval
+ interval
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
+ (.publish ^DisruptorQueue receive-queue val))))))))
(defn metrics-tick
[executor-data task-data ^TupleImpl tuple]
@@ -364,13 +375,23 @@
(and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
(= :spout (:type executor-data))))
(log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
- (schedule-recurring
+; (schedule-recurring
+; (:user-timer worker)
+; tick-time-secs
+; tick-time-secs
+; (fn []
+; (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
+; (disruptor/publish receive-queue val))))
+
+ (StormTimer/scheduleRecurring
(:user-timer worker)
tick-time-secs
tick-time-secs
- (fn []
- (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
- (.publish ^DisruptorQueue receive-queue val))))))))
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
+ (.publish ^DisruptorQueue receive-queue val)))))))))
(defn mk-executor [worker executor-id initial-credentials]
(let [executor-data (mk-executor-data worker executor-id)
http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
index 6ca1759..932a813 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
@@ -18,8 +18,9 @@
(:use [clojure.set :only [difference intersection]])
(:use [clojure.string :only [blank? split]])
(:use [hiccup core page-helpers form-helpers])
- (:use [org.apache.storm config util log timer])
+ (:use [org.apache.storm config util log])
(:use [org.apache.storm.ui helpers])
+ (:import [org.apache.storm StormTimer StormTimer$TimerFunc])
(:import [org.apache.storm.utils Utils Time VersionInfo ConfigUtils])
(:import [org.slf4j LoggerFactory])
(:import [java.util Arrays ArrayList HashSet])
@@ -263,13 +264,26 @@
(let [interval-secs (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
(when interval-secs
(log-debug "starting log cleanup thread at interval: " interval-secs)
- (schedule-recurring (mk-timer :thread-name "logviewer-cleanup"
- :kill-fn (fn [t]
- (log-error t "Error when doing logs cleanup")
- (Utils/exitProcess 20 "Error when doing log cleanup")))
- 0 ;; Start immediately.
- interval-secs
- (fn [] (cleanup-fn! log-root-dir))))))
+; (schedule-recurring (mk-timer :thread-name "logviewer-cleanup"
+; :kill-fn (fn [t]
+; (log-error t "Error when doing logs cleanup")
+; (Utils/exitProcess 20 "Error when doing log cleanup")))
+; 0 ;; Start immediately.
+; interval-secs
+; (fn [] (cleanup-fn! log-root-dir)))
+
+ (StormTimer/scheduleRecurring
+ (StormTimer/mkTimer "logviewer-cleanup"
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object t]
+ (log-error t "Error when doing logs cleanup")
+ (Utils/exitProcess 20 "Error when doing log cleanup"))))
+ 0 interval-secs
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (cleanup-fn! log-root-dir)))))))
(defn- skip-bytes
"FileInputStream#skip may not work the first time, so ensure it successfully
http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 710cd83..d6413db 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -50,7 +50,7 @@
(:import [org.apache.storm.daemon Shutdownable])
(:import [org.apache.storm.validation ConfigValidation])
(:import [org.apache.storm.cluster ClusterStateContext DaemonType])
- (:use [org.apache.storm util config log timer zookeeper local-state])
+ (:use [org.apache.storm util config log zookeeper local-state])
(:require [org.apache.storm [cluster :as cluster]
[converter :as converter]
[stats :as stats]])
@@ -66,6 +66,7 @@
(:require [clj-time.coerce :as coerce])
(:require [metrics.meters :refer [defmeter mark!]])
(:require [metrics.gauges :refer [defgauge]])
+ (:import [org.apache.storm StormTimer StormTimer$TimerFunc])
(:gen-class
:methods [^{:static true} [launch [org.apache.storm.scheduler.INimbus] void]]))
@@ -193,10 +194,17 @@
:blob-listers (mk-bloblist-cache-map conf)
:uptime (Utils/makeUptimeComputer)
:validator (Utils/newInstance (conf NIMBUS-TOPOLOGY-VALIDATOR))
- :timer (mk-timer :kill-fn (fn [t]
- (log-error t "Error when processing event")
- (Utils/exitProcess 20 "Error when processing an event")
- ))
+; :timer (mk-timer :kill-fn (fn [t]
+; (log-error t "Error when processing event")
+; (Utils/exitProcess 20 "Error when processing an event")
+; ))
+ :timer (StormTimer/mkTimer nil
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object t]
+ (log-error t "Error when processing event")
+ (Utils/exitProcess 20 "Error when processing an event"))))
+
:scheduler (mk-scheduler conf inimbus)
:leader-elector (Zookeeper/zkLeaderElector conf)
:id->sched-status (atom {})
@@ -379,10 +387,17 @@
(defn delay-event [nimbus storm-id delay-secs event]
(log-message "Delaying event " event " for " delay-secs " secs for " storm-id)
- (schedule (:timer nimbus)
- delay-secs
- #(transition! nimbus storm-id event false)
- ))
+; (schedule (:timer nimbus)
+; delay-secs
+; #(transition! nimbus storm-id event false)
+; )
+ (StormTimer/schedule
+ (:timer nimbus)
+ delay-secs
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (transition! nimbus storm-id event false)))))
;; active -> reassign in X secs
@@ -1442,39 +1457,83 @@
(when (is-leader nimbus :throw-exception false)
(doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
(transition! nimbus storm-id :startup)))
- (schedule-recurring (:timer nimbus)
- 0
- (conf NIMBUS-MONITOR-FREQ-SECS)
- (fn []
- (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
- (locking (:submit-lock nimbus)
- (mk-assignments nimbus)))
- (do-cleanup nimbus)))
+; (schedule-recurring (:timer nimbus)
+; 0
+; (conf NIMBUS-MONITOR-FREQ-SECS)
+; (fn []
+; (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
+; (locking (:submit-lock nimbus)
+; (mk-assignments nimbus)))
+; (do-cleanup nimbus)))
+ (StormTimer/scheduleRecurring
+ (:timer nimbus)
+ 0
+ (conf NIMBUS-MONITOR-FREQ-SECS)
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
+ (locking (:submit-lock nimbus)
+ (mk-assignments nimbus)))
+ (do-cleanup nimbus))))
;; Schedule Nimbus inbox cleaner
- (schedule-recurring (:timer nimbus)
- 0
- (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
- (fn []
- (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
+; (schedule-recurring (:timer nimbus)
+; 0
+; (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
+; (fn []
+; (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
+
+ (StormTimer/scheduleRecurring
+ (:timer nimbus)
+ 0
+ (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS)))))
;; Schedule nimbus code sync thread to sync code from other nimbuses.
(if (instance? LocalFsBlobStore blob-store)
- (schedule-recurring (:timer nimbus)
- 0
- (conf NIMBUS-CODE-SYNC-FREQ-SECS)
- (fn []
- (blob-sync conf nimbus))))
+; (schedule-recurring (:timer nimbus)
+; 0
+; (conf NIMBUS-CODE-SYNC-FREQ-SECS)
+; (fn []
+; (blob-sync conf nimbus)))
+ (StormTimer/scheduleRecurring
+ (:timer nimbus)
+ 0
+ (conf NIMBUS-CODE-SYNC-FREQ-SECS)
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object t]
+ (blob-sync conf nimbus)))))
;; Schedule topology history cleaner
(when-let [interval (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
- (schedule-recurring (:timer nimbus)
+; (schedule-recurring (:timer nimbus)
+; 0
+; (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)
+; (fn []
+; (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus)))
+ (StormTimer/scheduleRecurring
+ (:timer nimbus)
0
(conf LOGVIEWER-CLEANUP-INTERVAL-SECS)
- (fn []
- (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus))))
- (schedule-recurring (:timer nimbus)
- 0
- (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
- (fn []
- (renew-credentials nimbus)))
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object t]
+ (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus)))))
+; (schedule-recurring (:timer nimbus)
+; 0
+; (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
+; (fn []
+; (renew-credentials nimbus)))
+ (StormTimer/scheduleRecurring
+ (:timer nimbus)
+ 0
+ (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object t]
+ (renew-credentials nimbus))))
(defgauge nimbus:num-supervisors
(fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil))))
@@ -2206,7 +2265,8 @@
(shutdown [this]
(mark! nimbus:num-shutdown-calls)
(log-message "Shutting down master")
- (cancel-timer (:timer nimbus))
+ ;(cancel-timer (:timer nimbus))
+ (StormTimer/cancelTimer (:timer nimbus))
(.disconnect (:storm-cluster-state nimbus))
(.cleanup (:downloaders nimbus))
(.cleanup (:uploaders nimbus))
@@ -2216,7 +2276,9 @@
(log-message "Shut down master"))
DaemonCommon
(waiting? [this]
- (timer-waiting? (:timer nimbus))))))
+; (timer-waiting? (:timer nimbus))
+ (StormTimer/isTimerWaiting (:timer nimbus))
+ ))))
(defn validate-port-available[conf]
(try
http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 4b4bac3..56184aa 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -24,7 +24,7 @@
[java.net JarURLConnection]
[java.net URI URLDecoder]
[org.apache.commons.io FileUtils])
- (:use [org.apache.storm config util log timer local-state])
+ (:use [org.apache.storm config util log local-state])
(:import [org.apache.storm.generated AuthorizationException KeyNotFoundException WorkerResources])
(:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo])
(:import [java.nio.file Files StandardCopyOption])
@@ -42,6 +42,7 @@
[org.yaml.snakeyaml.constructor SafeConstructor])
(:require [metrics.gauges :refer [defgauge]])
(:require [metrics.meters :refer [defmeter mark!]])
+ (:import [org.apache.storm StormTimer StormTimer$TimerFunc])
(:gen-class
:methods [^{:static true} [launch [org.apache.storm.scheduler.ISupervisor] void]])
(:require [clojure.string :as str]))
@@ -335,19 +336,41 @@
:assignment-id (.getAssignmentId isupervisor)
:my-hostname (Utils/hostname conf)
:curr-assignment (atom nil) ;; used for reporting used ports when heartbeating
- :heartbeat-timer (mk-timer :kill-fn (fn [t]
- (log-error t "Error when processing event")
- (Utils/exitProcess 20 "Error when processing an event")
- ))
- :event-timer (mk-timer :kill-fn (fn [t]
- (log-error t "Error when processing event")
- (Utils/exitProcess 20 "Error when processing an event")
- ))
- :blob-update-timer (mk-timer :kill-fn (defn blob-update-timer
- [t]
- (log-error t "Error when processing event")
- (Utils/exitProcess 20 "Error when processing a event"))
- :timer-name "blob-update-timer")
+; :heartbeat-timer (mk-timer :kill-fn (fn [t]
+; (log-error t "Error when processing event")
+; (Utils/exitProcess 20 "Error when processing an event")
+; ))
+
+ :heartbeat-timer (StormTimer/mkTimer nil
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object t]
+ (log-error t "Error when processing event")
+ (Utils/exitProcess 20 "Error when processing an event"))))
+; :event-timer (mk-timer :kill-fn (fn [t]
+; (log-error t "Error when processing event")
+; (Utils/exitProcess 20 "Error when processing an event")
+; ))
+
+ :event-timer (StormTimer/mkTimer nil
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object t]
+ (log-error t "Error when processing event")
+ (Utils/exitProcess 20 "Error when processing an event"))))
+
+; :blob-update-timer (mk-timer :kill-fn (defn blob-update-timer
+; [t]
+; (log-error t "Error when processing event")
+; (Utils/exitProcess 20 "Error when processing a event"))
+; :timer-name "blob-update-timer")
+
+ :blob-update-timer (StormTimer/mkTimer "blob-update-timer"
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object t]
+ (log-error t "Error when processing event")
+ (Utils/exitProcess 20 "Error when processing an event"))))
:localizer (Utils/createLocalizer conf (ConfigUtils/supervisorLocalDir conf))
:assignment-versions (atom {})
:sync-retry (atom 0)
@@ -815,10 +838,19 @@
(heartbeat-fn)
;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
- (schedule-recurring (:heartbeat-timer supervisor)
- 0
- (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
- heartbeat-fn)
+; (schedule-recurring (:heartbeat-timer supervisor)
+; 0
+; (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
+; heartbeat-fn)
+ (StormTimer/scheduleRecurring
+ (:heartbeat-timer supervisor)
+ 0
+ (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (heartbeat-fn))))
+
(doseq [storm-id downloaded-storm-ids]
(add-blob-references (:localizer supervisor) storm-id
conf))
@@ -828,43 +860,91 @@
(when (conf SUPERVISOR-ENABLE)
;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
;; to date even if callbacks don't all work exactly right
- (schedule-recurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
- (schedule-recurring (:event-timer supervisor)
- 0
- (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
- (fn [] (.add processes-event-manager sync-processes)))
+; (schedule-recurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
+ (StormTimer/scheduleRecurring
+ (:event-timer supervisor)
+ 0 10
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (.add event-manager synchronize-supervisor))))
+; (schedule-recurring (:event-timer supervisor)
+; 0
+; (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
+; (fn [] (.add processes-event-manager sync-processes)))
+
+ (StormTimer/scheduleRecurring
+ (:event-timer supervisor)
+ 0
+ (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (.add processes-event-manager sync-processes))))
;; Blob update thread. Starts with 30 seconds delay, every 30 seconds
- (schedule-recurring (:blob-update-timer supervisor)
- 30
- 30
- (fn [] (.add event-manager synchronize-blobs-fn)))
-
- (schedule-recurring (:event-timer supervisor)
- (* 60 5)
- (* 60 5)
- (fn [] (let [health-code (healthcheck/health-check conf)
- ids (my-worker-ids conf)]
- (if (not (= health-code 0))
- (do
- (doseq [id ids]
- (shutdown-worker supervisor id))
- (throw (RuntimeException. "Supervisor failed health check. Exiting.")))))))
+; (schedule-recurring (:blob-update-timer supervisor)
+; 30
+; 30
+; (fn [] (.add event-manager synchronize-blobs-fn)))
+ (StormTimer/scheduleRecurring
+ (:blob-update-timer supervisor)
+ 30 30
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (.add event-manager synchronize-blobs-fn))))
+
+; (schedule-recurring (:event-timer supervisor)
+; (* 60 5)
+; (* 60 5)
+; (fn [] (let [health-code (healthcheck/health-check conf)
+; ids (my-worker-ids conf)]
+; (if (not (= health-code 0))
+; (do
+; (doseq [id ids]
+; (shutdown-worker supervisor id))
+; (throw (RuntimeException. "Supervisor failed health check. Exiting.")))))))
+ (StormTimer/scheduleRecurring
+ (:event-timer supervisor)
+ (* 60 5) (* 60 5)
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (let [health-code (healthcheck/health-check conf)
+ ids (my-worker-ids conf)]
+ (if (not (= health-code 0))
+ (do
+ (doseq [id ids]
+ (shutdown-worker supervisor id))
+ (throw (RuntimeException. "Supervisor failed health check. Exiting."))))))))
+
;; Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
- (schedule-recurring (:event-timer supervisor)
- 30
- 30
- (fn [] (.add event-manager run-profiler-actions-fn))))
+; (schedule-recurring (:event-timer supervisor)
+; 30
+; 30
+; (fn [] (.add event-manager run-profiler-actions-fn))))
+ (StormTimer/scheduleRecurring
+ (:event-timer supervisor)
+ 30 30
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (.add event-manager run-profiler-actions-fn)))))
+
(log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor))
(reify
Shutdownable
(shutdown [this]
(log-message "Shutting down supervisor " (:supervisor-id supervisor))
(reset! (:active supervisor) false)
- (cancel-timer (:heartbeat-timer supervisor))
- (cancel-timer (:event-timer supervisor))
- (cancel-timer (:blob-update-timer supervisor))
+ ;(cancel-timer (:heartbeat-timer supervisor))
+ (StormTimer/cancelTimer (:heartbeat-timer supervisor))
+ ;(cancel-timer (:event-timer supervisor))
+ (StormTimer/cancelTimer (:event-timer supervisor))
+ ;(cancel-timer (:blob-update-timer supervisor))
+ (StormTimer/cancelTimer (:blob-update-timer supervisor))
(.shutdown event-manager)
(.shutdown processes-event-manager)
(.shutdown (:localizer supervisor))
@@ -883,8 +963,10 @@
(waiting? [this]
(or (not @(:active supervisor))
(and
- (timer-waiting? (:heartbeat-timer supervisor))
- (timer-waiting? (:event-timer supervisor))
+ ;(timer-waiting? (:heartbeat-timer supervisor))
+ (StormTimer/isTimerWaiting (:heartbeat-timer supervisor))
+ ;(timer-waiting? (:event-timer supervisor))
+ (StormTimer/isTimerWaiting (:event-timer supervisor))
(every? (memfn waiting?) managers)))
))))
http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 83ae9be..9212506 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -15,7 +15,7 @@
;; limitations under the License.
(ns org.apache.storm.daemon.worker
(:use [org.apache.storm.daemon common])
- (:use [org.apache.storm config log util timer local-state])
+ (:use [org.apache.storm config log util local-state])
(:require [clj-time.core :as time])
(:require [clj-time.coerce :as coerce])
(:require [org.apache.storm.daemon [executor :as executor]])
@@ -45,6 +45,7 @@
(:import [org.apache.logging.log4j Level])
(:import [org.apache.logging.log4j.core.config LoggerConfig])
(:import [org.apache.storm.generated LogConfig LogLevelAction])
+ (:import [org.apache.storm StormTimer StormTimer$TimerFunc])
(:gen-class))
(defmulti mk-suicide-fn cluster-mode)
@@ -238,11 +239,17 @@
{})
(defn mk-halting-timer [timer-name]
- (mk-timer :kill-fn (fn [t]
- (log-error t "Error when processing event")
- (Utils/exitProcess 20 "Error when processing an event")
- )
- :timer-name timer-name))
+; (mk-timer :kill-fn (fn [t]
+; (log-error t "Error when processing event")
+; (Utils/exitProcess 20 "Error when processing an event")
+; )
+; :timer-name timer-name)
+ (StormTimer/mkTimer timer-name
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object t]
+ (log-error t "Error when processing event")
+ (Utils/exitProcess 20 "Error when processing an event")))))
(defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state]
(let [assignment-versions (atom {})
@@ -374,9 +381,16 @@
conf (:conf worker)
storm-cluster-state (:storm-cluster-state worker)
storm-id (:storm-id worker)]
- (fn this
+ (fn refresh-connections
([]
- (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this))))
+ (refresh-connections (fn [& ignored]
+; (schedule (:refresh-connections-timer worker) 0 refresh-connections)
+ (StormTimer/schedule
+ (:refresh-connections-timer worker) 0
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (refresh-connections)))))))
([callback]
(let [version (.assignment-version storm-cluster-state storm-id callback)
assignment (if (= version (:version (get @(:assignment-versions worker) storm-id)))
@@ -427,7 +441,15 @@
(defn refresh-storm-active
([worker]
- (refresh-storm-active worker (fn [& ignored] (schedule (:refresh-active-timer worker) 0 (partial refresh-storm-active worker)))))
+ (refresh-storm-active
+ worker (fn [& ignored]
+; (schedule (:refresh-active-timer worker) 0 (partial refresh-storm-active worker))
+ (StormTimer/schedule
+ (:refresh-active-timer worker) 0
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ ((partial refresh-storm-active worker))))))))
([worker callback]
(let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) callback)]
(reset!
@@ -474,16 +496,28 @@
(let [timer (:refresh-active-timer worker)
delay-secs 0
recur-secs 1]
- (schedule timer
+; (schedule timer
+; delay-secs
+; (fn this []
+; (if (all-connections-ready worker)
+; (do
+; (log-message "All connections are ready for worker " (:assignment-id worker) ":" (:port worker)
+; " with id "(:worker-id worker))
+; (reset! (:worker-active-flag worker) true))
+; (schedule timer recur-secs this :check-active false)
+; )))
+
+ (StormTimer/schedule timer
delay-secs
- (fn this []
- (if (all-connections-ready worker)
- (do
- (log-message "All connections are ready for worker " (:assignment-id worker) ":" (:port worker)
- " with id "(:worker-id worker))
- (reset! (:worker-active-flag worker) true))
- (schedule timer recur-secs this :check-active false)
- )))))
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (if (all-connections-ready worker)
+ (do
+ (log-message "All connections are ready for worker " (:assignment-id worker) ":" (:port worker)
+ " with id " (:worker-id worker))
+ (reset! (:worker-active-flag worker) true))
+ (StormTimer/schedule timer recur-secs this false 0)))))))
(defn register-callbacks [worker]
(let [transfer-local-fn (:transfer-local-fn worker)
@@ -638,8 +672,22 @@
executors (atom nil)
;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
;; to the supervisor
- _ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
- _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors))
+; _ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
+ _ (StormTimer/scheduleRecurring
+ (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS)
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (heartbeat-fn))))
+
+; _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors))
+
+ _ (StormTimer/scheduleRecurring
+ (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS)
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (do-executor-heartbeats worker :executors @executors))))
_ (register-callbacks worker)
@@ -700,14 +748,22 @@
(.interrupt backpressure-thread)
(.join backpressure-thread)
(log-message "Shut down backpressure thread")
- (cancel-timer (:heartbeat-timer worker))
- (cancel-timer (:refresh-connections-timer worker))
- (cancel-timer (:refresh-credentials-timer worker))
- (cancel-timer (:refresh-active-timer worker))
- (cancel-timer (:executor-heartbeat-timer worker))
- (cancel-timer (:user-timer worker))
- (cancel-timer (:refresh-load-timer worker))
-
+; (cancel-timer (:heartbeat-timer worker))
+ (StormTimer/cancelTimer (:heartbeat-timer worker))
+; (cancel-timer (:refresh-connections-timer worker))
+ (StormTimer/cancelTimer (:refresh-connections-timer worker))
+; (cancel-timer (:refresh-credentials-timer worker))
+ (StormTimer/cancelTimer (:refresh-credentials-timer worker))
+; (cancel-timer (:refresh-active-timer worker))
+ (StormTimer/cancelTimer (:refresh-active-timer worker))
+; (cancel-timer (:executor-heartbeat-timer worker))
+ (StormTimer/cancelTimer (:executor-heartbeat-timer worker))
+; (cancel-timer (:user-timer worker))
+ (StormTimer/cancelTimer (:user-timer worker))
+; (cancel-timer (:refresh-load-timer worker))
+ (StormTimer/cancelTimer (:refresh-load-timer worker))
+
+ (StormTimer/cancelTimer (:reset-log-levels-timer worker))
(close-resources worker)
(log-message "Trigger any worker shutdown hooks")
@@ -726,13 +782,20 @@
DaemonCommon
(waiting? [this]
(and
- (timer-waiting? (:heartbeat-timer worker))
- (timer-waiting? (:refresh-connections-timer worker))
- (timer-waiting? (:refresh-load-timer worker))
- (timer-waiting? (:refresh-credentials-timer worker))
- (timer-waiting? (:refresh-active-timer worker))
- (timer-waiting? (:executor-heartbeat-timer worker))
- (timer-waiting? (:user-timer worker))
+; (timer-waiting? (:heartbeat-timer worker))
+; (timer-waiting? (:refresh-connections-timer worker))
+; (timer-waiting? (:refresh-load-timer worker))
+; (timer-waiting? (:refresh-credentials-timer worker))
+; (timer-waiting? (:refresh-active-timer worker))
+; (timer-waiting? (:executor-heartbeat-timer worker))
+; (timer-waiting? (:user-timer worker))
+ (StormTimer/isTimerWaiting (:heartbeat-timer worker))
+ (StormTimer/isTimerWaiting (:refresh-connections-timer worker))
+ (StormTimer/isTimerWaiting (:refresh-load-timer worker))
+ (StormTimer/isTimerWaiting (:refresh-credentials-timer worker))
+ (StormTimer/isTimerWaiting (:refresh-active-timer worker))
+ (StormTimer/isTimerWaiting (:executor-heartbeat-timer worker))
+ (StormTimer/isTimerWaiting (:user-timer worker))
))
)
credentials (atom initial-credentials)
@@ -760,18 +823,50 @@
(establish-log-setting-callback)
(.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-changed)))
- (schedule-recurring (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
- (fn [& args]
- (check-credentials-changed)
- (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
- (check-throttle-changed))))
+; (schedule-recurring (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
+; (fn [& args]
+; (check-credentials-changed)
+; (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+; (check-throttle-changed))))
+
+ (StormTimer/scheduleRecurring
+ (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (check-credentials-changed)
+ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+ (check-throttle-changed)))))
;; The jitter allows the clients to get the data at different times, and avoids thundering herd
(when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
- (schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 refresh-load))
- (schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections)
- (schedule-recurring (:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS) (fn [] (reset-log-levels latest-log-config)))
- (schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker))
-
+; (schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 refresh-load)
+ (StormTimer/scheduleRecurringWithJitter
+ (:refresh-load-timer worker) 0 1 500
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (refresh-load)))))
+; (schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections)
+ (StormTimer/scheduleRecurring
+ (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS)
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (refresh-connections))))
+; (schedule-recurring (:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS) (fn [] (reset-log-levels latest-log-config)))
+ (StormTimer/scheduleRecurring
+ (:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS)
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ (reset-log-levels latest-log-config))))
+; (schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker))
+ (StormTimer/scheduleRecurring
+ (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS)
+ (reify StormTimer$TimerFunc
+ (^void run
+ [this ^Object o]
+ ((partial refresh-storm-active worker)))))
(log-message "Worker has topology config " (Utils/redactValue (:storm-conf worker) STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
(log-message "Worker " worker-id " for storm " storm-id " on " assignment-id ":" port " has finished loading")
ret
http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/src/clj/org/apache/storm/timer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/timer.clj b/storm-core/src/clj/org/apache/storm/timer.clj
index 5f31032..27853c2 100644
--- a/storm-core/src/clj/org/apache/storm/timer.clj
+++ b/storm-core/src/clj/org/apache/storm/timer.clj
@@ -1,128 +1,128 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements. See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership. The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
-(ns org.apache.storm.timer
- (:import [org.apache.storm.utils Utils Time])
- (:import [java.util PriorityQueue Comparator Random])
- (:import [java.util.concurrent Semaphore])
- (:use [org.apache.storm util log]))
-
-;; The timer defined in this file is very similar to java.util.Timer, except
-;; it integrates with Storm's time simulation capabilities. This lets us test
-;; code that does asynchronous work on the timer thread
-
-(defnk mk-timer [:kill-fn (fn [& _] ) :timer-name nil]
- (let [queue (PriorityQueue. 10 (reify Comparator
- (compare
- [this o1 o2]
- (- (first o1) (first o2)))
- (equals
- [this obj]
- true)))
- active (atom true)
- lock (Object.)
- notifier (Semaphore. 0)
- thread-name (if timer-name timer-name "timer")
- timer-thread (Thread.
- (fn []
- (while @active
- (try
- (let [[time-millis _ _ :as elem] (locking lock (.peek queue))]
- (if (and elem (>= (Time/currentTimeMillis) time-millis))
- ;; It is imperative to not run the function
- ;; inside the timer lock. Otherwise, it is
- ;; possible to deadlock if the fn deals with
- ;; other locks, like the submit lock.
- (let [afn (locking lock (second (.poll queue)))]
- (afn))
- (if time-millis
- ;; If any events are scheduled, sleep until
- ;; event generation. If any recurring events
- ;; are scheduled then we will always go
- ;; through this branch, sleeping only the
- ;; exact necessary amount of time. We give
- ;; an upper bound, e.g. 1000 millis, to the
- ;; sleeping time, to limit the response time
- ;; for detecting any new event within 1 secs.
- (Time/sleep (min 1000 (- time-millis (Time/currentTimeMillis))))
- ;; Otherwise poll to see if any new event
- ;; was scheduled. This is, in essence, the
- ;; response time for detecting any new event
- ;; schedulings when there are no scheduled
- ;; events.
- (Time/sleep 1000))))
- (catch Throwable t
- ;; Because the interrupted exception can be
- ;; wrapped in a RuntimeException.
- (when-not (Utils/exceptionCauseIsInstanceOf InterruptedException t)
- (kill-fn t)
- (reset! active false)
- (throw t)))))
- (.release notifier)) thread-name)]
- (.setDaemon timer-thread true)
- (.setPriority timer-thread Thread/MAX_PRIORITY)
- (.start timer-thread)
- {:timer-thread timer-thread
- :queue queue
- :active active
- :lock lock
- :random (Random.)
- :cancel-notifier notifier}))
-
-(defn- check-active!
- [timer]
- (when-not @(:active timer)
- (throw (IllegalStateException. "Timer is not active"))))
-
-(defnk schedule
- [timer delay-secs afn :check-active true :jitter-ms 0]
- (when check-active (check-active! timer))
- (let [id (Utils/uuid)
- ^PriorityQueue queue (:queue timer)
- end-time-ms (+ (Time/currentTimeMillis) (Time/secsToMillisLong delay-secs))
- end-time-ms (if (< 0 jitter-ms) (+ (.nextInt (:random timer) jitter-ms) end-time-ms) end-time-ms)]
- (locking (:lock timer)
- (.add queue [end-time-ms afn id]))))
-
-(defn schedule-recurring
- [timer delay-secs recur-secs afn]
- (schedule timer
- delay-secs
- (fn this []
- (afn)
- ; This avoids a race condition with cancel-timer.
- (schedule timer recur-secs this :check-active false))))
-
-(defn schedule-recurring-with-jitter
- [timer delay-secs recur-secs jitter-ms afn]
- (schedule timer
- delay-secs
- (fn this []
- (afn)
- ; This avoids a race condition with cancel-timer.
- (schedule timer recur-secs this :check-active false :jitter-ms jitter-ms))))
-
-(defn cancel-timer
- [timer]
- (check-active! timer)
- (locking (:lock timer)
- (reset! (:active timer) false)
- (.interrupt (:timer-thread timer)))
- (.acquire (:cancel-notifier timer)))
-
-(defn timer-waiting?
- [timer]
- (Time/isThreadWaiting (:timer-thread timer)))
+;(ns org.apache.storm.timer
+; (:import [org.apache.storm.utils Utils Time])
+; (:import [java.util PriorityQueue Comparator Random])
+; (:import [java.util.concurrent Semaphore])
+; (:use [org.apache.storm util log]))
+;
+;;; The timer defined in this file is very similar to java.util.Timer, except
+;;; it integrates with Storm's time simulation capabilities. This lets us test
+;;; code that does asynchronous work on the timer thread
+;
+;(defnk mk-timer [:kill-fn (fn [& _] ) :timer-name nil]
+; (let [queue (PriorityQueue. 10 (reify Comparator
+; (compare
+; [this o1 o2]
+; (- (first o1) (first o2)))
+; (equals
+; [this obj]
+; true)))
+; active (atom true)
+; lock (Object.)
+; notifier (Semaphore. 0)
+; thread-name (if timer-name timer-name "timer")
+; timer-thread (Thread.
+; (fn []
+; (while @active
+; (try
+; (let [[time-millis _ _ :as elem] (locking lock (.peek queue))]
+; (if (and elem (>= (Time/currentTimeMillis) time-millis))
+; ;; It is imperative to not run the function
+; ;; inside the timer lock. Otherwise, it is
+; ;; possible to deadlock if the fn deals with
+; ;; other locks, like the submit lock.
+; (let [afn (locking lock (second (.poll queue)))]
+; (afn))
+; (if time-millis
+; ;; If any events are scheduled, sleep until
+; ;; event generation. If any recurring events
+; ;; are scheduled then we will always go
+; ;; through this branch, sleeping only the
+; ;; exact necessary amount of time. We give
+; ;; an upper bound, e.g. 1000 millis, to the
+; ;; sleeping time, to limit the response time
+; ;; for detecting any new event within 1 secs.
+; (Time/sleep (min 1000 (- time-millis (Time/currentTimeMillis))))
+; ;; Otherwise poll to see if any new event
+; ;; was scheduled. This is, in essence, the
+; ;; response time for detecting any new event
+; ;; schedulings when there are no scheduled
+; ;; events.
+; (Time/sleep 1000))))
+; (catch Throwable t
+; ;; Because the interrupted exception can be
+; ;; wrapped in a RuntimeException.
+; (when-not (Utils/exceptionCauseIsInstanceOf InterruptedException t)
+; (kill-fn t)
+; (reset! active false)
+; (throw t)))))
+; (.release notifier)) thread-name)]
+; (.setDaemon timer-thread true)
+; (.setPriority timer-thread Thread/MAX_PRIORITY)
+; (.start timer-thread)
+; {:timer-thread timer-thread
+; :queue queue
+; :active active
+; :lock lock
+; :random (Random.)
+; :cancel-notifier notifier}))
+;
+;(defn- check-active!
+; [timer]
+; (when-not @(:active timer)
+; (throw (IllegalStateException. "Timer is not active"))))
+;
+;(defnk schedule
+; [timer delay-secs afn :check-active true :jitter-ms 0]
+; (when check-active (check-active! timer))
+; (let [id (Utils/uuid)
+; ^PriorityQueue queue (:queue timer)
+; end-time-ms (+ (Time/currentTimeMillis) (Time/secsToMillisLong delay-secs))
+; end-time-ms (if (< 0 jitter-ms) (+ (.nextInt (:random timer) jitter-ms) end-time-ms) end-time-ms)]
+; (locking (:lock timer)
+; (.add queue [end-time-ms afn id]))))
+;
+;(defn schedule-recurring
+; [timer delay-secs recur-secs afn]
+; (schedule timer
+; delay-secs
+; (fn this []
+; (afn)
+; ; This avoids a race condition with cancel-timer.
+; (schedule timer recur-secs this :check-active false))))
+;
+;(defn schedule-recurring-with-jitter
+; [timer delay-secs recur-secs jitter-ms afn]
+; (schedule timer
+; delay-secs
+; (fn this []
+; (afn)
+; ; This avoids a race condition with cancel-timer.
+; (schedule timer recur-secs this :check-active false :jitter-ms jitter-ms))))
+;
+;(defn cancel-timer
+; [timer]
+; (check-active! timer)
+; (locking (:lock timer)
+; (reset! (:active timer) false)
+; (.interrupt (:timer-thread timer)))
+; (.acquire (:cancel-notifier timer)))
+;
+;(defn timer-waiting?
+; [timer]
+; (Time/isThreadWaiting (:timer-thread timer)))
http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/src/jvm/org/apache/storm/StormTimer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/StormTimer.java b/storm-core/src/jvm/org/apache/storm/StormTimer.java
index 5267335..36878e4 100644
--- a/storm-core/src/jvm/org/apache/storm/StormTimer.java
+++ b/storm-core/src/jvm/org/apache/storm/StormTimer.java
@@ -19,11 +19,13 @@
package org.apache.storm;
import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Comparator;
import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,12 +37,29 @@ public class StormTimer {
public void run(Object o);
}
+ public static class QueueEntry {
+ public final Long endTimeMs;
+ public final TimerFunc afn;
+ public final String id;
+
+ public QueueEntry(Long endTimeMs, TimerFunc afn, String id) {
+ this.endTimeMs = endTimeMs;
+ this.afn = afn;
+ this.id = id;
+ }
+
+ @Override
+ public String toString() {
+ return this.id + " " + this.endTimeMs + " " + this.afn;
+ }
+ }
+
public static class StormTimerTask extends Thread {
- private PriorityBlockingQueue<Long> queue = new PriorityBlockingQueue<Long>(10, new Comparator() {
+ private PriorityBlockingQueue<QueueEntry> queue = new PriorityBlockingQueue<QueueEntry>(10, new Comparator() {
@Override
public int compare(Object o1, Object o2) {
- return 0;
+ return ((QueueEntry)o1).endTimeMs.intValue() - ((QueueEntry)o2).endTimeMs.intValue();
}
});
@@ -48,8 +67,6 @@ public class StormTimer {
private TimerFunc onKill;
- private TimerFunc afn;
-
private Random random = new Random();
private Semaphore cancelNotifier = new Semaphore(0);
@@ -58,26 +75,32 @@ public class StormTimer {
@Override
public void run() {
- LOG.info("in run...");
+ LOG.info("in run...{}", this.getName());
while (this.active.get()) {
+ QueueEntry queueEntry = null;
try {
- Long endTimeMillis;
synchronized (this.lock) {
- endTimeMillis = this.queue.peek();
+ queueEntry = this.queue.peek();
}
- if ((endTimeMillis != null) && (currentTimeMillis() >= endTimeMillis)) {
+ LOG.info("event: {} -- {}", this.getName(), queueEntry);
+
+ if ((queueEntry != null) && (Time.currentTimeMillis() >= queueEntry.endTimeMs)) {
synchronized (this.lock) {
this.queue.poll();
}
- LOG.info("About to run function...");
- this.afn.run(null);
- } else if (endTimeMillis != null) {
- Time.sleep(Math.min(1000, (endTimeMillis - currentTimeMillis())));
+ queueEntry.afn.run(null);
+ } else if (queueEntry != null) {
+ Time.sleep(Math.min(1000, (queueEntry.endTimeMs - Time.currentTimeMillis())));
} else {
Time.sleep(1000);
}
} catch (Throwable t) {
- this.onKill.run(t);
+ if (!(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t))) {
+ LOG.info("Exception throw for event: {} --- {}", queueEntry, t);
+ this.onKill.run(t);
+ this.setActive(false);
+ throw new RuntimeException(t);
+ }
}
}
this.cancelNotifier.release();
@@ -87,10 +110,6 @@ public class StormTimer {
this.onKill = onKill;
}
- public void setFunc(TimerFunc func) {
- this.afn = func;
- }
-
public void setActive(boolean flag) {
this.active.set(flag);
}
@@ -99,14 +118,21 @@ public class StormTimer {
return this.active.get();
}
- public void add(long endTime) {
- this.queue.add(endTime);
+ public void add(QueueEntry queueEntry) {
+ this.queue.add(queueEntry);
}
}
- public static StormTimerTask mkTimer(TimerFunc onKill, String name) {
- LOG.info("making Timer...");
+ public static StormTimerTask mkTimer(String name, TimerFunc onKill) {
+ if (onKill == null) {
+ throw new RuntimeException("onKill func is null!");
+ }
StormTimerTask task = new StormTimerTask();
+ if (name == null) {
+ task.setName("timer");
+ } else {
+ task.setName(name);
+ }
task.setOnKillFunc(onKill);
task.setActive(true);
@@ -116,13 +142,20 @@ public class StormTimer {
return task;
}
public static void schedule(StormTimerTask task, int delaySecs, TimerFunc afn, boolean checkActive, int jitterMs) {
- long endTimeMs = currentTimeMillis() + secsToMillisLong(delaySecs);
+ if (task == null) {
+ throw new RuntimeException("task is null!");
+ }
+ if (afn == null) {
+ throw new RuntimeException("function to schedule is null!");
+ }
+ String id = Utils.uuid();
+ long endTimeMs = Time.currentTimeMillis() + Time.secsToMillisLong(delaySecs);
if (jitterMs > 0) {
endTimeMs = task.random.nextInt(jitterMs) + endTimeMs;
}
- task.setFunc(afn);
+ LOG.info("add event: {}-{}-{}", id, endTimeMs, afn);
synchronized (task.lock) {
- task.add(endTimeMs);
+ task.add(new QueueEntry(endTimeMs, afn, id));
}
}
public static void schedule(StormTimerTask task, int delaySecs, TimerFunc afn) {
@@ -156,12 +189,20 @@ public class StormTimer {
}
public static void checkActive(StormTimerTask task) {
+ if (task == null) {
+ throw new RuntimeException("task is null!");
+ }
if (!task.isActive()) {
throw new IllegalStateException("Timer is not active");
}
}
public static void cancelTimer(StormTimerTask task) throws InterruptedException {
+ if (task == null) {
+ throw new RuntimeException("task is null!");
+ }
+ LOG.info("cancel task: {} - {} - {}", task.getName(), task.getId(), task.queue);
+
checkActive(task);
synchronized (task.lock) {
task.setActive(false);
@@ -171,29 +212,9 @@ public class StormTimer {
}
public static boolean isTimerWaiting(StormTimerTask task) {
+ if (task == null) {
+ throw new RuntimeException("task is null!");
+ }
return Time.isThreadWaiting(task);
}
-
- /**
- * function in util that haven't be translated to java
- */
-
- public static long secsToMillisLong(long secs) {
- return secs * 1000;
- }
-
- public static long currentTimeMillis() {
- return Time.currentTimeMillis();
- }
-
-
- public static void main(String[] argv) {
- mkTimer(new TimerFunc() {
- @Override
- public void run(Object o) {
-
- }
- }, "erer");
- }
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 42a0374..e527d60 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -37,7 +37,7 @@
(:import [org.apache.storm.zookeeper Zookeeper])
(:import [org.apache.commons.io FileUtils]
[org.json.simple JSONValue])
- (:use [org.apache.storm testing MockAutoCred util config log timer zookeeper])
+ (:use [org.apache.storm testing MockAutoCred util config log zookeeper])
(:use [org.apache.storm.daemon common])
(:require [conjure.core])
(:require [org.apache.storm [cluster :as cluster]])
@@ -1483,7 +1483,7 @@
nimbus/file-cache-map nil
nimbus/mk-blob-cache-map nil
nimbus/mk-bloblist-cache-map nil
- mk-timer nil
+ ; mk-timer nil
nimbus/mk-scheduler nil]
(nimbus/nimbus-data auth-conf fake-inimbus)
(verify-call-times-for cluster/mk-storm-cluster-state 1)
http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index b25bd7c..71aaf85 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -33,7 +33,7 @@
(:import [java.nio.file.attribute FileAttribute])
(:import [org.apache.storm Thrift])
(:import [org.apache.storm.utils Utils])
- (:use [org.apache.storm config testing util timer log])
+ (:use [org.apache.storm config testing util log])
(:use [org.apache.storm.daemon common])
(:require [org.apache.storm.daemon [worker :as worker] [supervisor :as supervisor]]
[org.apache.storm [cluster :as cluster]])
@@ -646,7 +646,8 @@
(with-open [_ (ConfigUtilsInstaller. fake-cu)
_ (UtilsInstaller. fake-utils)]
(stubbing [cluster/mk-storm-cluster-state nil
- mk-timer nil]
+; mk-timer nil
+ ]
(supervisor/supervisor-data auth-conf nil fake-isupervisor)
(verify-call-times-for cluster/mk-storm-cluster-state 1)
(verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
@@ -837,4 +838,4 @@
(validate-launched-once (:launched changed)
{"sup1" [3 4]}
(get-storm-id (:storm-cluster-state cluster) "topology2"))
- )))
\ No newline at end of file
+ )))
http://git-wip-us.apache.org/repos/asf/storm/blob/4243e4e0/storm-core/test/jvm/org/apache/storm/TestTimer.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/TestTimer.java b/storm-core/test/jvm/org/apache/storm/TestTimer.java
new file mode 100644
index 0000000..c798c1f
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/TestTimer.java
@@ -0,0 +1,57 @@
+package org.apache.storm;
+
+import org.apache.storm.utils.Time;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by jerrypeng on 2/9/16.
+ */
+public class TestTimer {
+ //public static StormTimerTask mkTimer(TimerFunc onKill, String name) {
+ private static final Logger LOG = LoggerFactory.getLogger(TestTimer.class);
+
+ @Test
+ public void testTimer() throws InterruptedException {
+// StormTimer.StormTimerTask task1 = StormTimer.mkTimer("timer", new StormTimer.TimerFunc() {
+// @Override
+// public void run(Object o) {
+// LOG.info("task1 onKill at {}", Time.currentTimeSecs());
+// }
+// });
+// StormTimer.scheduleRecurring(task1, 10, 5, new StormTimer.TimerFunc(){
+// @Override
+// public void run(Object o) {
+// LOG.info("task1-1 scheduleRecurring func at {}", Time.currentTimeSecs());
+// }
+// });
+// StormTimer.scheduleRecurring(task1, 5, 10, new StormTimer.TimerFunc(){
+// @Override
+// public void run(Object o) {
+// LOG.info("task1-2 scheduleRecurring func at {}", Time.currentTimeSecs());
+// }
+// });
+//
+// StormTimer.StormTimerTask task2 = StormTimer.mkTimer("timer", new StormTimer.TimerFunc() {
+// @Override
+// public void run(Object o) {
+// LOG.info("task2 onKill at {}", Time.currentTimeSecs());
+// }
+// });
+// StormTimer.scheduleRecurringWithJitter(task2, 10, 5, 2000, new StormTimer.TimerFunc(){
+// @Override
+// public void run(Object o) {
+// LOG.info("task2 scheduleRecurringWithJitter func at {}", Time.currentTimeSecs());
+// }
+// });
+//
+// LOG.info("sleeping...");
+// Time.sleep(30000);
+//
+// LOG.info("canceling task");
+// StormTimer.cancelTimer(task1);
+// StormTimer.cancelTimer(task2);
+
+ }
+}