You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/18 18:09:40 UTC
[4/9] storm git commit: cleaning up
cleaning up
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b09d8ca1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b09d8ca1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b09d8ca1
Branch: refs/heads/master
Commit: b09d8ca14798cb1b344038be5bea4fb53428d6a2
Parents: 4243e4e
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Thu Feb 11 15:01:43 2016 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Feb 18 10:35:56 2016 -0600
----------------------------------------------------------------------
.../clj/org/apache/storm/daemon/executor.clj | 16 ---
.../clj/org/apache/storm/daemon/logviewer.clj | 8 --
.../src/clj/org/apache/storm/daemon/nimbus.clj | 62 ++-------
.../clj/org/apache/storm/daemon/supervisor.clj | 70 ++--------
.../src/clj/org/apache/storm/daemon/worker.clj | 45 -------
storm-core/src/clj/org/apache/storm/timer.clj | 128 -------------------
.../src/jvm/org/apache/storm/StormTimer.java | 39 ++++--
.../test/clj/org/apache/storm/nimbus_test.clj | 1 -
.../clj/org/apache/storm/supervisor_test.clj | 4 +-
.../test/jvm/org/apache/storm/TestTimer.java | 57 ---------
10 files changed, 43 insertions(+), 387 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index f46f18b..2afb853 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -324,14 +324,6 @@
(let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data
distinct-time-bucket-intervals (keys interval->task->metric-registry)]
(doseq [interval distinct-time-bucket-intervals]
-; (schedule-recurring
-; (:user-timer (:worker executor-data))
-; interval
-; interval
-; (fn []
-; (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]]
-; (disruptor/publish receive-queue val))))
-
(StormTimer/scheduleRecurring
(:user-timer (:worker executor-data))
interval
@@ -375,14 +367,6 @@
(and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
(= :spout (:type executor-data))))
(log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
-; (schedule-recurring
-; (:user-timer worker)
-; tick-time-secs
-; tick-time-secs
-; (fn []
-; (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]]
-; (disruptor/publish receive-queue val))))
-
(StormTimer/scheduleRecurring
(:user-timer worker)
tick-time-secs
http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
index 932a813..16815f9 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
@@ -264,14 +264,6 @@
(let [interval-secs (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
(when interval-secs
(log-debug "starting log cleanup thread at interval: " interval-secs)
-; (schedule-recurring (mk-timer :thread-name "logviewer-cleanup"
-; :kill-fn (fn [t]
-; (log-error t "Error when doing logs cleanup")
-; (Utils/exitProcess 20 "Error when doing log cleanup")))
-; 0 ;; Start immediately.
-; interval-secs
-; (fn [] (cleanup-fn! log-root-dir)))
-
(StormTimer/scheduleRecurring
(StormTimer/mkTimer "logviewer-cleanup"
(reify StormTimer$TimerFunc
http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index d6413db..0d0b27a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -194,17 +194,12 @@
:blob-listers (mk-bloblist-cache-map conf)
:uptime (Utils/makeUptimeComputer)
:validator (Utils/newInstance (conf NIMBUS-TOPOLOGY-VALIDATOR))
-; :timer (mk-timer :kill-fn (fn [t]
-; (log-error t "Error when processing event")
-; (Utils/exitProcess 20 "Error when processing an event")
-; ))
:timer (StormTimer/mkTimer nil
(reify StormTimer$TimerFunc
(^void run
[this ^Object t]
(log-error t "Error when processing event")
(Utils/exitProcess 20 "Error when processing an event"))))
-
:scheduler (mk-scheduler conf inimbus)
:leader-elector (Zookeeper/zkLeaderElector conf)
:id->sched-status (atom {})
@@ -387,12 +382,7 @@
(defn delay-event [nimbus storm-id delay-secs event]
(log-message "Delaying event " event " for " delay-secs " secs for " storm-id)
-; (schedule (:timer nimbus)
-; delay-secs
-; #(transition! nimbus storm-id event false)
-; )
- (StormTimer/schedule
- (:timer nimbus)
+ (StormTimer/schedule (:timer nimbus)
delay-secs
(reify StormTimer$TimerFunc
(^void run
@@ -1457,16 +1447,8 @@
(when (is-leader nimbus :throw-exception false)
(doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
(transition! nimbus storm-id :startup)))
-; (schedule-recurring (:timer nimbus)
-; 0
-; (conf NIMBUS-MONITOR-FREQ-SECS)
-; (fn []
-; (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
-; (locking (:submit-lock nimbus)
-; (mk-assignments nimbus)))
-; (do-cleanup nimbus)))
- (StormTimer/scheduleRecurring
- (:timer nimbus)
+
+ (StormTimer/scheduleRecurring (:timer nimbus)
0
(conf NIMBUS-MONITOR-FREQ-SECS)
(reify StormTimer$TimerFunc
@@ -1477,14 +1459,7 @@
(mk-assignments nimbus)))
(do-cleanup nimbus))))
;; Schedule Nimbus inbox cleaner
-; (schedule-recurring (:timer nimbus)
-; 0
-; (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
-; (fn []
-; (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
-
- (StormTimer/scheduleRecurring
- (:timer nimbus)
+ (StormTimer/scheduleRecurring (:timer nimbus)
0
(conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
(reify StormTimer$TimerFunc
@@ -1493,13 +1468,7 @@
(clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS)))))
;; Schedule nimbus code sync thread to sync code from other nimbuses.
(if (instance? LocalFsBlobStore blob-store)
-; (schedule-recurring (:timer nimbus)
-; 0
-; (conf NIMBUS-CODE-SYNC-FREQ-SECS)
-; (fn []
-; (blob-sync conf nimbus)))
- (StormTimer/scheduleRecurring
- (:timer nimbus)
+ (StormTimer/scheduleRecurring (:timer nimbus)
0
(conf NIMBUS-CODE-SYNC-FREQ-SECS)
(reify StormTimer$TimerFunc
@@ -1508,26 +1477,14 @@
(blob-sync conf nimbus)))))
;; Schedule topology history cleaner
(when-let [interval (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
-; (schedule-recurring (:timer nimbus)
-; 0
-; (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)
-; (fn []
-; (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus)))
- (StormTimer/scheduleRecurring
- (:timer nimbus)
+ (StormTimer/scheduleRecurring (:timer nimbus)
0
(conf LOGVIEWER-CLEANUP-INTERVAL-SECS)
(reify StormTimer$TimerFunc
(^void run
[this ^Object t]
(clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus)))))
-; (schedule-recurring (:timer nimbus)
-; 0
-; (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
-; (fn []
-; (renew-credentials nimbus)))
- (StormTimer/scheduleRecurring
- (:timer nimbus)
+ (StormTimer/scheduleRecurring (:timer nimbus)
0
(conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
(reify StormTimer$TimerFunc
@@ -2265,7 +2222,6 @@
(shutdown [this]
(mark! nimbus:num-shutdown-calls)
(log-message "Shutting down master")
- ;(cancel-timer (:timer nimbus))
(StormTimer/cancelTimer (:timer nimbus))
(.disconnect (:storm-cluster-state nimbus))
(.cleanup (:downloaders nimbus))
@@ -2276,9 +2232,7 @@
(log-message "Shut down master"))
DaemonCommon
(waiting? [this]
-; (timer-waiting? (:timer nimbus))
- (StormTimer/isTimerWaiting (:timer nimbus))
- ))))
+ (StormTimer/isTimerWaiting (:timer nimbus))))))
(defn validate-port-available[conf]
(try
http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 56184aa..2b70731 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -336,35 +336,18 @@
:assignment-id (.getAssignmentId isupervisor)
:my-hostname (Utils/hostname conf)
:curr-assignment (atom nil) ;; used for reporting used ports when heartbeating
-; :heartbeat-timer (mk-timer :kill-fn (fn [t]
-; (log-error t "Error when processing event")
-; (Utils/exitProcess 20 "Error when processing an event")
-; ))
-
:heartbeat-timer (StormTimer/mkTimer nil
(reify StormTimer$TimerFunc
(^void run
[this ^Object t]
(log-error t "Error when processing event")
(Utils/exitProcess 20 "Error when processing an event"))))
-; :event-timer (mk-timer :kill-fn (fn [t]
-; (log-error t "Error when processing event")
-; (Utils/exitProcess 20 "Error when processing an event")
-; ))
-
:event-timer (StormTimer/mkTimer nil
(reify StormTimer$TimerFunc
(^void run
[this ^Object t]
(log-error t "Error when processing event")
(Utils/exitProcess 20 "Error when processing an event"))))
-
-; :blob-update-timer (mk-timer :kill-fn (defn blob-update-timer
-; [t]
-; (log-error t "Error when processing event")
-; (Utils/exitProcess 20 "Error when processing a event"))
-; :timer-name "blob-update-timer")
-
:blob-update-timer (StormTimer/mkTimer "blob-update-timer"
(reify StormTimer$TimerFunc
(^void run
@@ -838,12 +821,7 @@
(heartbeat-fn)
;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
-; (schedule-recurring (:heartbeat-timer supervisor)
-; 0
-; (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
-; heartbeat-fn)
- (StormTimer/scheduleRecurring
- (:heartbeat-timer supervisor)
+ (StormTimer/scheduleRecurring (:heartbeat-timer supervisor)
0
(conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
(reify StormTimer$TimerFunc
@@ -860,21 +838,14 @@
(when (conf SUPERVISOR-ENABLE)
;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
;; to date even if callbacks don't all work exactly right
-; (schedule-recurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
- (StormTimer/scheduleRecurring
- (:event-timer supervisor)
+ (StormTimer/scheduleRecurring (:event-timer supervisor)
0 10
(reify StormTimer$TimerFunc
(^void run
[this ^Object o]
(.add event-manager synchronize-supervisor))))
-; (schedule-recurring (:event-timer supervisor)
-; 0
-; (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
-; (fn [] (.add processes-event-manager sync-processes)))
- (StormTimer/scheduleRecurring
- (:event-timer supervisor)
+ (StormTimer/scheduleRecurring (:event-timer supervisor)
0
(conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
(reify StormTimer$TimerFunc
@@ -883,31 +854,17 @@
(.add processes-event-manager sync-processes))))
;; Blob update thread. Starts with 30 seconds delay, every 30 seconds
-; (schedule-recurring (:blob-update-timer supervisor)
-; 30
-; 30
-; (fn [] (.add event-manager synchronize-blobs-fn)))
- (StormTimer/scheduleRecurring
- (:blob-update-timer supervisor)
- 30 30
+ (StormTimer/scheduleRecurring (:blob-update-timer supervisor)
+ 30
+ 30
(reify StormTimer$TimerFunc
(^void run
[this ^Object o]
(.add event-manager synchronize-blobs-fn))))
-; (schedule-recurring (:event-timer supervisor)
-; (* 60 5)
-; (* 60 5)
-; (fn [] (let [health-code (healthcheck/health-check conf)
-; ids (my-worker-ids conf)]
-; (if (not (= health-code 0))
-; (do
-; (doseq [id ids]
-; (shutdown-worker supervisor id))
-; (throw (RuntimeException. "Supervisor failed health check. Exiting.")))))))
- (StormTimer/scheduleRecurring
- (:event-timer supervisor)
- (* 60 5) (* 60 5)
+ (StormTimer/scheduleRecurring (:event-timer supervisor)
+ (* 60 5)
+ (* 60 5)
(reify StormTimer$TimerFunc
(^void run
[this ^Object o]
@@ -921,10 +878,6 @@
;; Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
-; (schedule-recurring (:event-timer supervisor)
-; 30
-; 30
-; (fn [] (.add event-manager run-profiler-actions-fn))))
(StormTimer/scheduleRecurring
(:event-timer supervisor)
30 30
@@ -939,11 +892,8 @@
(shutdown [this]
(log-message "Shutting down supervisor " (:supervisor-id supervisor))
(reset! (:active supervisor) false)
- ;(cancel-timer (:heartbeat-timer supervisor))
(StormTimer/cancelTimer (:heartbeat-timer supervisor))
- ;(cancel-timer (:event-timer supervisor))
(StormTimer/cancelTimer (:event-timer supervisor))
- ;(cancel-timer (:blob-update-timer supervisor))
(StormTimer/cancelTimer (:blob-update-timer supervisor))
(.shutdown event-manager)
(.shutdown processes-event-manager)
@@ -963,9 +913,7 @@
(waiting? [this]
(or (not @(:active supervisor))
(and
- ;(timer-waiting? (:heartbeat-timer supervisor))
(StormTimer/isTimerWaiting (:heartbeat-timer supervisor))
- ;(timer-waiting? (:event-timer supervisor))
(StormTimer/isTimerWaiting (:event-timer supervisor))
(every? (memfn waiting?) managers)))
))))
http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 9212506..e74ffa1 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -239,11 +239,6 @@
{})
(defn mk-halting-timer [timer-name]
-; (mk-timer :kill-fn (fn [t]
-; (log-error t "Error when processing event")
-; (Utils/exitProcess 20 "Error when processing an event")
-; )
-; :timer-name timer-name)
(StormTimer/mkTimer timer-name
(reify StormTimer$TimerFunc
(^void run
@@ -384,7 +379,6 @@
(fn refresh-connections
([]
(refresh-connections (fn [& ignored]
-; (schedule (:refresh-connections-timer worker) 0 refresh-connections)
(StormTimer/schedule
(:refresh-connections-timer worker) 0
(reify StormTimer$TimerFunc
@@ -443,7 +437,6 @@
([worker]
(refresh-storm-active
worker (fn [& ignored]
-; (schedule (:refresh-active-timer worker) 0 (partial refresh-storm-active worker))
(StormTimer/schedule
(:refresh-active-timer worker) 0
(reify StormTimer$TimerFunc
@@ -496,17 +489,6 @@
(let [timer (:refresh-active-timer worker)
delay-secs 0
recur-secs 1]
-; (schedule timer
-; delay-secs
-; (fn this []
-; (if (all-connections-ready worker)
-; (do
-; (log-message "All connections are ready for worker " (:assignment-id worker) ":" (:port worker)
-; " with id "(:worker-id worker))
-; (reset! (:worker-active-flag worker) true))
-; (schedule timer recur-secs this :check-active false)
-; )))
-
(StormTimer/schedule timer
delay-secs
(reify StormTimer$TimerFunc
@@ -672,7 +654,6 @@
executors (atom nil)
;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
;; to the supervisor
-; _ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
_ (StormTimer/scheduleRecurring
(:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS)
(reify StormTimer$TimerFunc
@@ -680,8 +661,6 @@
[this ^Object o]
(heartbeat-fn))))
-; _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors))
-
_ (StormTimer/scheduleRecurring
(:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS)
(reify StormTimer$TimerFunc
@@ -748,21 +727,13 @@
(.interrupt backpressure-thread)
(.join backpressure-thread)
(log-message "Shut down backpressure thread")
-; (cancel-timer (:heartbeat-timer worker))
(StormTimer/cancelTimer (:heartbeat-timer worker))
-; (cancel-timer (:refresh-connections-timer worker))
(StormTimer/cancelTimer (:refresh-connections-timer worker))
-; (cancel-timer (:refresh-credentials-timer worker))
(StormTimer/cancelTimer (:refresh-credentials-timer worker))
-; (cancel-timer (:refresh-active-timer worker))
(StormTimer/cancelTimer (:refresh-active-timer worker))
-; (cancel-timer (:executor-heartbeat-timer worker))
(StormTimer/cancelTimer (:executor-heartbeat-timer worker))
-; (cancel-timer (:user-timer worker))
(StormTimer/cancelTimer (:user-timer worker))
-; (cancel-timer (:refresh-load-timer worker))
(StormTimer/cancelTimer (:refresh-load-timer worker))
-
(StormTimer/cancelTimer (:reset-log-levels-timer worker))
(close-resources worker)
@@ -782,13 +753,6 @@
DaemonCommon
(waiting? [this]
(and
-; (timer-waiting? (:heartbeat-timer worker))
-; (timer-waiting? (:refresh-connections-timer worker))
-; (timer-waiting? (:refresh-load-timer worker))
-; (timer-waiting? (:refresh-credentials-timer worker))
-; (timer-waiting? (:refresh-active-timer worker))
-; (timer-waiting? (:executor-heartbeat-timer worker))
-; (timer-waiting? (:user-timer worker))
(StormTimer/isTimerWaiting (:heartbeat-timer worker))
(StormTimer/isTimerWaiting (:refresh-connections-timer worker))
(StormTimer/isTimerWaiting (:refresh-load-timer worker))
@@ -823,11 +787,6 @@
(establish-log-setting-callback)
(.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-changed)))
-; (schedule-recurring (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
-; (fn [& args]
-; (check-credentials-changed)
-; (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
-; (check-throttle-changed))))
(StormTimer/scheduleRecurring
(:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
@@ -839,28 +798,24 @@
(check-throttle-changed)))))
;; The jitter allows the clients to get the data at different times, and avoids thundering herd
(when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
-; (schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 refresh-load)
(StormTimer/scheduleRecurringWithJitter
(:refresh-load-timer worker) 0 1 500
(reify StormTimer$TimerFunc
(^void run
[this ^Object o]
(refresh-load)))))
-; (schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections)
(StormTimer/scheduleRecurring
(:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS)
(reify StormTimer$TimerFunc
(^void run
[this ^Object o]
(refresh-connections))))
-; (schedule-recurring (:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS) (fn [] (reset-log-levels latest-log-config)))
(StormTimer/scheduleRecurring
(:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS)
(reify StormTimer$TimerFunc
(^void run
[this ^Object o]
(reset-log-levels latest-log-config))))
-; (schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker))
(StormTimer/scheduleRecurring
(:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS)
(reify StormTimer$TimerFunc
http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/src/clj/org/apache/storm/timer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/timer.clj b/storm-core/src/clj/org/apache/storm/timer.clj
deleted file mode 100644
index 27853c2..0000000
--- a/storm-core/src/clj/org/apache/storm/timer.clj
+++ /dev/null
@@ -1,128 +0,0 @@
-; Licensed to the Apache Software Foundation (ASF) under one
-; or more contributor license agreements. See the NOTICE file
-; distributed with this work for additional information
-; regarding copyright ownership. The ASF licenses this file
-; to you under the Apache License, Version 2.0 (the
-; "License"); you may not use this file except in compliance
-; with the License. You may obtain a copy of the License at
-;
-; http://www.apache.org/licenses/LICENSE-2.0
-;
-; Unless required by applicable law or agreed to in writing, software
-; distributed under the License is distributed on an "AS IS" BASIS,
-; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-; See the License for the specific language governing permissions and
-; limitations under the License.
-
-;(ns org.apache.storm.timer
-; (:import [org.apache.storm.utils Utils Time])
-; (:import [java.util PriorityQueue Comparator Random])
-; (:import [java.util.concurrent Semaphore])
-; (:use [org.apache.storm util log]))
-;
-;;; The timer defined in this file is very similar to java.util.Timer, except
-;;; it integrates with Storm's time simulation capabilities. This lets us test
-;;; code that does asynchronous work on the timer thread
-;
-;(defnk mk-timer [:kill-fn (fn [& _] ) :timer-name nil]
-; (let [queue (PriorityQueue. 10 (reify Comparator
-; (compare
-; [this o1 o2]
-; (- (first o1) (first o2)))
-; (equals
-; [this obj]
-; true)))
-; active (atom true)
-; lock (Object.)
-; notifier (Semaphore. 0)
-; thread-name (if timer-name timer-name "timer")
-; timer-thread (Thread.
-; (fn []
-; (while @active
-; (try
-; (let [[time-millis _ _ :as elem] (locking lock (.peek queue))]
-; (if (and elem (>= (Time/currentTimeMillis) time-millis))
-; ;; It is imperative to not run the function
-; ;; inside the timer lock. Otherwise, it is
-; ;; possible to deadlock if the fn deals with
-; ;; other locks, like the submit lock.
-; (let [afn (locking lock (second (.poll queue)))]
-; (afn))
-; (if time-millis
-; ;; If any events are scheduled, sleep until
-; ;; event generation. If any recurring events
-; ;; are scheduled then we will always go
-; ;; through this branch, sleeping only the
-; ;; exact necessary amount of time. We give
-; ;; an upper bound, e.g. 1000 millis, to the
-; ;; sleeping time, to limit the response time
-; ;; for detecting any new event within 1 secs.
-; (Time/sleep (min 1000 (- time-millis (Time/currentTimeMillis))))
-; ;; Otherwise poll to see if any new event
-; ;; was scheduled. This is, in essence, the
-; ;; response time for detecting any new event
-; ;; schedulings when there are no scheduled
-; ;; events.
-; (Time/sleep 1000))))
-; (catch Throwable t
-; ;; Because the interrupted exception can be
-; ;; wrapped in a RuntimeException.
-; (when-not (Utils/exceptionCauseIsInstanceOf InterruptedException t)
-; (kill-fn t)
-; (reset! active false)
-; (throw t)))))
-; (.release notifier)) thread-name)]
-; (.setDaemon timer-thread true)
-; (.setPriority timer-thread Thread/MAX_PRIORITY)
-; (.start timer-thread)
-; {:timer-thread timer-thread
-; :queue queue
-; :active active
-; :lock lock
-; :random (Random.)
-; :cancel-notifier notifier}))
-;
-;(defn- check-active!
-; [timer]
-; (when-not @(:active timer)
-; (throw (IllegalStateException. "Timer is not active"))))
-;
-;(defnk schedule
-; [timer delay-secs afn :check-active true :jitter-ms 0]
-; (when check-active (check-active! timer))
-; (let [id (Utils/uuid)
-; ^PriorityQueue queue (:queue timer)
-; end-time-ms (+ (Time/currentTimeMillis) (Time/secsToMillisLong delay-secs))
-; end-time-ms (if (< 0 jitter-ms) (+ (.nextInt (:random timer) jitter-ms) end-time-ms) end-time-ms)]
-; (locking (:lock timer)
-; (.add queue [end-time-ms afn id]))))
-;
-;(defn schedule-recurring
-; [timer delay-secs recur-secs afn]
-; (schedule timer
-; delay-secs
-; (fn this []
-; (afn)
-; ; This avoids a race condition with cancel-timer.
-; (schedule timer recur-secs this :check-active false))))
-;
-;(defn schedule-recurring-with-jitter
-; [timer delay-secs recur-secs jitter-ms afn]
-; (schedule timer
-; delay-secs
-; (fn this []
-; (afn)
-; ; This avoids a race condition with cancel-timer.
-; (schedule timer recur-secs this :check-active false :jitter-ms jitter-ms))))
-;
-;(defn cancel-timer
-; [timer]
-; (check-active! timer)
-; (locking (:lock timer)
-; (reset! (:active timer) false)
-; (.interrupt (:timer-thread timer)))
-; (.acquire (:cancel-notifier timer)))
-;
-;(defn timer-waiting?
-; [timer]
-; (Time/isThreadWaiting (:timer-thread timer)))
http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/src/jvm/org/apache/storm/StormTimer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/StormTimer.java b/storm-core/src/jvm/org/apache/storm/StormTimer.java
index 36878e4..df89dc6 100644
--- a/storm-core/src/jvm/org/apache/storm/StormTimer.java
+++ b/storm-core/src/jvm/org/apache/storm/StormTimer.java
@@ -25,11 +25,16 @@ import org.slf4j.LoggerFactory;
import java.util.Comparator;
import java.util.Random;
-import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
+/**
+ * The timer defined in this file is very similar to java.util.Timer, except
+ * it integrates with Storm's time simulation capabilities. This lets us test
+ * code that does asynchronous work on the timer thread
+ */
+
public class StormTimer {
private static final Logger LOG = LoggerFactory.getLogger(StormTimer.class);
@@ -75,28 +80,41 @@ public class StormTimer {
@Override
public void run() {
- LOG.info("in run...{}", this.getName());
while (this.active.get()) {
QueueEntry queueEntry = null;
try {
synchronized (this.lock) {
queueEntry = this.queue.peek();
}
- LOG.info("event: {} -- {}", this.getName(), queueEntry);
-
if ((queueEntry != null) && (Time.currentTimeMillis() >= queueEntry.endTimeMs)) {
+ // It is imperative to not run the function
+ // inside the timer lock. Otherwise, it is
+ // possible to deadlock if the fn deals with
+ // other locks, like the submit lock.
synchronized (this.lock) {
this.queue.poll();
}
queueEntry.afn.run(null);
} else if (queueEntry != null) {
+ // If any events are scheduled, sleep until
+ // event generation. If any recurring events
+ // are scheduled then we will always go
+ // through this branch, sleeping only the
+ // exact necessary amount of time. We give
+ // an upper bound, e.g. 1000 millis, to the
+ // sleeping time, to limit the response time
+ // for detecting any new event within 1 secs.
Time.sleep(Math.min(1000, (queueEntry.endTimeMs - Time.currentTimeMillis())));
} else {
+ // Otherwise poll to see if any new event
+ // was scheduled. This is, in essence, the
+ // response time for detecting any new event
+ // schedulings when there are no scheduled
+ // events.
Time.sleep(1000);
}
} catch (Throwable t) {
if (!(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t))) {
- LOG.info("Exception throw for event: {} --- {}", queueEntry, t);
this.onKill.run(t);
this.setActive(false);
throw new RuntimeException(t);
@@ -153,7 +171,6 @@ public class StormTimer {
if (jitterMs > 0) {
endTimeMs = task.random.nextInt(jitterMs) + endTimeMs;
}
- LOG.info("add event: {}-{}-{}", id, endTimeMs, afn);
synchronized (task.lock) {
task.add(new QueueEntry(endTimeMs, afn, id));
}
@@ -166,10 +183,8 @@ public class StormTimer {
schedule(task, delaySecs, new TimerFunc() {
@Override
public void run(Object o) {
- LOG.info("scheduleRecurring running...");
afn.run(null);
- LOG.info("scheduleRecurring schedule again...");
-
+ // This avoids a race condition with cancel-timer.
schedule(task, recurSecs, this, false, 0);
}
});
@@ -179,10 +194,8 @@ public class StormTimer {
schedule(task, delaySecs, new TimerFunc() {
@Override
public void run(Object o) {
- LOG.info("scheduleRecurringWithJitter running...");
afn.run(null);
- LOG.info("scheduleRecurringWithJitter schedule again...");
-
+ // This avoids a race condition with cancel-timer.
schedule(task, recurSecs, this, false, jitterMs);
}
});
@@ -201,8 +214,6 @@ public class StormTimer {
if (task == null) {
throw new RuntimeException("task is null!");
}
- LOG.info("cancel task: {} - {} - {}", task.getName(), task.getId(), task.queue);
-
checkActive(task);
synchronized (task.lock) {
task.setActive(false);
http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index e527d60..ce58f42 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1483,7 +1483,6 @@
nimbus/file-cache-map nil
nimbus/mk-blob-cache-map nil
nimbus/mk-bloblist-cache-map nil
- ; mk-timer nil
nimbus/mk-scheduler nil]
(nimbus/nimbus-data auth-conf fake-inimbus)
(verify-call-times-for cluster/mk-storm-cluster-state 1)
http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 71aaf85..ef40c4a 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -645,9 +645,7 @@
(upTime [] 0))))]
(with-open [_ (ConfigUtilsInstaller. fake-cu)
_ (UtilsInstaller. fake-utils)]
- (stubbing [cluster/mk-storm-cluster-state nil
-; mk-timer nil
- ]
+ (stubbing [cluster/mk-storm-cluster-state nil]
(supervisor/supervisor-data auth-conf nil fake-isupervisor)
(verify-call-times-for cluster/mk-storm-cluster-state 1)
(verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
http://git-wip-us.apache.org/repos/asf/storm/blob/b09d8ca1/storm-core/test/jvm/org/apache/storm/TestTimer.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/TestTimer.java b/storm-core/test/jvm/org/apache/storm/TestTimer.java
deleted file mode 100644
index c798c1f..0000000
--- a/storm-core/test/jvm/org/apache/storm/TestTimer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.storm;
-
-import org.apache.storm.utils.Time;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by jerrypeng on 2/9/16.
- */
-public class TestTimer {
- //public static StormTimerTask mkTimer(TimerFunc onKill, String name) {
- private static final Logger LOG = LoggerFactory.getLogger(TestTimer.class);
-
- @Test
- public void testTimer() throws InterruptedException {
-// StormTimer.StormTimerTask task1 = StormTimer.mkTimer("timer", new StormTimer.TimerFunc() {
-// @Override
-// public void run(Object o) {
-// LOG.info("task1 onKill at {}", Time.currentTimeSecs());
-// }
-// });
-// StormTimer.scheduleRecurring(task1, 10, 5, new StormTimer.TimerFunc(){
-// @Override
-// public void run(Object o) {
-// LOG.info("task1-1 scheduleRecurring func at {}", Time.currentTimeSecs());
-// }
-// });
-// StormTimer.scheduleRecurring(task1, 5, 10, new StormTimer.TimerFunc(){
-// @Override
-// public void run(Object o) {
-// LOG.info("task1-2 scheduleRecurring func at {}", Time.currentTimeSecs());
-// }
-// });
-//
-// StormTimer.StormTimerTask task2 = StormTimer.mkTimer("timer", new StormTimer.TimerFunc() {
-// @Override
-// public void run(Object o) {
-// LOG.info("task2 onKill at {}", Time.currentTimeSecs());
-// }
-// });
-// StormTimer.scheduleRecurringWithJitter(task2, 10, 5, 2000, new StormTimer.TimerFunc(){
-// @Override
-// public void run(Object o) {
-// LOG.info("task2 scheduleRecurringWithJitter func at {}", Time.currentTimeSecs());
-// }
-// });
-//
-// LOG.info("sleeping...");
-// Time.sleep(30000);
-//
-// LOG.info("canceling task");
-// StormTimer.cancelTimer(task1);
-// StormTimer.cancelTimer(task2);
-
- }
-}