You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/10/25 02:25:24 UTC
[1/4] storm git commit: STORM-1128: Make metrics fast
Repository: storm
Updated Branches:
refs/heads/master e93015c64 -> 3bc375e73
STORM-1128: Make metrics fast
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e75219b5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e75219b5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e75219b5
Branch: refs/heads/master
Commit: e75219b5a4856c39c4031c3d6205105e034d10af
Parents: e93015c
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Oct 22 10:48:04 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Sat Oct 24 09:11:58 2015 -0500
----------------------------------------------------------------------
.../jvm/storm/starter/ThroughputVsLatency.java | 11 +-
.../backtype/storm/daemon/builtin_metrics.clj | 84 +++---
.../src/clj/backtype/storm/daemon/executor.clj | 16 --
.../src/clj/backtype/storm/daemon/task.clj | 10 +-
storm-core/src/clj/backtype/storm/stats.clj | 233 ++++++-----------
.../metric/internal/CountStatAndMetric.java | 211 +++++++++++++++
.../metric/internal/LatencyStatAndMetric.java | 259 +++++++++++++++++++
.../storm/metric/internal/MetricStatTimer.java | 27 ++
.../internal/MultiCountStatAndMetric.java | 112 ++++++++
.../internal/MultiLatencyStatAndMetric.java | 109 ++++++++
.../storm/metric/internal/RateTracker.java | 165 ++++++++++++
.../backtype/storm/utils/DisruptorQueue.java | 1 +
.../jvm/backtype/storm/utils/RateTracker.java | 166 ------------
.../metric/internal/CountStatAndMetricTest.java | 86 ++++++
.../internal/LatencyStatAndMetricTest.java | 83 ++++++
.../storm/metric/internal/RateTrackerTest.java | 94 +++++++
.../backtype/storm/utils/RateTrackerTest.java | 94 -------
17 files changed, 1264 insertions(+), 497 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
index 2608755..4c6680e 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
@@ -277,15 +277,17 @@ public class ThroughputVsLatency {
SpoutStats stats = exec.get_stats().get_specific().get_spout();
Map<String, Long> failedMap = stats.get_failed().get(":all-time");
Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
- for (String key: ackedMap.keySet()) {
- if (failedMap != null) {
+ if (ackedMap != null) {
+ for (String key: ackedMap.keySet()) {
+ if (failedMap != null) {
Long tmp = failedMap.get(key);
if (tmp != null) {
failed += tmp;
}
+ }
+ long ackVal = ackedMap.get(key);
+ acked += ackVal;
}
- long ackVal = ackedMap.get(key);
- acked += ackVal;
}
}
}
@@ -394,6 +396,7 @@ public class ThroughputVsLatency {
C cluster = new C(conf);
conf.setNumWorkers(parallelism);
+ conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class);
conf.registerMetricsConsumer(backtype.storm.metric.HttpForwardingMetricsConsumer.class, url, 1);
Map<String, String> workerMetrics = new HashMap<String, String>();
if (!cluster.isLocal()) {
http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj b/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
index 990800a..0caa0b9 100644
--- a/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
@@ -14,41 +14,42 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.daemon.builtin-metrics
- (:import [backtype.storm.metric.api MultiCountMetric MultiReducedMetric CountMetric MeanReducer StateMetric IMetric IStatefulObject])
+ (:import [backtype.storm.metric.api CountMetric StateMetric IMetric IStatefulObject])
+ (:import [backtype.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric])
(:import [backtype.storm Config])
- (:use [backtype.storm.stats :only [stats-rate]]))
-
-(defrecord BuiltinSpoutMetrics [^MultiCountMetric ack-count
- ^MultiReducedMetric complete-latency
- ^MultiCountMetric fail-count
- ^MultiCountMetric emit-count
- ^MultiCountMetric transfer-count])
-(defrecord BuiltinBoltMetrics [^MultiCountMetric ack-count
- ^MultiReducedMetric process-latency
- ^MultiCountMetric fail-count
- ^MultiCountMetric execute-count
- ^MultiReducedMetric execute-latency
- ^MultiCountMetric emit-count
- ^MultiCountMetric transfer-count])
+ (:use [backtype.storm.stats]))
+
+(defrecord BuiltinSpoutMetrics [^MultiCountStatAndMetric ack-count
+ ^MultiLatencyStatAndMetric complete-latency
+ ^MultiCountStatAndMetric fail-count
+ ^MultiCountStatAndMetric emit-count
+ ^MultiCountStatAndMetric transfer-count])
+(defrecord BuiltinBoltMetrics [^MultiCountStatAndMetric ack-count
+ ^MultiLatencyStatAndMetric process-latency
+ ^MultiCountStatAndMetric fail-count
+ ^MultiCountStatAndMetric execute-count
+ ^MultiLatencyStatAndMetric execute-latency
+ ^MultiCountStatAndMetric emit-count
+ ^MultiCountStatAndMetric transfer-count])
(defrecord SpoutThrottlingMetrics [^CountMetric skipped-max-spout
^CountMetric skipped-throttle
^CountMetric skipped-inactive])
-(defn make-data [executor-type]
+(defn make-data [executor-type stats]
(condp = executor-type
- :spout (BuiltinSpoutMetrics. (MultiCountMetric.)
- (MultiReducedMetric. (MeanReducer.))
- (MultiCountMetric.)
- (MultiCountMetric.)
- (MultiCountMetric.))
- :bolt (BuiltinBoltMetrics. (MultiCountMetric.)
- (MultiReducedMetric. (MeanReducer.))
- (MultiCountMetric.)
- (MultiCountMetric.)
- (MultiReducedMetric. (MeanReducer.))
- (MultiCountMetric.)
- (MultiCountMetric.))))
+ :spout (BuiltinSpoutMetrics. (stats-acked stats)
+ (stats-complete-latencies stats)
+ (stats-failed stats)
+ (stats-emitted stats)
+ (stats-transferred stats))
+ :bolt (BuiltinBoltMetrics. (stats-acked stats)
+ (stats-process-latencies stats)
+ (stats-failed stats)
+ (stats-executed stats)
+ (stats-execute-latencies stats)
+ (stats-emitted stats)
+ (stats-transferred stats))))
(defn make-spout-throttling-data []
(SpoutThrottlingMetrics. (CountMetric.)
@@ -87,33 +88,6 @@
(.registerMetric topology-context (str "__" (name qname)) (StateMetric. q)
(int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
-(defn spout-acked-tuple! [^BuiltinSpoutMetrics m stats stream latency-ms]
- (-> m .ack-count (.scope stream) (.incrBy (stats-rate stats)))
- (-> m .complete-latency (.scope stream) (.update latency-ms)))
-
-(defn spout-failed-tuple! [^BuiltinSpoutMetrics m stats stream]
- (-> m .fail-count (.scope stream) (.incrBy (stats-rate stats))))
-
-(defn bolt-execute-tuple! [^BuiltinBoltMetrics m stats comp-id stream latency-ms]
- (let [scope (str comp-id ":" stream)]
- (-> m .execute-count (.scope scope) (.incrBy (stats-rate stats)))
- (-> m .execute-latency (.scope scope) (.update latency-ms))))
-
-(defn bolt-acked-tuple! [^BuiltinBoltMetrics m stats comp-id stream latency-ms]
- (let [scope (str comp-id ":" stream)]
- (-> m .ack-count (.scope scope) (.incrBy (stats-rate stats)))
- (-> m .process-latency (.scope scope) (.update latency-ms))))
-
-(defn bolt-failed-tuple! [^BuiltinBoltMetrics m stats comp-id stream]
- (let [scope (str comp-id ":" stream)]
- (-> m .fail-count (.scope scope) (.incrBy (stats-rate stats)))))
-
-(defn emitted-tuple! [m stats stream]
- (-> m :emit-count (.scope stream) (.incrBy (stats-rate stats))))
-
-(defn transferred-tuple! [m stats stream num-out-tasks]
- (-> m :transfer-count (.scope stream) (.incrBy (* num-out-tasks (stats-rate stats)))))
-
(defn skipped-max-spout! [^SpoutThrottlingMetrics m stats]
(-> m .skipped-max-spout (.incrBy (stats-rate stats))))
http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index ed92e9c..a398b8a 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -438,7 +438,6 @@
(.fail spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
(when time-delta
- (builtin-metrics/spout-failed-tuple! (:builtin-metrics task-data) (:stats executor-data) (:stream tuple-info))
(stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id]
@@ -450,7 +449,6 @@
(.ack spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
(when time-delta
- (builtin-metrics/spout-acked-tuple! (:builtin-metrics task-data) (:stats executor-data) (:stream tuple-info) time-delta)
(stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
(defn mk-task-receiver [executor-data tuple-action-fn]
@@ -760,11 +758,6 @@
(task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
(when delta
- (builtin-metrics/bolt-execute-tuple! (:builtin-metrics task-data)
- executor-stats
- (.getSourceComponent tuple)
- (.getSourceStreamId tuple)
- delta)
(stats/bolt-execute-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
@@ -846,11 +839,6 @@
(log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
(when delta
- (builtin-metrics/bolt-acked-tuple! (:builtin-metrics task-data)
- executor-stats
- (.getSourceComponent tuple)
- (.getSourceStreamId tuple)
- delta)
(stats/bolt-acked-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
@@ -866,10 +854,6 @@
(log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
(when delta
- (builtin-metrics/bolt-failed-tuple! (:builtin-metrics task-data)
- executor-stats
- (.getSourceComponent tuple)
- (.getSourceStreamId tuple))
(stats/bolt-failed-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/clj/backtype/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/task.clj b/storm-core/src/clj/backtype/storm/daemon/task.clj
index 9cf2b85..0a38860 100644
--- a/storm-core/src/clj/backtype/storm/daemon/task.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/task.clj
@@ -145,11 +145,9 @@
(throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))
(apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
(when (emit-sampler)
- (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)
(stats/emitted-tuple! executor-stats stream)
(if out-task-id
- (stats/transferred-tuples! executor-stats stream 1)
- (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream 1)))
+ (stats/transferred-tuples! executor-stats stream 1)))
(if out-task-id [out-task-id])
))
([^String stream ^List values]
@@ -168,9 +166,7 @@
(apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
(when (emit-sampler)
(stats/emitted-tuple! executor-stats stream)
- (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)
- (stats/transferred-tuples! executor-stats stream (count out-tasks))
- (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream (count out-tasks)))
+ (stats/transferred-tuples! executor-stats stream (count out-tasks)))
out-tasks)))
))
@@ -180,7 +176,7 @@
:task-id task-id
:system-context (system-topology-context (:worker executor-data) executor-data task-id)
:user-context (user-topology-context (:worker executor-data) executor-data task-id)
- :builtin-metrics (builtin-metrics/make-data (:type executor-data))
+ :builtin-metrics (builtin-metrics/make-data (:type executor-data) (:stats executor-data))
:tasks-fn (mk-tasks-fn <>)
:object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data))))
http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/clj/backtype/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/stats.clj b/storm-core/src/clj/backtype/storm/stats.clj
index dabc6f7..efc4965 100644
--- a/storm-core/src/clj/backtype/storm/stats.clj
+++ b/storm-core/src/clj/backtype/storm/stats.clj
@@ -24,216 +24,137 @@
ExecutorAggregateStats SpecificAggregateStats
SpoutAggregateStats TopologyPageInfo TopologyStats])
(:import [backtype.storm.utils Utils])
+ (:import [backtype.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric])
(:use [backtype.storm log util])
(:use [clojure.math.numeric-tower :only [ceil]]))
-;;TODO: consider replacing this with some sort of RRD
-
(def TEN-MIN-IN-SECONDS (* 10 60))
-(defn curr-time-bucket
- [^Integer time-secs ^Integer bucket-size-secs]
- (* bucket-size-secs (unchecked-divide-int time-secs bucket-size-secs)))
-
-(defrecord RollingWindow
- [updater merger extractor bucket-size-secs num-buckets buckets])
-
-(defn rolling-window
- [updater merger extractor bucket-size-secs num-buckets]
- (RollingWindow. updater merger extractor bucket-size-secs num-buckets {}))
-
-(defn update-rolling-window
- ([^RollingWindow rw time-secs & args]
- ;; this is 2.5x faster than using update-in...
- (let [time-bucket (curr-time-bucket time-secs (:bucket-size-secs rw))
- buckets (:buckets rw)
- curr (get buckets time-bucket)
- curr (apply (:updater rw) curr args)]
- (assoc rw :buckets (assoc buckets time-bucket curr)))))
-
-(defn value-rolling-window
- [^RollingWindow rw]
- ((:extractor rw)
- (let [values (vals (:buckets rw))]
- (apply (:merger rw) values))))
-
-(defn cleanup-rolling-window
- [^RollingWindow rw]
- (let [buckets (:buckets rw)
- cutoff (- (current-time-secs)
- (* (:num-buckets rw)
- (:bucket-size-secs rw)))
- to-remove (filter #(< % cutoff) (keys buckets))
- buckets (apply dissoc buckets to-remove)]
- (assoc rw :buckets buckets)))
-
-(defn rolling-window-size
- [^RollingWindow rw]
- (* (:bucket-size-secs rw) (:num-buckets rw)))
-
-(defrecord RollingWindowSet [updater extractor windows all-time])
-
-(defn rolling-window-set [updater merger extractor num-buckets & bucket-sizes]
- (RollingWindowSet. updater extractor (dofor [s bucket-sizes] (rolling-window updater merger extractor s num-buckets)) nil)
- )
-
-(defn update-rolling-window-set
- ([^RollingWindowSet rws & args]
- (let [now (current-time-secs)
- new-windows (dofor [w (:windows rws)]
- (apply update-rolling-window w now args))]
- (assoc rws
- :windows new-windows
- :all-time (apply (:updater rws) (:all-time rws) args)))))
-
-(defn cleanup-rolling-window-set
- ([^RollingWindowSet rws]
- (let [windows (:windows rws)]
- (assoc rws :windows (map cleanup-rolling-window windows)))))
-
-(defn value-rolling-window-set
- [^RollingWindowSet rws]
- (merge
- (into {}
- (for [w (:windows rws)]
- {(rolling-window-size w) (value-rolling-window w)}
- ))
- {:all-time ((:extractor rws) (:all-time rws))}))
-
-(defn- incr-val
- ([amap key]
- (incr-val amap key 1))
- ([amap key amt]
- (let [val (get amap key (long 0))]
- (assoc amap key (+ val amt)))))
-
-(defn- update-avg
- [curr val]
- (if curr
- [(+ (first curr) val) (inc (second curr))]
- [val (long 1)]))
-
-(defn- merge-avg
- [& avg]
- [(apply + (map first avg))
- (apply + (map second avg))
- ])
-
-(defn- extract-avg
- [pair]
- (double (/ (first pair) (second pair))))
-
-(defn- update-keyed-avg
- [amap key val]
- (assoc amap key (update-avg (get amap key) val)))
-
-(defn- merge-keyed-avg [& vals]
- (apply merge-with merge-avg vals))
-
-(defn- extract-keyed-avg [vals]
- (map-val extract-avg vals))
-
-(defn- counter-extract [v]
- (if v v {}))
-
-(defn keyed-counter-rolling-window-set
- [num-buckets & bucket-sizes]
- (apply rolling-window-set incr-val (partial merge-with +) counter-extract num-buckets bucket-sizes))
-
-(defn avg-rolling-window-set
- [num-buckets & bucket-sizes]
- (apply rolling-window-set update-avg merge-avg extract-avg num-buckets bucket-sizes))
-
-(defn keyed-avg-rolling-window-set
- [num-buckets & bucket-sizes]
- (apply rolling-window-set update-keyed-avg merge-keyed-avg extract-keyed-avg num-buckets bucket-sizes))
-
(def COMMON-FIELDS [:emitted :transferred])
-(defrecord CommonStats [emitted transferred rate])
+(defrecord CommonStats [^MultiCountStatAndMetric emitted
+ ^MultiCountStatAndMetric transferred
+ rate])
(def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies])
;;acked and failed count individual tuples
-(defrecord BoltExecutorStats [common acked failed process-latencies executed execute-latencies])
+(defrecord BoltExecutorStats [^CommonStats common
+ ^MultiCountStatAndMetric acked
+ ^MultiCountStatAndMetric failed
+ ^MultiLatencyStatAndMetric process-latencies
+ ^MultiCountStatAndMetric executed
+ ^MultiLatencyStatAndMetric execute-latencies])
(def SPOUT-FIELDS [:acked :failed :complete-latencies])
;;acked and failed count tuple completion
-(defrecord SpoutExecutorStats [common acked failed complete-latencies])
+(defrecord SpoutExecutorStats [^CommonStats common
+ ^MultiCountStatAndMetric acked
+ ^MultiCountStatAndMetric failed
+ ^MultiLatencyStatAndMetric complete-latencies])
(def NUM-STAT-BUCKETS 20)
-;; 10 minutes, 3 hours, 1 day
-(def STAT-BUCKETS [30 540 4320])
(defn- mk-common-stats
[rate]
(CommonStats.
- (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
- (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+ (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
+ (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
rate))
(defn mk-bolt-stats
[rate]
(BoltExecutorStats.
(mk-common-stats rate)
- (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
- (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
- (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
- (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
- (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))
+ (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
+ (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
+ (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)
+ (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
+ (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)))
(defn mk-spout-stats
[rate]
(SpoutExecutorStats.
(mk-common-stats rate)
- (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
- (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
- (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))
-
-(defmacro update-executor-stat!
- [stats path & args]
- (let [path (collectify path)]
- `(swap! (-> ~stats ~@path) update-rolling-window-set ~@args)))
+ (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
+ (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
+ (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)))
(defmacro stats-rate
[stats]
`(-> ~stats :common :rate))
+(defmacro stats-emitted
+ [stats]
+ `(-> ~stats :common :emitted))
+
+(defmacro stats-transferred
+ [stats]
+ `(-> ~stats :common :transferred))
+
+(defmacro stats-executed
+ [stats]
+ `(:executed ~stats))
+
+(defmacro stats-acked
+ [stats]
+ `(:acked ~stats))
+
+(defmacro stats-failed
+ [stats]
+ `(:failed ~stats))
+
+(defmacro stats-execute-latencies
+ [stats]
+ `(:execute-latencies ~stats))
+
+(defmacro stats-process-latencies
+ [stats]
+ `(:process-latencies ~stats))
+
+(defmacro stats-complete-latencies
+ [stats]
+ `(:complete-latencies ~stats))
+
(defn emitted-tuple!
[stats stream]
- (update-executor-stat! stats [:common :emitted] stream (stats-rate stats)))
+ (.incBy ^MultiCountStatAndMetric (stats-emitted stats) stream (stats-rate stats)))
(defn transferred-tuples!
[stats stream amt]
- (update-executor-stat! stats [:common :transferred] stream (* (stats-rate stats) amt)))
+ (.incBy ^MultiCountStatAndMetric (stats-transferred stats) stream (* (stats-rate stats) amt)))
(defn bolt-execute-tuple!
[^BoltExecutorStats stats component stream latency-ms]
- (let [key [component stream]]
- (update-executor-stat! stats :executed key (stats-rate stats))
- (update-executor-stat! stats :execute-latencies key latency-ms)))
+ (let [key [component stream]
+ ^MultiCountStatAndMetric executed (stats-executed stats)
+ ^MultiLatencyStatAndMetric exec-lat (stats-execute-latencies stats)]
+ (.incBy executed key (stats-rate stats))
+ (.record exec-lat key latency-ms)))
(defn bolt-acked-tuple!
[^BoltExecutorStats stats component stream latency-ms]
- (let [key [component stream]]
- (update-executor-stat! stats :acked key (stats-rate stats))
- (update-executor-stat! stats :process-latencies key latency-ms)))
+ (let [key [component stream]
+ ^MultiCountStatAndMetric acked (stats-acked stats)
+ ^MultiLatencyStatAndMetric process-lat (stats-process-latencies stats)]
+ (.incBy acked key (stats-rate stats))
+ (.record process-lat key latency-ms)))
(defn bolt-failed-tuple!
[^BoltExecutorStats stats component stream latency-ms]
- (let [key [component stream]]
- (update-executor-stat! stats :failed key (stats-rate stats))))
+ (let [key [component stream]
+ ^MultiCountStatAndMetric failed (stats-failed stats)]
+ (.incBy failed key (stats-rate stats))))
(defn spout-acked-tuple!
[^SpoutExecutorStats stats stream latency-ms]
- (update-executor-stat! stats :acked stream (stats-rate stats))
- (update-executor-stat! stats :complete-latencies stream latency-ms))
+ (.incBy ^MultiCountStatAndMetric (stats-acked stats) stream (stats-rate stats))
+ (.record ^MultiLatencyStatAndMetric (stats-complete-latencies stats) stream latency-ms))
(defn spout-failed-tuple!
[^SpoutExecutorStats stats stream latency-ms]
- (update-executor-stat! stats :failed stream (stats-rate stats))
- )
+ (.incBy ^MultiCountStatAndMetric (stats-failed stats) stream (stats-rate stats)))
(defn- cleanup-stat! [stat]
- (swap! stat cleanup-rolling-window-set))
+ (.close stat))
(defn- cleanup-common-stats!
[^CommonStats stats]
@@ -255,7 +176,9 @@
(defn- value-stats
[stats fields]
(into {} (dofor [f fields]
- [f (value-rolling-window-set @(f stats))])))
+ [f (if (instance? MultiCountStatAndMetric (f stats))
+ (.getTimeCounts ^MultiCountStatAndMetric (f stats))
+ (.getTimeLatAvg ^MultiLatencyStatAndMetric (f stats)))])))
(defn- value-common-stats
[^CommonStats stats]
http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/jvm/backtype/storm/metric/internal/CountStatAndMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/internal/CountStatAndMetric.java b/storm-core/src/jvm/backtype/storm/metric/internal/CountStatAndMetric.java
new file mode 100644
index 0000000..194a9e2
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/internal/CountStatAndMetric.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
+
+import backtype.storm.metric.api.IMetric;
+
+/**
+ * Acts as a Count Metric, but also keeps track of approximate counts
+ * for the last 10 mins, 3 hours, 1 day, and all time.
+ */
+public class CountStatAndMetric implements IMetric{
+ private final AtomicLong _currentBucket;
+ // All internal state except for the count of the current bucket is
+ // protected using a lock on this counter
+ private long _bucketStart;
+
+ //exact count already rotated into the buckets by the timer but not yet reported; it is added to the next getValueAndReset result
+ private long _exactExtra;
+
+ //10 min values
+ private final int _tmSize;
+ private final long[] _tmBuckets;
+ private final long[] _tmTime;
+
+ //3 hour values
+ private final int _thSize;
+ private final long[] _thBuckets;
+ private final long[] _thTime;
+
+ //1 day values
+ private final int _odSize;
+ private final long[] _odBuckets;
+ private final long[] _odTime;
+
+ //all time
+ private long _allTime;
+
+ private final TimerTask _task;
+
+ /**
+ * @param numBuckets the number of buckets to divide the time periods into.
+ */
+ public CountStatAndMetric(int numBuckets) {
+ this(numBuckets, -1);
+ }
+
+ /**
+ * Constructor
+ * @param numBuckets the number of buckets to divide the time periods into.
+ * @param startTime if non-negative, the simulated time to start from.
+ */
+ CountStatAndMetric(int numBuckets, long startTime){
+ numBuckets = Math.max(numBuckets, 2);
+ //We want to capture the full time range, so the target size is as
+ // if we had one bucket less than we do
+ _tmSize = 10 * 60 * 1000 / (numBuckets - 1);
+ _thSize = 3 * 60 * 60 * 1000 / (numBuckets - 1);
+ _odSize = 24 * 60 * 60 * 1000 / (numBuckets - 1);
+ if (_tmSize < 1 || _thSize < 1 || _odSize < 1) {
+ throw new IllegalArgumentException("number of buckets is too large to be supported");
+ }
+ _tmBuckets = new long[numBuckets];
+ _tmTime = new long[numBuckets];
+ _thBuckets = new long[numBuckets];
+ _thTime = new long[numBuckets];
+ _odBuckets = new long[numBuckets];
+ _odTime = new long[numBuckets];
+ _allTime = 0;
+ _exactExtra = 0;
+
+ _bucketStart = startTime >= 0 ? startTime : System.currentTimeMillis();
+ _currentBucket = new AtomicLong(0);
+ if (startTime < 0) {
+ _task = new Fresher();
+ MetricStatTimer._timer.scheduleAtFixedRate(_task, _tmSize, _tmSize);
+ } else {
+ _task = null;
+ }
+ }
+
+ /**
+ * Increase the count by the given value.
+ *
+ * @param count number to count
+ */
+ public void incBy(long count) {
+ _currentBucket.addAndGet(count);
+ }
+
+
+
+ @Override
+ public synchronized Object getValueAndReset() {
+ return getValueAndReset(System.currentTimeMillis());
+ }
+
+ synchronized Object getValueAndReset(long now) {
+ long value = _currentBucket.getAndSet(0);
+ long timeSpent = now - _bucketStart;
+ long ret = value + _exactExtra;
+ _bucketStart = now;
+ _exactExtra = 0;
+ rotateBuckets(value, timeSpent);
+ return ret;
+ }
+
+ synchronized void rotateSched(long now) {
+ long value = _currentBucket.getAndSet(0);
+ long timeSpent = now - _bucketStart;
+ _exactExtra += value;
+ _bucketStart = now;
+ rotateBuckets(value, timeSpent);
+ }
+
+ synchronized void rotateBuckets(long value, long timeSpent) {
+ rotate(value, timeSpent, _tmSize, _tmTime, _tmBuckets);
+ rotate(value, timeSpent, _thSize, _thTime, _thBuckets);
+ rotate(value, timeSpent, _odSize, _odTime, _odBuckets);
+ _allTime += value;
+ }
+
+ private synchronized void rotate(long value, long timeSpent, long targetSize, long [] times, long [] buckets) {
+ times[0] += timeSpent;
+ buckets[0] += value;
+
+ long currentTime = 0;
+ long currentVal = 0;
+ if (times[0] >= targetSize) {
+ for (int i = 0; i < buckets.length; i++) {
+ long tmpTime = times[i];
+ times[i] = currentTime;
+ currentTime = tmpTime;
+
+ long cnt = buckets[i];
+ buckets[i] = currentVal;
+ currentVal = cnt;
+ }
+ }
+ }
+
+ /**
+ * @return a map of time window to count.
+ * Keys are "600" for last 10 mins
+ * "10800" for the last 3 hours
+ * "86400" for the last day
+ * ":all-time" for all time
+ */
+ public synchronized Map<String, Long> getTimeCounts() {
+ return getTimeCounts(System.currentTimeMillis());
+ }
+
+ synchronized Map<String, Long> getTimeCounts(long now) {
+ Map<String, Long> ret = new HashMap<>();
+ long value = _currentBucket.get();
+ long timeSpent = now - _bucketStart;
+ ret.put("600", readApproximateTime(value, timeSpent, _tmTime, _tmBuckets, 600 * 1000));
+ ret.put("10800", readApproximateTime(value, timeSpent, _thTime, _thBuckets, 10800 * 1000));
+ ret.put("86400", readApproximateTime(value, timeSpent, _odTime, _odBuckets, 86400 * 1000));
+ ret.put(":all-time", value + _allTime);
+ return ret;
+ }
+
+ long readApproximateTime(long value, long timeSpent, long[] bucketTime, long[] buckets, long desiredTime) {
+ long timeNeeded = desiredTime - timeSpent;
+ long total = value;
+ for (int i = 0; i < bucketTime.length; i++) {
+ if (timeNeeded < bucketTime[i]) {
+ double pct = timeNeeded/((double)bucketTime[i]);
+ total += (long)(pct * buckets[i]);
+ timeNeeded = 0;
+ break;
+ }
+ total += buckets[i];
+ timeNeeded -= bucketTime[i];
+ }
+ return total;
+ }
+
+ public void close() {
+ if (_task != null) {
+ _task.cancel();
+ }
+ }
+
+ private class Fresher extends TimerTask {
+ public void run () {
+ rotateSched(System.currentTimeMillis());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/jvm/backtype/storm/metric/internal/LatencyStatAndMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/internal/LatencyStatAndMetric.java b/storm-core/src/jvm/backtype/storm/metric/internal/LatencyStatAndMetric.java
new file mode 100644
index 0000000..31bad41
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/internal/LatencyStatAndMetric.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
+
+import backtype.storm.metric.api.IMetric;
+
+/**
+ * Acts as a Latency Metric, but also keeps track of approximate latency
+ * for the last 10 mins, 3 hours, 1 day, and all time.
+ */
+public class LatencyStatAndMetric implements IMetric {
+ //The current lat and count buckets are protected by a different lock
+ // from the other buckets. This is to reduce the lock contention
+ // When doing complex calculations. Never grab the instance object lock
+ // while holding _currentLock to avoid deadlocks
+ private final Object _currentLock = new byte[0];
+ private long _currentLatBucket;
+ private long _currentCountBucket;
+
+ // All internal state except for the current buckets are
+ // protected using the Object Lock
+ private long _bucketStart;
+
+ //exact variable time, that is added to the current bucket
+ private long _exactExtraLat;
+ private long _exactExtraCount;
+
+ //10 min values
+ private final int _tmSize;
+ private final long[] _tmLatBuckets;
+ private final long[] _tmCountBuckets;
+ private final long[] _tmTime;
+
+ //3 hour values
+ private final int _thSize;
+ private final long[] _thLatBuckets;
+ private final long[] _thCountBuckets;
+ private final long[] _thTime;
+
+ //1 day values
+ private final int _odSize;
+ private final long[] _odLatBuckets;
+ private final long[] _odCountBuckets;
+ private final long[] _odTime;
+
+ //all time
+ private long _allTimeLat;
+ private long _allTimeCount;
+
+ private final TimerTask _task;
+
+ /**
+ * @param numBuckets the number of buckets to divide the time periods into.
+ */
+ public LatencyStatAndMetric(int numBuckets) {
+ this(numBuckets, -1);
+ }
+
+ /**
+ * Constructor
+ * @param numBuckets the number of buckets to divide the time periods into.
+ * @param startTime if positive the simulated time to start the from.
+ */
+ LatencyStatAndMetric(int numBuckets, long startTime){
+ numBuckets = Math.max(numBuckets, 2);
+ //We want to capture the full time range, so the target size is as
+ // if we had one bucket less, then we do
+ _tmSize = 10 * 60 * 1000 / (numBuckets - 1);
+ _thSize = 3 * 60 * 60 * 1000 / (numBuckets - 1);
+ _odSize = 24 * 60 * 60 * 1000 / (numBuckets - 1);
+ if (_tmSize < 1 || _thSize < 1 || _odSize < 1) {
+ throw new IllegalArgumentException("number of buckets is too large to be supported");
+ }
+ _tmLatBuckets = new long[numBuckets];
+ _tmCountBuckets = new long[numBuckets];
+ _tmTime = new long[numBuckets];
+ _thLatBuckets = new long[numBuckets];
+ _thCountBuckets = new long[numBuckets];
+ _thTime = new long[numBuckets];
+ _odLatBuckets = new long[numBuckets];
+ _odCountBuckets = new long[numBuckets];
+ _odTime = new long[numBuckets];
+ _allTimeLat = 0;
+ _allTimeCount = 0;
+ _exactExtraLat = 0;
+ _exactExtraCount = 0;
+
+ _bucketStart = startTime >= 0 ? startTime : System.currentTimeMillis();
+ _currentLatBucket = 0;
+ _currentCountBucket = 0;
+ if (startTime < 0) {
+ _task = new Fresher();
+ MetricStatTimer._timer.scheduleAtFixedRate(_task, _tmSize, _tmSize);
+ } else {
+ _task = null;
+ }
+ }
+
+ /**
+ * Record a specific latency
+ *
+ * @param latency what we are recording
+ */
+ public void record(long latency) {
+ synchronized(_currentLock) {
+ _currentLatBucket += latency;
+ _currentCountBucket++;
+ }
+ }
+
+ @Override
+ public synchronized Object getValueAndReset() {
+ return getValueAndReset(System.currentTimeMillis());
+ }
+
+ synchronized Object getValueAndReset(long now) {
+ long lat;
+ long count;
+ synchronized(_currentLock) {
+ lat = _currentLatBucket;
+ count = _currentCountBucket;
+ _currentLatBucket = 0;
+ _currentCountBucket = 0;
+ }
+
+ long timeSpent = now - _bucketStart;
+ double ret = ((double)(lat + _exactExtraLat))/(count + _exactExtraCount);
+ _bucketStart = now;
+ _exactExtraLat = 0;
+ _exactExtraCount = 0;
+ rotateBuckets(lat, count, timeSpent);
+ return ret;
+ }
+
+ synchronized void rotateSched(long now) {
+ long lat;
+ long count;
+ synchronized(_currentLock) {
+ lat = _currentLatBucket;
+ count = _currentCountBucket;
+ _currentLatBucket = 0;
+ _currentCountBucket = 0;
+ }
+
+ long timeSpent = now - _bucketStart;
+ _exactExtraLat += lat;
+ _exactExtraCount += count;
+ _bucketStart = now;
+ rotateBuckets(lat, count, timeSpent);
+ }
+
+ synchronized void rotateBuckets(long lat, long count, long timeSpent) {
+ rotate(lat, count, timeSpent, _tmSize, _tmTime, _tmLatBuckets, _tmCountBuckets);
+ rotate(lat, count, timeSpent, _thSize, _thTime, _thLatBuckets, _thCountBuckets);
+ rotate(lat, count, timeSpent, _odSize, _odTime, _odLatBuckets, _odCountBuckets);
+ _allTimeLat += lat;
+ _allTimeCount += count;
+ }
+
+ private synchronized void rotate(long lat, long count, long timeSpent, long targetSize,
+ long [] times, long [] latBuckets, long [] countBuckets) {
+ times[0] += timeSpent;
+ latBuckets[0] += lat;
+ countBuckets[0] += count;
+
+ long currentTime = 0;
+ long currentLat = 0;
+ long currentCount = 0;
+ if (times[0] >= targetSize) {
+ for (int i = 0; i < latBuckets.length; i++) {
+ long tmpTime = times[i];
+ times[i] = currentTime;
+ currentTime = tmpTime;
+
+ long lt = latBuckets[i];
+ latBuckets[i] = currentLat;
+ currentLat = lt;
+
+ long cnt = countBuckets[i];
+ countBuckets[i] = currentCount;
+ currentCount = cnt;
+ }
+ }
+ }
+
+ /**
+ * @return a map of time window to average latency.
+ * Keys are "600" for last 10 mins
+ * "10800" for the last 3 hours
+ * "86400" for the last day
+ * ":all-time" for all time
+ */
+ public synchronized Map<String, Double> getTimeLatAvg() {
+ return getTimeLatAvg(System.currentTimeMillis());
+ }
+
+ synchronized Map<String, Double> getTimeLatAvg(long now) {
+ Map<String, Double> ret = new HashMap<>();
+ long lat;
+ long count;
+ synchronized(_currentLock) {
+ lat = _currentLatBucket;
+ count = _currentCountBucket;
+ }
+ long timeSpent = now - _bucketStart;
+ ret.put("600", readApproximateLatAvg(lat, count, timeSpent, _tmTime, _tmLatBuckets, _tmCountBuckets, 600 * 1000));
+ ret.put("10800", readApproximateLatAvg(lat, count, timeSpent, _thTime, _thLatBuckets, _thCountBuckets, 10800 * 1000));
+ ret.put("86400", readApproximateLatAvg(lat, count, timeSpent, _odTime, _odLatBuckets, _odCountBuckets, 86400 * 1000));
+ ret.put(":all-time", ((double)lat + _allTimeLat)/(count + _allTimeCount));
+ return ret;
+ }
+
+ double readApproximateLatAvg(long lat, long count, long timeSpent, long[] bucketTime,
+ long[] latBuckets, long[] countBuckets, long desiredTime) {
+ long timeNeeded = desiredTime - timeSpent;
+ long totalLat = lat;
+ long totalCount = count;
+ for (int i = 0; i < bucketTime.length && timeNeeded > 0; i++) {
+ //Don't pro-rate anything, it is all approximate so an extra bucket is not that bad.
+ totalLat += latBuckets[i];
+ totalCount += countBuckets[i];
+ timeNeeded -= bucketTime[i];
+ }
+ return ((double)totalLat)/totalCount;
+ }
+
+ public void close() {
+ if (_task != null) {
+ _task.cancel();
+ }
+ }
+
+ private class Fresher extends TimerTask {
+ public void run () {
+ rotateSched(System.currentTimeMillis());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/jvm/backtype/storm/metric/internal/MetricStatTimer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/internal/MetricStatTimer.java b/storm-core/src/jvm/backtype/storm/metric/internal/MetricStatTimer.java
new file mode 100644
index 0000000..5f48793
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/internal/MetricStatTimer.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import java.util.Timer;
+
+/**
+ * Just holds a singleton metric/stat timer for use by metric/stat calculations
+ */
+class MetricStatTimer {
+ static Timer _timer = new Timer("metric/stat timer", true);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/jvm/backtype/storm/metric/internal/MultiCountStatAndMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/internal/MultiCountStatAndMetric.java b/storm-core/src/jvm/backtype/storm/metric/internal/MultiCountStatAndMetric.java
new file mode 100644
index 0000000..5fae4bf
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/internal/MultiCountStatAndMetric.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import java.util.Map;
+import java.util.List;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import backtype.storm.metric.api.IMetric;
+
+/**
+ * Acts as a MultiCount Metric, but keeps track of approximate counts
+ * for the last 10 mins, 3 hours, 1 day, and all time. for the same keys
+ */
+public class MultiCountStatAndMetric<T> implements IMetric {
+ private ConcurrentHashMap<T, CountStatAndMetric> _counts = new ConcurrentHashMap<>();
+ private final int _numBuckets;
+
+ /**
+ * @param numBuckets the number of buckets to divide the time periods into.
+ */
+ public MultiCountStatAndMetric(int numBuckets) {
+ _numBuckets = numBuckets;
+ }
+
+ CountStatAndMetric get(T key) {
+ CountStatAndMetric c = _counts.get(key);
+ if (c == null) {
+ synchronized(this) {
+ c = _counts.get(key);
+ if (c == null) {
+ c = new CountStatAndMetric(_numBuckets);
+ _counts.put(key, c);
+ }
+ }
+ }
+ return c;
+ }
+
+ /**
+ * Increase the count by the given value.
+ *
+ * @param count number to count
+ */
+ public void incBy(T key, long count) {
+ get(key).incBy(count);
+ }
+
+ protected String keyToString(T key) {
+ if (key instanceof List) {
+ //This is a bit of a hack. If it is a list, then it is [component, stream]
+ //we want to format this as component:stream
+ List<String> lk = (List<String>)key;
+ return lk.get(0) + ":" + lk.get(1);
+ }
+ return key.toString();
+ }
+
+ @Override
+ public Object getValueAndReset() {
+ Map<String, Long> ret = new HashMap<String, Long>();
+ for (Map.Entry<T, CountStatAndMetric> entry: _counts.entrySet()) {
+ String key = keyToString(entry.getKey());
+ //There could be collisions if keyToString returns only part of a result.
+ Long val = (Long)entry.getValue().getValueAndReset();
+ Long other = ret.get(key);
+ val += other == null ? 0l : other;
+ ret.put(key, val);
+ }
+ return ret;
+ }
+
+ public Map<String, Map<T, Long>> getTimeCounts() {
+ Map<String, Map<T, Long>> ret = new HashMap<>();
+ for (Map.Entry<T, CountStatAndMetric> entry: _counts.entrySet()) {
+ T key = entry.getKey();
+ Map<String, Long> toFlip = entry.getValue().getTimeCounts();
+ for (Map.Entry<String, Long> subEntry: toFlip.entrySet()) {
+ String time = subEntry.getKey();
+ Map<T, Long> tmp = ret.get(time);
+ if (tmp == null) {
+ tmp = new HashMap<>();
+ ret.put(time, tmp);
+ }
+ tmp.put(key, subEntry.getValue());
+ }
+ }
+ return ret;
+ }
+
+ public void close() {
+ for (CountStatAndMetric cc: _counts.values()) {
+ cc.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/jvm/backtype/storm/metric/internal/MultiLatencyStatAndMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/internal/MultiLatencyStatAndMetric.java b/storm-core/src/jvm/backtype/storm/metric/internal/MultiLatencyStatAndMetric.java
new file mode 100644
index 0000000..032eef1
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/internal/MultiLatencyStatAndMetric.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import java.util.Map;
+import java.util.List;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import backtype.storm.metric.api.IMetric;
+
+/**
+ * Acts as a Latnecy Metric for multiple keys, but keeps track of approximate counts
+ * for the last 10 mins, 3 hours, 1 day, and all time. for the same keys
+ */
+public class MultiLatencyStatAndMetric<T> implements IMetric {
+ private ConcurrentHashMap<T, LatencyStatAndMetric> _lat = new ConcurrentHashMap<>();
+ private final int _numBuckets;
+
+ /**
+ * @param numBuckets the number of buckets to divide the time periods into.
+ */
+ public MultiLatencyStatAndMetric(int numBuckets) {
+ _numBuckets = numBuckets;
+ }
+
+ LatencyStatAndMetric get(T key) {
+ LatencyStatAndMetric c = _lat.get(key);
+ if (c == null) {
+ synchronized(this) {
+ c = _lat.get(key);
+ if (c == null) {
+ c = new LatencyStatAndMetric(_numBuckets);
+ _lat.put(key, c);
+ }
+ }
+ }
+ return c;
+ }
+
+ /**
+ * Record a latency value
+ *
+ * @param latency the measurement to record
+ */
+ public void record(T key, long latency) {
+ get(key).record(latency);
+ }
+
+ protected String keyToString(T key) {
+ if (key instanceof List) {
+ //This is a bit of a hack. If it is a list, then it is [component, stream]
+ //we want to format this as component:stream
+ List<String> lk = (List<String>)key;
+ return lk.get(0) + ":" + lk.get(1);
+ }
+ return key.toString();
+ }
+
+ @Override
+ public Object getValueAndReset() {
+ Map<String, Double> ret = new HashMap<String, Double>();
+ for (Map.Entry<T, LatencyStatAndMetric> entry: _lat.entrySet()) {
+ String key = keyToString(entry.getKey());
+ Double val = (Double)entry.getValue().getValueAndReset();
+ ret.put(key, val);
+ }
+ return ret;
+ }
+
+ public Map<String, Map<T, Double>> getTimeLatAvg() {
+ Map<String, Map<T, Double>> ret = new HashMap<>();
+ for (Map.Entry<T, LatencyStatAndMetric> entry: _lat.entrySet()) {
+ T key = entry.getKey();
+ Map<String, Double> toFlip = entry.getValue().getTimeLatAvg();
+ for (Map.Entry<String, Double> subEntry: toFlip.entrySet()) {
+ String time = subEntry.getKey();
+ Map<T, Double> tmp = ret.get(time);
+ if (tmp == null) {
+ tmp = new HashMap<>();
+ ret.put(time, tmp);
+ }
+ tmp.put(key, subEntry.getValue());
+ }
+ }
+ return ret;
+ }
+
+ public void close() {
+ for (LatencyStatAndMetric l: _lat.values()) {
+ l.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/jvm/backtype/storm/metric/internal/RateTracker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/internal/RateTracker.java b/storm-core/src/jvm/backtype/storm/metric/internal/RateTracker.java
new file mode 100644
index 0000000..f3eb6ae
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/internal/RateTracker.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class is a utility to track the rate of something.
+ */
+public class RateTracker{
+ private final int _bucketSizeMillis;
+ //Old Buckets and their length are only touched when rotating or gathering the metrics, which should not be that frequent
+ // As such all access to them should be protected by synchronizing with the RateTracker instance
+ private final long[] _bucketTime;
+ private final long[] _oldBuckets;
+
+ private final AtomicLong _bucketStart;
+ private final AtomicLong _currentBucket;
+
+ private final TimerTask _task;
+
+ /**
+ * @param validTimeWindowInMils events that happened before validTimeWindowInMils are not considered
+ * when reporting the rate.
+ * @param numBuckets the number of time sildes to divide validTimeWindows. The more buckets,
+ * the smother the reported results will be.
+ */
+ public RateTracker(int validTimeWindowInMils, int numBuckets) {
+ this(validTimeWindowInMils, numBuckets, -1);
+ }
+
+ /**
+ * Constructor
+ * @param validTimeWindowInMils events that happened before validTimeWindow are not considered
+ * when reporting the rate.
+ * @param numBuckets the number of time sildes to divide validTimeWindows. The more buckets,
+ * the smother the reported results will be.
+ * @param startTime if positive the simulated time to start the first bucket at.
+ */
+ RateTracker(int validTimeWindowInMils, int numBuckets, long startTime){
+ numBuckets = Math.max(numBuckets, 1);
+ _bucketSizeMillis = validTimeWindowInMils / numBuckets;
+ if (_bucketSizeMillis < 1 ) {
+ throw new IllegalArgumentException("validTimeWindowInMilis and numOfSildes cause each slide to have a window that is too small");
+ }
+ _bucketTime = new long[numBuckets - 1];
+ _oldBuckets = new long[numBuckets - 1];
+
+ _bucketStart = new AtomicLong(startTime >= 0 ? startTime : System.currentTimeMillis());
+ _currentBucket = new AtomicLong(0);
+ if (startTime < 0) {
+ _task = new Fresher();
+ MetricStatTimer._timer.scheduleAtFixedRate(_task, _bucketSizeMillis, _bucketSizeMillis);
+ } else {
+ _task = null;
+ }
+ }
+
+ /**
+ * Notify the tracker upon new arrivals
+ *
+ * @param count number of arrivals
+ */
+ public void notify(long count) {
+ _currentBucket.addAndGet(count);
+ }
+
+ /**
+ * @return the approximate average rate per second.
+ */
+ public synchronized double reportRate() {
+ return reportRate(System.currentTimeMillis());
+ }
+
+ synchronized double reportRate(long currentTime) {
+ long duration = Math.max(1l, currentTime - _bucketStart.get());
+ long events = _currentBucket.get();
+ for (int i = 0; i < _oldBuckets.length; i++) {
+ events += _oldBuckets[i];
+ duration += _bucketTime[i];
+ }
+
+ return events * 1000.0 / duration;
+ }
+
+ public void close() {
+ if (_task != null) {
+ _task.cancel();
+ }
+ }
+
+ /**
+ * Rotate the buckets a set number of times for testing purposes.
+ * @param numToEclipse the number of rotations to perform.
+ */
+ final void forceRotate(int numToEclipse, long interval) {
+ long time = _bucketStart.get();
+ for (int i = 0; i < numToEclipse; i++) {
+ time += interval;
+ rotateBuckets(time);
+ }
+ }
+
+ private synchronized void rotateBuckets(long time) {
+ long timeSpent = time - _bucketStart.getAndSet(time);
+ long currentVal = _currentBucket.getAndSet(0);
+ for (int i = 0; i < _oldBuckets.length; i++) {
+ long tmpTime = _bucketTime[i];
+ _bucketTime[i] = timeSpent;
+ timeSpent = tmpTime;
+
+ long cnt = _oldBuckets[i];
+ _oldBuckets[i] = currentVal;
+ currentVal = cnt;
+ }
+ }
+
+ private class Fresher extends TimerTask {
+ public void run () {
+ rotateBuckets(System.currentTimeMillis());
+ }
+ }
+
+ public static void main (String args[]) throws Exception {
+ final int number = (args.length >= 1) ? Integer.parseInt(args[0]) : 100000000;
+ for (int i = 0; i < 10; i++) {
+ testRate(number);
+ }
+ }
+
+ private static void testRate(int number) {
+ RateTracker rt = new RateTracker(10000, 10);
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < number; i++) {
+ rt.notify(1);
+ if ((i % 1000000) == 0) {
+ //There is an issue with some JVM versions where an integer for loop that takes a long time
+ // can starve other threads resulting in the timer thread not getting called.
+ // This is a work around for that, and we still get the same results.
+ Thread.yield();
+ }
+ }
+ long end = System.currentTimeMillis();
+ double rate = rt.reportRate();
+ rt.close();
+ System.out.printf("time %,8d count %,8d rate %,15.2f reported rate %,15.2f\n", end-start,number, ((number * 1000.0)/(end-start)), rate);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 3e71bca..fa073e7 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -37,6 +37,7 @@ import java.util.HashMap;
import java.util.Map;
import backtype.storm.metric.api.IStatefulObject;
+import backtype.storm.metric.internal.RateTracker;
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/RateTracker.java b/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
deleted file mode 100644
index a490ecc..0000000
--- a/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.utils;
-
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * This class is a utility to track the rate of something.
- */
-public class RateTracker{
- private final int _bucketSizeMillis;
- //Old Buckets and their length are only touched when rotating or gathering the metrics, which should not be that frequent
- // As such all access to them should be protected by synchronizing with the RateTracker instance
- private final long[] _bucketTime;
- private final long[] _oldBuckets;
-
- private final AtomicLong _bucketStart;
- private final AtomicLong _currentBucket;
-
- private final TimerTask _task;
- private static Timer _timer = new Timer("rate tracker timer", true);
-
- /**
- * @param validTimeWindowInMils events that happened before validTimeWindowInMils are not considered
- * when reporting the rate.
- * @param numBuckets the number of time sildes to divide validTimeWindows. The more buckets,
- * the smother the reported results will be.
- */
- public RateTracker(int validTimeWindowInMils, int numBuckets) {
- this(validTimeWindowInMils, numBuckets, -1);
- }
-
- /**
- * Constructor
- * @param validTimeWindowInMils events that happened before validTimeWindow are not considered
- * when reporting the rate.
- * @param numBuckets the number of time sildes to divide validTimeWindows. The more buckets,
- * the smother the reported results will be.
- * @param startTime if positive the simulated time to start the first bucket at.
- */
- RateTracker(int validTimeWindowInMils, int numBuckets, long startTime){
- numBuckets = Math.max(numBuckets, 1);
- _bucketSizeMillis = validTimeWindowInMils / numBuckets;
- if (_bucketSizeMillis < 1 ) {
- throw new IllegalArgumentException("validTimeWindowInMilis and numOfSildes cause each slide to have a window that is too small");
- }
- _bucketTime = new long[numBuckets - 1];
- _oldBuckets = new long[numBuckets - 1];
-
- _bucketStart = new AtomicLong(startTime >= 0 ? startTime : System.currentTimeMillis());
- _currentBucket = new AtomicLong(0);
- if (startTime < 0) {
- _task = new Fresher();
- _timer.scheduleAtFixedRate(_task, _bucketSizeMillis, _bucketSizeMillis);
- } else {
- _task = null;
- }
- }
-
- /**
- * Notify the tracker upon new arrivals
- *
- * @param count number of arrivals
- */
- public void notify(long count) {
- _currentBucket.addAndGet(count);
- }
-
- /**
- * @return the approximate average rate per second.
- */
- public synchronized double reportRate() {
- return reportRate(System.currentTimeMillis());
- }
-
- synchronized double reportRate(long currentTime) {
- long duration = Math.max(1l, currentTime - _bucketStart.get());
- long events = _currentBucket.get();
- for (int i = 0; i < _oldBuckets.length; i++) {
- events += _oldBuckets[i];
- duration += _bucketTime[i];
- }
-
- return events * 1000.0 / duration;
- }
-
- public void close() {
- if (_task != null) {
- _task.cancel();
- }
- }
-
- /**
- * Rotate the buckets a set number of times for testing purposes.
- * @param numToEclipse the number of rotations to perform.
- */
- final void forceRotate(int numToEclipse, long interval) {
- long time = _bucketStart.get();
- for (int i = 0; i < numToEclipse; i++) {
- time += interval;
- rotateBuckets(time);
- }
- }
-
- private synchronized void rotateBuckets(long time) {
- long timeSpent = time - _bucketStart.getAndSet(time);
- long currentVal = _currentBucket.getAndSet(0);
- for (int i = 0; i < _oldBuckets.length; i++) {
- long tmpTime = _bucketTime[i];
- _bucketTime[i] = timeSpent;
- timeSpent = tmpTime;
-
- long cnt = _oldBuckets[i];
- _oldBuckets[i] = currentVal;
- currentVal = cnt;
- }
- }
-
- private class Fresher extends TimerTask {
- public void run () {
- rotateBuckets(System.currentTimeMillis());
- }
- }
-
- public static void main (String args[]) throws Exception {
- final int number = (args.length >= 1) ? Integer.parseInt(args[0]) : 100000000;
- for (int i = 0; i < 10; i++) {
- testRate(number);
- }
- }
-
- private static void testRate(int number) {
- RateTracker rt = new RateTracker(10000, 10);
- long start = System.currentTimeMillis();
- for (int i = 0; i < number; i++) {
- rt.notify(1);
- if ((i % 1000000) == 0) {
- //There is an issue with some JVM versions where an integer for loop that takes a long time
- // can starve other threads resulting in the timer thread not getting called.
- // This is a work around for that, and we still get the same results.
- Thread.yield();
- }
- }
- long end = System.currentTimeMillis();
- double rate = rt.reportRate();
- rt.close();
- System.out.printf("time %,8d count %,8d rate %,15.2f reported rate %,15.2f\n", end-start,number, ((number * 1000.0)/(end-start)), rate);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/test/jvm/backtype/storm/metric/internal/CountStatAndMetricTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/metric/internal/CountStatAndMetricTest.java b/storm-core/test/jvm/backtype/storm/metric/internal/CountStatAndMetricTest.java
new file mode 100644
index 0000000..7fa8087
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/metric/internal/CountStatAndMetricTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.junit.Test;
+import junit.framework.TestCase;
+import static org.junit.Assert.*;
+
+/**
+ * Unit test for CountStatAndMetric
+ */
+public class CountStatAndMetricTest extends TestCase {
+ final long TEN_MIN = 10 * 60 * 1000;
+ final long THIRTY_SEC = 30 * 1000;
+ final long THREE_HOUR = 3 * 60 * 60 * 1000;
+ final long ONE_DAY = 24 * 60 * 60 * 1000;
+
+ @Test
+ public void testBasic() {
+ long time = 0l;
+ CountStatAndMetric count = new CountStatAndMetric(10, time);
+ while (time < TEN_MIN) {
+ //For this part of the test we interleave the different rotation types.
+ count.incBy(50);
+ time += THIRTY_SEC/2;
+ count.rotateSched(time);
+ count.incBy(50);
+ time += THIRTY_SEC/2;
+ assertEquals(100l, ((Long)count.getValueAndReset(time)).longValue());
+ }
+
+ long val = 100 * TEN_MIN/THIRTY_SEC;
+ Map<String, Long> expected = new HashMap<String, Long>();
+ expected.put("600", val);
+ expected.put("10800", val);
+ expected.put("86400", val);
+ expected.put(":all-time", val);
+ assertEquals(expected, count.getTimeCounts(time));
+
+ while (time < THREE_HOUR) {
+ count.incBy(100);
+ time += THIRTY_SEC;
+ assertEquals(100l, ((Long)count.getValueAndReset(time)).longValue());
+ }
+
+ val = 100 * THREE_HOUR/THIRTY_SEC;
+ expected = new HashMap<String, Long>();
+ expected.put("600", 100 * TEN_MIN/THIRTY_SEC);
+ expected.put("10800", val);
+ expected.put("86400", val);
+ expected.put(":all-time", val);
+ assertEquals(expected, count.getTimeCounts(time));
+
+ while (time < ONE_DAY) {
+ count.incBy(100);
+ time += THIRTY_SEC;
+ assertEquals(100l, ((Long)count.getValueAndReset(time)).longValue());
+ }
+
+ val = 100 * ONE_DAY/THIRTY_SEC;
+ expected = new HashMap<String, Long>();
+ expected.put("600", 100 * TEN_MIN/THIRTY_SEC);
+ expected.put("10800", 100 * THREE_HOUR/THIRTY_SEC);
+ expected.put("86400", val);
+ expected.put(":all-time", val);
+ assertEquals(expected, count.getTimeCounts(time));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/test/jvm/backtype/storm/metric/internal/LatencyStatAndMetricTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/metric/internal/LatencyStatAndMetricTest.java b/storm-core/test/jvm/backtype/storm/metric/internal/LatencyStatAndMetricTest.java
new file mode 100644
index 0000000..3b83d95
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/metric/internal/LatencyStatAndMetricTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.junit.Test;
+import junit.framework.TestCase;
+import static org.junit.Assert.*;
+
+/**
+ * Unit test for LatencyStatAndMetric
+ */
+public class LatencyStatAndMetricTest extends TestCase {
+ final long TEN_MIN = 10 * 60 * 1000;
+ final long THIRTY_SEC = 30 * 1000;
+ final long THREE_HOUR = 3 * 60 * 60 * 1000;
+ final long ONE_DAY = 24 * 60 * 60 * 1000;
+
+ @Test
+ public void testBasic() {
+ long time = 0l;
+ LatencyStatAndMetric lat = new LatencyStatAndMetric(10, time);
+ while (time < TEN_MIN) {
+ lat.record(100);
+ time += THIRTY_SEC;
+ assertEquals(100.0, ((Double)lat.getValueAndReset(time)).doubleValue(), 0.01);
+ }
+
+ Map<String, Double> found = lat.getTimeLatAvg(time);
+ assertEquals(4, found.size());
+ assertEquals(100.0, found.get("600").doubleValue(), 0.01);
+ assertEquals(100.0, found.get("10800").doubleValue(), 0.01);
+ assertEquals(100.0, found.get("86400").doubleValue(), 0.01);
+ assertEquals(100.0, found.get(":all-time").doubleValue(), 0.01);
+
+ while (time < THREE_HOUR) {
+ lat.record(200);
+ time += THIRTY_SEC;
+ assertEquals(200.0, ((Double)lat.getValueAndReset(time)).doubleValue(), 0.01);
+ }
+
+ double expected = ((100.0 * TEN_MIN/THIRTY_SEC) + (200.0 * (THREE_HOUR - TEN_MIN)/THIRTY_SEC)) /
+ (THREE_HOUR/THIRTY_SEC);
+ found = lat.getTimeLatAvg(time);
+ assertEquals(4, found.size());
+ assertEquals(200.0, found.get("600").doubleValue(), 0.01); //flushed the buffers completely
+ assertEquals(expected, found.get("10800").doubleValue(), 0.01);
+ assertEquals(expected, found.get("86400").doubleValue(), 0.01);
+ assertEquals(expected, found.get(":all-time").doubleValue(), 0.01);
+
+ while (time < ONE_DAY) {
+ lat.record(300);
+ time += THIRTY_SEC;
+ assertEquals(300.0, ((Double)lat.getValueAndReset(time)).doubleValue(), 0.01);
+ }
+
+ expected = ((100.0 * TEN_MIN/THIRTY_SEC) + (200.0 * (THREE_HOUR - TEN_MIN)/THIRTY_SEC) + (300.0 * (ONE_DAY - THREE_HOUR)/THIRTY_SEC)) /
+ (ONE_DAY/THIRTY_SEC);
+ found = lat.getTimeLatAvg(time);
+ assertEquals(4, found.size());
+ assertEquals(300.0, found.get("600").doubleValue(), 0.01); //flushed the buffers completely
+ assertEquals(300.0, found.get("10800").doubleValue(), 0.01);
+ assertEquals(expected, found.get("86400").doubleValue(), 0.01);
+ assertEquals(expected, found.get(":all-time").doubleValue(), 0.01);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/test/jvm/backtype/storm/metric/internal/RateTrackerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/metric/internal/RateTrackerTest.java b/storm-core/test/jvm/backtype/storm/metric/internal/RateTrackerTest.java
new file mode 100644
index 0000000..debb922
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/metric/internal/RateTrackerTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import org.junit.Test;
+import junit.framework.TestCase;
+import static org.junit.Assert.*;
+
+/**
+ * Unit test for RateTracker
+ */
+public class RateTrackerTest extends TestCase {
+
+ @Test
+ public void testExactRate() {
+ //This test is in two phases. The first phase fills up the 10 buckets with 10 tuples each
+ // We purposely simulate a 1 second bucket size so the rate will always be 10 per second.
+ final long interval = 1000l;
+ long time = 0l;
+ RateTracker rt = new RateTracker(10000, 10, time);
+ double [] expected = new double[] {10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0};
+ for (int i = 0; i < expected.length; i++) {
+ double exp = expected[i];
+ rt.notify(10);
+ time += interval;
+ double actual = rt.reportRate(time);
+ rt.forceRotate(1, interval);
+ assertEquals("Expected rate on iteration "+i+" is wrong.", exp, actual, 0.00001);
+ }
+ //In the second part of the test the rate doubles to 20 per second but the rate tracker
+ // increases its result slowly as we push the 10 tuples per second buckets out and replace them
+ // with 20 tuples per second.
+ expected = new double[] {11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0};
+ for (int i = 0; i < expected.length; i++) {
+ double exp = expected[i];
+ rt.notify(20);
+ time += interval;
+ double actual = rt.reportRate(time);
+ rt.forceRotate(1, interval);
+ assertEquals("Expected rate on iteration "+i+" is wrong.", exp, actual, 0.00001);
+ }
+ }
+
+
+ @Test
+ public void testEclipsedAllWindows() {
+ long time = 0;
+ RateTracker rt = new RateTracker(10000, 10, time);
+ rt.notify(10);
+ rt.forceRotate(10, 1000l);
+ assertEquals(0.0, rt.reportRate(10000l), 0.00001);
+ }
+
+ @Test
+ public void testEclipsedOneWindow() {
+ long time = 0;
+ RateTracker rt = new RateTracker(10000, 10, time);
+ rt.notify(1);
+ double r1 = rt.reportRate(1000l);
+ rt.forceRotate(1, 1000l);
+ rt.notify(1);
+ double r2 = rt.reportRate(2000l);
+
+ assertEquals(r1, r2, 0.00001);
+ }
+
+ @Test
+ public void testEclipsedNineWindows() {
+ long time = 0;
+ RateTracker rt = new RateTracker(10000, 10, time);
+ rt.notify(1);
+ double r1 = rt.reportRate(1000);
+ rt.forceRotate(9, 1000);
+ rt.notify(9);
+ double r2 = rt.reportRate(10000);
+
+ assertEquals(r1, r2, 0.00001);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
deleted file mode 100644
index 4f5eacb..0000000
--- a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.utils;
-
-import org.junit.Test;
-import junit.framework.TestCase;
-import static org.junit.Assert.*;
-
-/**
- * Unit test for RateTracker
- */
-public class RateTrackerTest extends TestCase {
-
- @Test
- public void testExactRate() {
- //This test is in two phases. The first phase fills up the 10 buckets with 10 tuples each
- // We purposely simulate a 1 second bucket size so the rate will always be 10 per second.
- final long interval = 1000l;
- long time = 0l;
- RateTracker rt = new RateTracker(10000, 10, time);
- double [] expected = new double[] {10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0};
- for (int i = 0; i < expected.length; i++) {
- double exp = expected[i];
- rt.notify(10);
- time += interval;
- double actual = rt.reportRate(time);
- rt.forceRotate(1, interval);
- assertEquals("Expected rate on iteration "+i+" is wrong.", exp, actual, 0.00001);
- }
- //In the second part of the test the rate doubles to 20 per second but the rate tracker
- // increases its result slowly as we push the 10 tuples per second buckets out and relpace them
- // with 20 tuples per second.
- expected = new double[] {11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0};
- for (int i = 0; i < expected.length; i++) {
- double exp = expected[i];
- rt.notify(20);
- time += interval;
- double actual = rt.reportRate(time);
- rt.forceRotate(1, interval);
- assertEquals("Expected rate on iteration "+i+" is wrong.", exp, actual, 0.00001);
- }
- }
-
-
- @Test
- public void testEclipsedAllWindows() {
- long time = 0;
- RateTracker rt = new RateTracker(10000, 10, time);
- rt.notify(10);
- rt.forceRotate(10, 1000l);
- assertEquals(0.0, rt.reportRate(10000l), 0.00001);
- }
-
- @Test
- public void testEclipsedOneWindow() {
- long time = 0;
- RateTracker rt = new RateTracker(10000, 10, time);
- rt.notify(1);
- double r1 = rt.reportRate(1000l);
- rt.forceRotate(1, 1000l);
- rt.notify(1);
- double r2 = rt.reportRate(2000l);
-
- assertEquals(r1, r2, 0.00001);
- }
-
- @Test
- public void testEclipsedNineWindows() {
- long time = 0;
- RateTracker rt = new RateTracker(10000, 10, time);
- rt.notify(1);
- double r1 = rt.reportRate(1000);
- rt.forceRotate(9, 1000);
- rt.notify(9);
- double r2 = rt.reportRate(10000);
-
- assertEquals(r1, r2, 0.00001);
- }
-}
[2/4] storm git commit: Removed reflection from the critical path
Posted by sr...@apache.org.
Removed reflection from the critical path
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/16c33efa
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/16c33efa
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/16c33efa
Branch: refs/heads/master
Commit: 16c33efac39edea7e2d77df319d0ee3fd407383a
Parents: e75219b
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Sat Oct 24 10:27:11 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Sat Oct 24 10:27:45 2015 -0500
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/daemon/executor.clj | 2 +-
storm-core/src/clj/backtype/storm/daemon/worker.clj | 4 ++--
storm-core/src/clj/backtype/storm/stats.clj | 4 ++--
3 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/16c33efa/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index a398b8a..fd8b886 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -568,7 +568,7 @@
(tasks-fn out-stream-id values))
rooted? (and message-id has-ackers?)
root-id (if rooted? (MessageId/generateId rand))
- out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
+ ^List out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
(fast-list-iter [out-task out-tasks id out-ids]
(let [tuple-id (if rooted?
(MessageId/makeRootId root-id id)
http://git-wip-us.apache.org/repos/asf/storm/blob/16c33efa/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index e349a53..2b74f69 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -169,9 +169,9 @@
(do
(when (not (.get remoteMap task))
(.put remoteMap task (ArrayList.)))
- (let [remote (.get remoteMap task)]
+ (let [^ArrayList remote (.get remoteMap task)]
(if (not-nil? task)
- (.add remote (TaskMessage. task (.serialize serializer tuple)))
+ (.add remote (TaskMessage. ^int task ^bytes (.serialize serializer tuple)))
(log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
))))
http://git-wip-us.apache.org/repos/asf/storm/blob/16c33efa/storm-core/src/clj/backtype/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/stats.clj b/storm-core/src/clj/backtype/storm/stats.clj
index efc4965..16a00ec 100644
--- a/storm-core/src/clj/backtype/storm/stats.clj
+++ b/storm-core/src/clj/backtype/storm/stats.clj
@@ -116,11 +116,11 @@
(defn emitted-tuple!
[stats stream]
- (.incBy ^MultiCountStatAndMetric (stats-emitted stats) stream (stats-rate stats)))
+ (.incBy ^MultiCountStatAndMetric (stats-emitted stats) ^Object stream ^long (stats-rate stats)))
(defn transferred-tuples!
[stats stream amt]
- (.incBy ^MultiCountStatAndMetric (stats-transferred stats) stream (* (stats-rate stats) amt)))
+ (.incBy ^MultiCountStatAndMetric (stats-transferred stats) ^Object stream ^long (* (stats-rate stats) amt)))
(defn bolt-execute-tuple!
[^BoltExecutorStats stats component stream latency-ms]
[4/4] storm git commit: Added STORM-1128 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-1128 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3bc375e7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3bc375e7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3bc375e7
Branch: refs/heads/master
Commit: 3bc375e73b05b0ff1b390ce5957882a108068c00
Parents: 46b8078
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Sat Oct 24 18:17:14 2015 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sat Oct 24 18:17:14 2015 -0700
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3bc375e7/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 431148a..f069844 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1128: Make metrics fast
* STORM-1122: Fix the format issue in Utils.java
* STORM-1111: Fix Validation for lots of different configs
* STORM-1125: Adding separate ZK client for read in Nimbus ZK State
[3/4] storm git commit: Fixed config validation
Posted by sr...@apache.org.
Fixed config validation
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/46b80788
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/46b80788
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/46b80788
Branch: refs/heads/master
Commit: 46b807881379270617ec56c298a2a687a16f45f1
Parents: 16c33ef
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Sat Oct 24 10:52:49 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Sat Oct 24 10:52:49 2015 -0500
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/46b80788/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
index 1b0cd7f..9aa1205 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
@@ -467,7 +467,7 @@ public class ConfigValidation {
}
SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class"));
- SimpleTypeValidator.validateField(name, long.class, ((Map) o).get("parallelism.hint"));
+ SimpleTypeValidator.validateField(name, Long.class, ((Map) o).get("parallelism.hint"));
}
}