You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/10/25 02:25:24 UTC

[1/4] storm git commit: STORM-1128: Make metrics fast

Repository: storm
Updated Branches:
  refs/heads/master e93015c64 -> 3bc375e73


STORM-1128: Make metrics fast


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e75219b5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e75219b5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e75219b5

Branch: refs/heads/master
Commit: e75219b5a4856c39c4031c3d6205105e034d10af
Parents: e93015c
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Oct 22 10:48:04 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Sat Oct 24 09:11:58 2015 -0500

----------------------------------------------------------------------
 .../jvm/storm/starter/ThroughputVsLatency.java  |  11 +-
 .../backtype/storm/daemon/builtin_metrics.clj   |  84 +++---
 .../src/clj/backtype/storm/daemon/executor.clj  |  16 --
 .../src/clj/backtype/storm/daemon/task.clj      |  10 +-
 storm-core/src/clj/backtype/storm/stats.clj     | 233 ++++++-----------
 .../metric/internal/CountStatAndMetric.java     | 211 +++++++++++++++
 .../metric/internal/LatencyStatAndMetric.java   | 259 +++++++++++++++++++
 .../storm/metric/internal/MetricStatTimer.java  |  27 ++
 .../internal/MultiCountStatAndMetric.java       | 112 ++++++++
 .../internal/MultiLatencyStatAndMetric.java     | 109 ++++++++
 .../storm/metric/internal/RateTracker.java      | 165 ++++++++++++
 .../backtype/storm/utils/DisruptorQueue.java    |   1 +
 .../jvm/backtype/storm/utils/RateTracker.java   | 166 ------------
 .../metric/internal/CountStatAndMetricTest.java |  86 ++++++
 .../internal/LatencyStatAndMetricTest.java      |  83 ++++++
 .../storm/metric/internal/RateTrackerTest.java  |  94 +++++++
 .../backtype/storm/utils/RateTrackerTest.java   |  94 -------
 17 files changed, 1264 insertions(+), 497 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
index 2608755..4c6680e 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ThroughputVsLatency.java
@@ -277,15 +277,17 @@ public class ThroughputVsLatency {
         SpoutStats stats = exec.get_stats().get_specific().get_spout();
         Map<String, Long> failedMap = stats.get_failed().get(":all-time");
         Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
-        for (String key: ackedMap.keySet()) {
-          if (failedMap != null) {
+        if (ackedMap != null) {
+          for (String key: ackedMap.keySet()) {
+            if (failedMap != null) {
               Long tmp = failedMap.get(key);
               if (tmp != null) {
                   failed += tmp;
               }
+            }
+            long ackVal = ackedMap.get(key);
+            acked += ackVal;
           }
-          long ackVal = ackedMap.get(key);
-          acked += ackVal;
         }
       }
     }
@@ -394,6 +396,7 @@ public class ThroughputVsLatency {
 
     C cluster = new C(conf);
     conf.setNumWorkers(parallelism);
+    conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class);
     conf.registerMetricsConsumer(backtype.storm.metric.HttpForwardingMetricsConsumer.class, url, 1);
     Map<String, String> workerMetrics = new HashMap<String, String>();
     if (!cluster.isLocal()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj b/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
index 990800a..0caa0b9 100644
--- a/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
@@ -14,41 +14,42 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns backtype.storm.daemon.builtin-metrics
-  (:import [backtype.storm.metric.api MultiCountMetric MultiReducedMetric CountMetric MeanReducer StateMetric IMetric IStatefulObject])
+  (:import [backtype.storm.metric.api CountMetric StateMetric IMetric IStatefulObject])
+  (:import [backtype.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric])
   (:import [backtype.storm Config])
-  (:use [backtype.storm.stats :only [stats-rate]]))
-
-(defrecord BuiltinSpoutMetrics [^MultiCountMetric ack-count                                
-                                ^MultiReducedMetric complete-latency
-                                ^MultiCountMetric fail-count
-                                ^MultiCountMetric emit-count
-                                ^MultiCountMetric transfer-count])
-(defrecord BuiltinBoltMetrics [^MultiCountMetric ack-count
-                               ^MultiReducedMetric process-latency
-                               ^MultiCountMetric fail-count
-                               ^MultiCountMetric execute-count
-                               ^MultiReducedMetric execute-latency
-                               ^MultiCountMetric emit-count
-                               ^MultiCountMetric transfer-count])
+  (:use [backtype.storm.stats]))
+
+(defrecord BuiltinSpoutMetrics [^MultiCountStatAndMetric ack-count
+                                ^MultiLatencyStatAndMetric complete-latency
+                                ^MultiCountStatAndMetric fail-count
+                                ^MultiCountStatAndMetric emit-count
+                                ^MultiCountStatAndMetric transfer-count])
+(defrecord BuiltinBoltMetrics [^MultiCountStatAndMetric ack-count
+                               ^MultiLatencyStatAndMetric process-latency
+                               ^MultiCountStatAndMetric fail-count
+                               ^MultiCountStatAndMetric execute-count
+                               ^MultiLatencyStatAndMetric execute-latency
+                               ^MultiCountStatAndMetric emit-count
+                               ^MultiCountStatAndMetric transfer-count])
 (defrecord SpoutThrottlingMetrics [^CountMetric skipped-max-spout
                                    ^CountMetric skipped-throttle
                                    ^CountMetric skipped-inactive])
 
 
-(defn make-data [executor-type]
+(defn make-data [executor-type stats]
   (condp = executor-type
-    :spout (BuiltinSpoutMetrics. (MultiCountMetric.)
-                                 (MultiReducedMetric. (MeanReducer.))
-                                 (MultiCountMetric.)
-                                 (MultiCountMetric.)
-                                 (MultiCountMetric.))
-    :bolt (BuiltinBoltMetrics. (MultiCountMetric.)
-                               (MultiReducedMetric. (MeanReducer.))
-                               (MultiCountMetric.)
-                               (MultiCountMetric.)
-                               (MultiReducedMetric. (MeanReducer.))
-                               (MultiCountMetric.)
-                               (MultiCountMetric.))))
+    :spout (BuiltinSpoutMetrics. (stats-acked stats)
+                                 (stats-complete-latencies stats)
+                                 (stats-failed stats)
+                                 (stats-emitted stats)
+                                 (stats-transferred stats))
+    :bolt (BuiltinBoltMetrics. (stats-acked stats)
+                               (stats-process-latencies stats)
+                               (stats-failed stats)
+                               (stats-executed stats)
+                               (stats-execute-latencies stats)
+                               (stats-emitted stats)
+                               (stats-transferred stats))))
 
 (defn make-spout-throttling-data []
   (SpoutThrottlingMetrics. (CountMetric.)
@@ -87,33 +88,6 @@
     (.registerMetric topology-context (str "__" (name qname)) (StateMetric. q)
                      (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
 
-(defn spout-acked-tuple! [^BuiltinSpoutMetrics m stats stream latency-ms]  
-  (-> m .ack-count (.scope stream) (.incrBy (stats-rate stats)))
-  (-> m .complete-latency (.scope stream) (.update latency-ms)))
-
-(defn spout-failed-tuple! [^BuiltinSpoutMetrics m stats stream]  
-  (-> m .fail-count (.scope stream) (.incrBy (stats-rate stats))))
-
-(defn bolt-execute-tuple! [^BuiltinBoltMetrics m stats comp-id stream latency-ms]
-  (let [scope (str comp-id ":" stream)]    
-    (-> m .execute-count (.scope scope) (.incrBy (stats-rate stats)))
-    (-> m .execute-latency (.scope scope) (.update latency-ms))))
-
-(defn bolt-acked-tuple! [^BuiltinBoltMetrics m stats comp-id stream latency-ms]
-  (let [scope (str comp-id ":" stream)]
-    (-> m .ack-count (.scope scope) (.incrBy (stats-rate stats)))
-    (-> m .process-latency (.scope scope) (.update latency-ms))))
-
-(defn bolt-failed-tuple! [^BuiltinBoltMetrics m stats comp-id stream]
-  (let [scope (str comp-id ":" stream)]    
-    (-> m .fail-count (.scope scope) (.incrBy (stats-rate stats)))))
-
-(defn emitted-tuple! [m stats stream]
-  (-> m :emit-count (.scope stream) (.incrBy (stats-rate stats))))
-
-(defn transferred-tuple! [m stats stream num-out-tasks]
-  (-> m :transfer-count (.scope stream) (.incrBy (* num-out-tasks (stats-rate stats)))))
-
 (defn skipped-max-spout! [^SpoutThrottlingMetrics m stats]
   (-> m .skipped-max-spout (.incrBy (stats-rate stats))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index ed92e9c..a398b8a 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -438,7 +438,6 @@
     (.fail spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
     (when time-delta
-      (builtin-metrics/spout-failed-tuple! (:builtin-metrics task-data) (:stats executor-data) (:stream tuple-info))      
       (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
 
 (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id]
@@ -450,7 +449,6 @@
     (.ack spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
     (when time-delta
-      (builtin-metrics/spout-acked-tuple! (:builtin-metrics task-data) (:stats executor-data) (:stream tuple-info) time-delta)
       (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
 
 (defn mk-task-receiver [executor-data tuple-action-fn]
@@ -760,11 +758,6 @@
  
                                   (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
                                   (when delta
-                                    (builtin-metrics/bolt-execute-tuple! (:builtin-metrics task-data)
-                                                                         executor-stats
-                                                                         (.getSourceComponent tuple)                                                      
-                                                                         (.getSourceStreamId tuple)
-                                                                         delta)
                                     (stats/bolt-execute-tuple! executor-stats
                                                                (.getSourceComponent tuple)
                                                                (.getSourceStreamId tuple)
@@ -846,11 +839,6 @@
                              (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
                            (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
                            (when delta
-                             (builtin-metrics/bolt-acked-tuple! (:builtin-metrics task-data)
-                                                                executor-stats
-                                                                (.getSourceComponent tuple)                                                      
-                                                                (.getSourceStreamId tuple)
-                                                                delta)
                              (stats/bolt-acked-tuple! executor-stats
                                                       (.getSourceComponent tuple)
                                                       (.getSourceStreamId tuple)
@@ -866,10 +854,6 @@
                              (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
                            (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
                            (when delta
-                             (builtin-metrics/bolt-failed-tuple! (:builtin-metrics task-data)
-                                                                 executor-stats
-                                                                 (.getSourceComponent tuple)                                                      
-                                                                 (.getSourceStreamId tuple))
                              (stats/bolt-failed-tuple! executor-stats
                                                        (.getSourceComponent tuple)
                                                        (.getSourceStreamId tuple)

http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/clj/backtype/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/task.clj b/storm-core/src/clj/backtype/storm/daemon/task.clj
index 9cf2b85..0a38860 100644
--- a/storm-core/src/clj/backtype/storm/daemon/task.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/task.clj
@@ -145,11 +145,9 @@
               (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))                          
             (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
             (when (emit-sampler)
-              (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)
               (stats/emitted-tuple! executor-stats stream)
               (if out-task-id
-                (stats/transferred-tuples! executor-stats stream 1)
-                (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream 1)))
+                (stats/transferred-tuples! executor-stats stream 1)))
             (if out-task-id [out-task-id])
             ))
         ([^String stream ^List values]
@@ -168,9 +166,7 @@
              (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
              (when (emit-sampler)
                (stats/emitted-tuple! executor-stats stream)
-               (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)              
-               (stats/transferred-tuples! executor-stats stream (count out-tasks))
-               (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream (count out-tasks)))
+               (stats/transferred-tuples! executor-stats stream (count out-tasks)))
              out-tasks)))
     ))
 
@@ -180,7 +176,7 @@
     :task-id task-id
     :system-context (system-topology-context (:worker executor-data) executor-data task-id)
     :user-context (user-topology-context (:worker executor-data) executor-data task-id)
-    :builtin-metrics (builtin-metrics/make-data (:type executor-data))
+    :builtin-metrics (builtin-metrics/make-data (:type executor-data) (:stats executor-data))
     :tasks-fn (mk-tasks-fn <>)
     :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/clj/backtype/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/stats.clj b/storm-core/src/clj/backtype/storm/stats.clj
index dabc6f7..efc4965 100644
--- a/storm-core/src/clj/backtype/storm/stats.clj
+++ b/storm-core/src/clj/backtype/storm/stats.clj
@@ -24,216 +24,137 @@
             ExecutorAggregateStats SpecificAggregateStats
             SpoutAggregateStats TopologyPageInfo TopologyStats])
   (:import [backtype.storm.utils Utils])
+  (:import [backtype.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric])
   (:use [backtype.storm log util])
   (:use [clojure.math.numeric-tower :only [ceil]]))
 
-;;TODO: consider replacing this with some sort of RRD
-
 (def TEN-MIN-IN-SECONDS (* 10 60))
 
-(defn curr-time-bucket
-  [^Integer time-secs ^Integer bucket-size-secs]
-  (* bucket-size-secs (unchecked-divide-int time-secs bucket-size-secs)))
-
-(defrecord RollingWindow
-  [updater merger extractor bucket-size-secs num-buckets buckets])
-
-(defn rolling-window
-  [updater merger extractor bucket-size-secs num-buckets]
-  (RollingWindow. updater merger extractor bucket-size-secs num-buckets {}))
-
-(defn update-rolling-window
-  ([^RollingWindow rw time-secs & args]
-   ;; this is 2.5x faster than using update-in...
-   (let [time-bucket (curr-time-bucket time-secs (:bucket-size-secs rw))
-         buckets (:buckets rw)
-         curr (get buckets time-bucket)
-         curr (apply (:updater rw) curr args)]
-     (assoc rw :buckets (assoc buckets time-bucket curr)))))
-
-(defn value-rolling-window
-  [^RollingWindow rw]
-  ((:extractor rw)
-   (let [values (vals (:buckets rw))]
-     (apply (:merger rw) values))))
-
-(defn cleanup-rolling-window
-  [^RollingWindow rw]
-  (let [buckets (:buckets rw)
-        cutoff (- (current-time-secs)
-                  (* (:num-buckets rw)
-                     (:bucket-size-secs rw)))
-        to-remove (filter #(< % cutoff) (keys buckets))
-        buckets (apply dissoc buckets to-remove)]
-    (assoc rw :buckets buckets)))
-
-(defn rolling-window-size
-  [^RollingWindow rw]
-  (* (:bucket-size-secs rw) (:num-buckets rw)))
-
-(defrecord RollingWindowSet [updater extractor windows all-time])
-
-(defn rolling-window-set [updater merger extractor num-buckets & bucket-sizes]
-  (RollingWindowSet. updater extractor (dofor [s bucket-sizes] (rolling-window updater merger extractor s num-buckets)) nil)
-  )
-
-(defn update-rolling-window-set
-  ([^RollingWindowSet rws & args]
-   (let [now (current-time-secs)
-         new-windows (dofor [w (:windows rws)]
-                            (apply update-rolling-window w now args))]
-     (assoc rws
-       :windows new-windows
-       :all-time (apply (:updater rws) (:all-time rws) args)))))
-
-(defn cleanup-rolling-window-set
-  ([^RollingWindowSet rws]
-   (let [windows (:windows rws)]
-     (assoc rws :windows (map cleanup-rolling-window windows)))))
-
-(defn value-rolling-window-set
-  [^RollingWindowSet rws]
-  (merge
-    (into {}
-          (for [w (:windows rws)]
-            {(rolling-window-size w) (value-rolling-window w)}
-            ))
-    {:all-time ((:extractor rws) (:all-time rws))}))
-
-(defn- incr-val
-  ([amap key]
-   (incr-val amap key 1))
-  ([amap key amt]
-   (let [val (get amap key (long 0))]
-     (assoc amap key (+ val amt)))))
-
-(defn- update-avg
-  [curr val]
-  (if curr
-    [(+ (first curr) val) (inc (second curr))]
-    [val (long 1)]))
-
-(defn- merge-avg
-  [& avg]
-  [(apply + (map first avg))
-   (apply + (map second avg))
-   ])
-
-(defn- extract-avg
-  [pair]
-  (double (/ (first pair) (second pair))))
-
-(defn- update-keyed-avg
-  [amap key val]
-  (assoc amap key (update-avg (get amap key) val)))
-
-(defn- merge-keyed-avg [& vals]
-  (apply merge-with merge-avg vals))
-
-(defn- extract-keyed-avg [vals]
-  (map-val extract-avg vals))
-
-(defn- counter-extract [v]
-  (if v v {}))
-
-(defn keyed-counter-rolling-window-set
-  [num-buckets & bucket-sizes]
-  (apply rolling-window-set incr-val (partial merge-with +) counter-extract num-buckets bucket-sizes))
-
-(defn avg-rolling-window-set
-  [num-buckets & bucket-sizes]
-  (apply rolling-window-set update-avg merge-avg extract-avg num-buckets bucket-sizes))
-
-(defn keyed-avg-rolling-window-set
-  [num-buckets & bucket-sizes]
-  (apply rolling-window-set update-keyed-avg merge-keyed-avg extract-keyed-avg num-buckets bucket-sizes))
-
 (def COMMON-FIELDS [:emitted :transferred])
-(defrecord CommonStats [emitted transferred rate])
+(defrecord CommonStats [^MultiCountStatAndMetric emitted
+                        ^MultiCountStatAndMetric transferred
+                        rate])
 
 (def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies])
 ;;acked and failed count individual tuples
-(defrecord BoltExecutorStats [common acked failed process-latencies executed execute-latencies])
+(defrecord BoltExecutorStats [^CommonStats common
+                              ^MultiCountStatAndMetric acked
+                              ^MultiCountStatAndMetric failed
+                              ^MultiLatencyStatAndMetric process-latencies
+                              ^MultiCountStatAndMetric executed
+                              ^MultiLatencyStatAndMetric execute-latencies])
 
 (def SPOUT-FIELDS [:acked :failed :complete-latencies])
 ;;acked and failed count tuple completion
-(defrecord SpoutExecutorStats [common acked failed complete-latencies])
+(defrecord SpoutExecutorStats [^CommonStats common
+                               ^MultiCountStatAndMetric acked
+                               ^MultiCountStatAndMetric failed
+                               ^MultiLatencyStatAndMetric complete-latencies])
 
 (def NUM-STAT-BUCKETS 20)
-;; 10 minutes, 3 hours, 1 day
-(def STAT-BUCKETS [30 540 4320])
 
 (defn- mk-common-stats
   [rate]
   (CommonStats.
-    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
-    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+    (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
+    (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
     rate))
 
 (defn mk-bolt-stats
   [rate]
   (BoltExecutorStats.
     (mk-common-stats rate)
-    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
-    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
-    (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
-    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
-    (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))
+    (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
+    (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
+    (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)
+    (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
+    (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)))
 
 (defn mk-spout-stats
   [rate]
   (SpoutExecutorStats.
     (mk-common-stats rate)
-    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
-    (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
-    (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))
-
-(defmacro update-executor-stat!
-  [stats path & args]
-  (let [path (collectify path)]
-    `(swap! (-> ~stats ~@path) update-rolling-window-set ~@args)))
+    (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
+    (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
+    (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)))
 
 (defmacro stats-rate
   [stats]
   `(-> ~stats :common :rate))
 
+(defmacro stats-emitted
+  [stats]
+  `(-> ~stats :common :emitted))
+
+(defmacro stats-transferred
+  [stats]
+  `(-> ~stats :common :transferred))
+
+(defmacro stats-executed
+  [stats]
+  `(:executed ~stats))
+
+(defmacro stats-acked
+  [stats]
+  `(:acked ~stats))
+
+(defmacro stats-failed
+  [stats]
+  `(:failed ~stats))
+
+(defmacro stats-execute-latencies
+  [stats]
+  `(:execute-latencies ~stats))
+
+(defmacro stats-process-latencies
+  [stats]
+  `(:process-latencies ~stats))
+
+(defmacro stats-complete-latencies
+  [stats]
+  `(:complete-latencies ~stats))
+
 (defn emitted-tuple!
   [stats stream]
-  (update-executor-stat! stats [:common :emitted] stream (stats-rate stats)))
+  (.incBy ^MultiCountStatAndMetric (stats-emitted stats) stream (stats-rate stats)))
 
 (defn transferred-tuples!
   [stats stream amt]
-  (update-executor-stat! stats [:common :transferred] stream (* (stats-rate stats) amt)))
+  (.incBy ^MultiCountStatAndMetric (stats-transferred stats) stream (* (stats-rate stats) amt)))
 
 (defn bolt-execute-tuple!
   [^BoltExecutorStats stats component stream latency-ms]
-  (let [key [component stream]]
-    (update-executor-stat! stats :executed key (stats-rate stats))
-    (update-executor-stat! stats :execute-latencies key latency-ms)))
+  (let [key [component stream]
+        ^MultiCountStatAndMetric executed (stats-executed stats)
+        ^MultiLatencyStatAndMetric exec-lat (stats-execute-latencies stats)]
+    (.incBy executed key (stats-rate stats))
+    (.record exec-lat key latency-ms)))
 
 (defn bolt-acked-tuple!
   [^BoltExecutorStats stats component stream latency-ms]
-  (let [key [component stream]]
-    (update-executor-stat! stats :acked key (stats-rate stats))
-    (update-executor-stat! stats :process-latencies key latency-ms)))
+  (let [key [component stream]
+        ^MultiCountStatAndMetric acked (stats-acked stats)
+        ^MultiLatencyStatAndMetric process-lat (stats-process-latencies stats)]
+    (.incBy acked key (stats-rate stats))
+    (.record process-lat key latency-ms)))
 
 (defn bolt-failed-tuple!
   [^BoltExecutorStats stats component stream latency-ms]
-  (let [key [component stream]]
-    (update-executor-stat! stats :failed key (stats-rate stats))))
+  (let [key [component stream]
+        ^MultiCountStatAndMetric failed (stats-failed stats)]
+    (.incBy failed key (stats-rate stats))))
 
 (defn spout-acked-tuple!
   [^SpoutExecutorStats stats stream latency-ms]
-  (update-executor-stat! stats :acked stream (stats-rate stats))
-  (update-executor-stat! stats :complete-latencies stream latency-ms))
+  (.incBy ^MultiCountStatAndMetric (stats-acked stats) stream (stats-rate stats))
+  (.record ^MultiLatencyStatAndMetric (stats-complete-latencies stats) stream latency-ms))
 
 (defn spout-failed-tuple!
   [^SpoutExecutorStats stats stream latency-ms]
-  (update-executor-stat! stats :failed stream (stats-rate stats))
-  )
+  (.incBy ^MultiCountStatAndMetric (stats-failed stats) stream (stats-rate stats)))
 
 (defn- cleanup-stat! [stat]
-  (swap! stat cleanup-rolling-window-set))
+  (.close stat))
 
 (defn- cleanup-common-stats!
   [^CommonStats stats]
@@ -255,7 +176,9 @@
 (defn- value-stats
   [stats fields]
   (into {} (dofor [f fields]
-                  [f (value-rolling-window-set @(f stats))])))
+                  [f (if (instance? MultiCountStatAndMetric (f stats))
+                         (.getTimeCounts ^MultiCountStatAndMetric (f stats))
+                         (.getTimeLatAvg ^MultiLatencyStatAndMetric (f stats)))])))
 
 (defn- value-common-stats
   [^CommonStats stats]

http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/jvm/backtype/storm/metric/internal/CountStatAndMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/internal/CountStatAndMetric.java b/storm-core/src/jvm/backtype/storm/metric/internal/CountStatAndMetric.java
new file mode 100644
index 0000000..194a9e2
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/internal/CountStatAndMetric.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
+
+import backtype.storm.metric.api.IMetric;
+
+/**
+ * Acts as a Count Metric, but also keeps track of approximate counts
+ * for the last 10 mins, 3 hours, 1 day, and all time.
+ */
+public class CountStatAndMetric implements IMetric{
+    private final AtomicLong _currentBucket;
+    // All internal state except for the count of the current bucket is
+    // protected by synchronizing on this CountStatAndMetric instance
+    private long _bucketStart;
+
+    //count already rotated into the buckets by the timer but not yet returned by getValueAndReset
+    private long _exactExtra;
+ 
+    //10 min values
+    private final int _tmSize;
+    private final long[] _tmBuckets;
+    private final long[] _tmTime;
+    
+    //3 hour values
+    private final int _thSize;
+    private final long[] _thBuckets;
+    private final long[] _thTime;
+
+    //1 day values
+    private final int _odSize;
+    private final long[] _odBuckets;
+    private final long[] _odTime;
+ 
+    //all time
+    private long _allTime;
+
+    private final TimerTask _task;
+
+    /**
+     * @param numBuckets the number of buckets to divide the time periods into.
+     */
+    public CountStatAndMetric(int numBuckets) {
+        this(numBuckets, -1);
+    }
+
+    /**
+     * Constructor
+     * @param numBuckets the number of buckets to divide the time periods into.
+     * @param startTime if non-negative, the simulated time to start from; if negative, the system clock is used and a periodic rotation is scheduled.
+     */
+    CountStatAndMetric(int numBuckets, long startTime){
+        numBuckets = Math.max(numBuckets, 2);
+        //We want to capture the full time range, so the target size is as
+        // if we had one bucket less than we do
+        _tmSize = 10 * 60 * 1000 / (numBuckets - 1);
+        _thSize = 3 * 60 * 60 * 1000 / (numBuckets - 1);
+        _odSize = 24 * 60 * 60 * 1000 / (numBuckets - 1);
+        if (_tmSize < 1 || _thSize < 1 || _odSize < 1) {
+            throw new IllegalArgumentException("number of buckets is too large to be supported");
+        }
+        _tmBuckets = new long[numBuckets];
+        _tmTime = new long[numBuckets];
+        _thBuckets = new long[numBuckets];
+        _thTime = new long[numBuckets];
+        _odBuckets = new long[numBuckets];
+        _odTime = new long[numBuckets];
+        _allTime = 0;
+        _exactExtra = 0;
+
+        _bucketStart = startTime >= 0 ? startTime : System.currentTimeMillis();
+        _currentBucket = new AtomicLong(0);
+        if (startTime < 0) {
+            _task = new Fresher();
+            MetricStatTimer._timer.scheduleAtFixedRate(_task, _tmSize, _tmSize);
+        } else {
+            _task = null;
+        }
+    }
+
+    /**
+     * Increase the count by the given value.
+     *
+     * @param count number to count
+     */
+    public void incBy(long count) {
+        _currentBucket.addAndGet(count);
+    }
+
+   
+
+    @Override
+    public synchronized Object getValueAndReset() {
+        return getValueAndReset(System.currentTimeMillis());
+    }
+
+    synchronized Object getValueAndReset(long now) {
+        long value = _currentBucket.getAndSet(0);
+        long timeSpent = now - _bucketStart;
+        long ret = value + _exactExtra;
+        _bucketStart = now;
+        _exactExtra = 0;
+        rotateBuckets(value, timeSpent);
+        return ret;
+    }
+
+    synchronized void rotateSched(long now) {
+        long value = _currentBucket.getAndSet(0);
+        long timeSpent = now - _bucketStart;
+        _exactExtra += value;
+        _bucketStart = now;
+        rotateBuckets(value, timeSpent);
+    }
+
+    synchronized void rotateBuckets(long value, long timeSpent) {
+        rotate(value, timeSpent, _tmSize, _tmTime, _tmBuckets);
+        rotate(value, timeSpent, _thSize, _thTime, _thBuckets);
+        rotate(value, timeSpent, _odSize, _odTime, _odBuckets);
+        _allTime += value;
+    }
+
+    private synchronized void rotate(long value, long timeSpent, long targetSize, long [] times, long [] buckets) {
+        times[0] += timeSpent;
+        buckets[0] += value;
+
+        long currentTime = 0;
+        long currentVal = 0;
+        if (times[0] >= targetSize) {
+            for (int i = 0; i < buckets.length; i++) {
+                long tmpTime = times[i];
+                times[i] = currentTime;
+                currentTime = tmpTime;
+
+                long cnt = buckets[i];
+                buckets[i] = currentVal;
+                currentVal = cnt;
+            }
+        }
+    }
+
+    /**
+     * @return a map of time window to count.
+     * Keys are "600" for last 10 mins
+     * "10800" for the last 3 hours
+     * "86400" for the last day
+     * ":all-time" for all time
+     */
+    public synchronized Map<String, Long> getTimeCounts() {
+        return getTimeCounts(System.currentTimeMillis());
+    }
+
+    synchronized Map<String, Long> getTimeCounts(long now) {
+        Map<String, Long> ret = new HashMap<>();
+        long value = _currentBucket.get();
+        long timeSpent = now - _bucketStart;
+        ret.put("600", readApproximateTime(value, timeSpent, _tmTime, _tmBuckets, 600 * 1000));
+        ret.put("10800", readApproximateTime(value, timeSpent, _thTime, _thBuckets, 10800 * 1000));
+        ret.put("86400", readApproximateTime(value, timeSpent, _odTime, _odBuckets, 86400 * 1000));
+        ret.put(":all-time", value + _allTime);
+        return ret;
+    }
+
+    long readApproximateTime(long value, long timeSpent, long[] bucketTime, long[] buckets, long desiredTime) {
+        long timeNeeded = desiredTime - timeSpent;
+        long total = value;
+        for (int i = 0; i < bucketTime.length; i++) {
+            if (timeNeeded < bucketTime[i]) {
+                double pct = timeNeeded/((double)bucketTime[i]);
+                total += (long)(pct * buckets[i]);
+                timeNeeded = 0;
+                break;
+            }
+            total += buckets[i];
+            timeNeeded -= bucketTime[i];
+        }
+        return total;
+    }
+
+    public void close() {
+        if (_task != null) {
+            _task.cancel();
+        }
+    }
+
+    private class Fresher extends TimerTask {
+        public void run () {
+            rotateSched(System.currentTimeMillis());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/jvm/backtype/storm/metric/internal/LatencyStatAndMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/internal/LatencyStatAndMetric.java b/storm-core/src/jvm/backtype/storm/metric/internal/LatencyStatAndMetric.java
new file mode 100644
index 0000000..31bad41
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/internal/LatencyStatAndMetric.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
+
+import backtype.storm.metric.api.IMetric;
+
+/**
+ * Acts as a Latency Metric, but also keeps track of approximate latency
+ * for the last 10 mins, 3 hours, 1 day, and all time.
+ */
+public class LatencyStatAndMetric implements IMetric {
+    //The current lat and count buckets are protected by a different lock
+    // from the other buckets.  This is to reduce the lock contention
+    // when doing complex calculations.  Never grab the instance object lock
+    // while holding _currentLock to avoid deadlocks
+    private final Object _currentLock = new byte[0];
+    private long _currentLatBucket;
+    private long _currentCountBucket;
+
+    // All internal state except for the current buckets are
+    // protected using the Object Lock
+    private long _bucketStart;
+
+    //latency sum and count already rotated into the buckets by the timer but not yet returned by getValueAndReset
+    private long _exactExtraLat;
+    private long _exactExtraCount;
+ 
+    //10 min values
+    private final int _tmSize;
+    private final long[] _tmLatBuckets;
+    private final long[] _tmCountBuckets;
+    private final long[] _tmTime;
+    
+    //3 hour values
+    private final int _thSize;
+    private final long[] _thLatBuckets;
+    private final long[] _thCountBuckets;
+    private final long[] _thTime;
+
+    //1 day values
+    private final int _odSize;
+    private final long[] _odLatBuckets;
+    private final long[] _odCountBuckets;
+    private final long[] _odTime;
+ 
+    //all time
+    private long _allTimeLat;
+    private long _allTimeCount;
+
+    private final TimerTask _task;
+
+    /**
+     * @param numBuckets the number of buckets to divide the time periods into.
+     */
+    public LatencyStatAndMetric(int numBuckets) {
+        this(numBuckets, -1);
+    }
+
+    /**
+     * Constructor
+     * @param numBuckets the number of buckets to divide the time periods into.
+     * @param startTime if non-negative, the simulated time to start from; if negative, the system clock is used and a periodic rotation is scheduled.
+     */
+    LatencyStatAndMetric(int numBuckets, long startTime){
+        numBuckets = Math.max(numBuckets, 2);
+        //We want to capture the full time range, so the target size is as
+        // if we had one bucket less than we do
+        _tmSize = 10 * 60 * 1000 / (numBuckets - 1);
+        _thSize = 3 * 60 * 60 * 1000 / (numBuckets - 1);
+        _odSize = 24 * 60 * 60 * 1000 / (numBuckets - 1);
+        if (_tmSize < 1 || _thSize < 1 || _odSize < 1) {
+            throw new IllegalArgumentException("number of buckets is too large to be supported");
+        }
+        _tmLatBuckets = new long[numBuckets];
+        _tmCountBuckets = new long[numBuckets];
+        _tmTime = new long[numBuckets];
+        _thLatBuckets = new long[numBuckets];
+        _thCountBuckets = new long[numBuckets];
+        _thTime = new long[numBuckets];
+        _odLatBuckets = new long[numBuckets];
+        _odCountBuckets = new long[numBuckets];
+        _odTime = new long[numBuckets];
+        _allTimeLat = 0;
+        _allTimeCount = 0;
+        _exactExtraLat = 0;
+        _exactExtraCount = 0;
+
+        _bucketStart = startTime >= 0 ? startTime : System.currentTimeMillis();
+        _currentLatBucket = 0;
+        _currentCountBucket = 0;
+        if (startTime < 0) {
+            _task = new Fresher();
+            MetricStatTimer._timer.scheduleAtFixedRate(_task, _tmSize, _tmSize);
+        } else {
+            _task = null;
+        }
+    }
+
+    /**
+     * Record a specific latency
+     *
+     * @param latency what we are recording
+     */
+    public void record(long latency) {
+        synchronized(_currentLock) {
+            _currentLatBucket += latency;
+            _currentCountBucket++;
+        }
+    }
+
+    @Override
+    public synchronized Object getValueAndReset() {
+        return getValueAndReset(System.currentTimeMillis());
+    }
+
+    synchronized Object getValueAndReset(long now) {
+        long lat;
+        long count;
+        synchronized(_currentLock) {
+            lat = _currentLatBucket;
+            count = _currentCountBucket;
+            _currentLatBucket = 0;
+            _currentCountBucket = 0;
+        }
+
+        long timeSpent = now - _bucketStart;
+        double ret = ((double)(lat + _exactExtraLat))/(count + _exactExtraCount);
+        _bucketStart = now;
+        _exactExtraLat = 0;
+        _exactExtraCount = 0;
+        rotateBuckets(lat, count, timeSpent);
+        return ret;
+    }
+
+    synchronized void rotateSched(long now) {
+        long lat;
+        long count;
+        synchronized(_currentLock) {
+            lat = _currentLatBucket;
+            count = _currentCountBucket;
+            _currentLatBucket = 0;
+            _currentCountBucket = 0;
+        }
+
+        long timeSpent = now - _bucketStart;
+        _exactExtraLat += lat;
+        _exactExtraCount += count;
+        _bucketStart = now;
+        rotateBuckets(lat, count, timeSpent);
+    }
+
+    synchronized void rotateBuckets(long lat, long count, long timeSpent) {
+        rotate(lat, count, timeSpent, _tmSize, _tmTime, _tmLatBuckets, _tmCountBuckets);
+        rotate(lat, count, timeSpent, _thSize, _thTime, _thLatBuckets, _thCountBuckets);
+        rotate(lat, count, timeSpent, _odSize, _odTime, _odLatBuckets, _odCountBuckets);
+        _allTimeLat += lat;
+        _allTimeCount += count;
+    }
+
+    private synchronized void rotate(long lat, long count, long timeSpent, long targetSize,
+            long [] times, long [] latBuckets, long [] countBuckets) {
+        times[0] += timeSpent;
+        latBuckets[0] += lat;
+        countBuckets[0] += count;
+
+        long currentTime = 0;
+        long currentLat = 0;
+        long currentCount = 0;
+        if (times[0] >= targetSize) {
+            for (int i = 0; i < latBuckets.length; i++) {
+                long tmpTime = times[i];
+                times[i] = currentTime;
+                currentTime = tmpTime;
+
+                long lt = latBuckets[i];
+                latBuckets[i] = currentLat;
+                currentLat = lt;
+
+                long cnt = countBuckets[i];
+                countBuckets[i] = currentCount;
+                currentCount = cnt;
+            }
+        }
+    }
+
+    /**
+     * @return a map of time window to average latency.
+     * Keys are "600" for last 10 mins
+     * "10800" for the last 3 hours
+     * "86400" for the last day
+     * ":all-time" for all time
+     */
+    public synchronized Map<String, Double> getTimeLatAvg() {
+        return getTimeLatAvg(System.currentTimeMillis());
+    }
+
+    synchronized Map<String, Double> getTimeLatAvg(long now) {
+        Map<String, Double> ret = new HashMap<>();
+        long lat;
+        long count;
+        synchronized(_currentLock) {
+            lat = _currentLatBucket;
+            count = _currentCountBucket;
+        }
+        long timeSpent = now - _bucketStart;
+        ret.put("600", readApproximateLatAvg(lat, count, timeSpent, _tmTime, _tmLatBuckets, _tmCountBuckets, 600 * 1000));
+        ret.put("10800", readApproximateLatAvg(lat, count, timeSpent, _thTime, _thLatBuckets, _thCountBuckets, 10800 * 1000));
+        ret.put("86400", readApproximateLatAvg(lat, count, timeSpent, _odTime, _odLatBuckets, _odCountBuckets, 86400 * 1000));
+        ret.put(":all-time", ((double)lat + _allTimeLat)/(count + _allTimeCount));
+        return ret;
+    }
+
+    double readApproximateLatAvg(long lat, long count, long timeSpent, long[] bucketTime,
+              long[] latBuckets, long[] countBuckets, long desiredTime) {
+        long timeNeeded = desiredTime - timeSpent;
+        long totalLat = lat;
+        long totalCount = count;
+        for (int i = 0; i < bucketTime.length && timeNeeded > 0; i++) {
+            //Don't pro-rate anything, it is all approximate so an extra bucket is not that bad.
+            totalLat += latBuckets[i];
+            totalCount += countBuckets[i];
+            timeNeeded -= bucketTime[i];
+        }
+        return ((double)totalLat)/totalCount;
+    }
+
+    public void close() {
+        if (_task != null) {
+            _task.cancel();
+        }
+    }
+
+    private class Fresher extends TimerTask {
+        public void run () {
+            rotateSched(System.currentTimeMillis());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/jvm/backtype/storm/metric/internal/MetricStatTimer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/internal/MetricStatTimer.java b/storm-core/src/jvm/backtype/storm/metric/internal/MetricStatTimer.java
new file mode 100644
index 0000000..5f48793
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/internal/MetricStatTimer.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import java.util.Timer;
+
+/**
+ * Just holds a singleton metric/stat timer for use by metric/stat calculations
+ */
+class MetricStatTimer {
+    static Timer _timer = new Timer("metric/stat timer", true);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/jvm/backtype/storm/metric/internal/MultiCountStatAndMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/internal/MultiCountStatAndMetric.java b/storm-core/src/jvm/backtype/storm/metric/internal/MultiCountStatAndMetric.java
new file mode 100644
index 0000000..5fae4bf
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/internal/MultiCountStatAndMetric.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import java.util.Map;
+import java.util.List;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import backtype.storm.metric.api.IMetric;
+
+/**
+ * Acts as a MultiCount Metric, but also keeps track of approximate counts
+ * for the last 10 mins, 3 hours, 1 day, and all time, for the same keys.
+ */
+public class MultiCountStatAndMetric<T> implements IMetric {
+    private ConcurrentHashMap<T, CountStatAndMetric> _counts = new ConcurrentHashMap<>();
+    private final int _numBuckets;
+
+    /**
+     * @param numBuckets the number of buckets to divide the time periods into.
+     */
+    public MultiCountStatAndMetric(int numBuckets) {
+        _numBuckets = numBuckets;
+    }
+
+    CountStatAndMetric get(T key) {
+        CountStatAndMetric c = _counts.get(key);
+        if (c == null) {
+            synchronized(this) {
+                c = _counts.get(key);
+                if (c == null) {
+                    c = new CountStatAndMetric(_numBuckets);
+                    _counts.put(key, c);
+                }
+            }
+        }
+        return c;
+    }
+
+    /**
+     * Increase the count by the given value.
+     *
+     * @param count number to count
+     */
+    public void incBy(T key, long count) {
+        get(key).incBy(count);
+    }
+
+    protected String keyToString(T key) {
+        if (key instanceof List) {
+            //This is a bit of a hack.  If it is a list, then it is [component, stream]
+            //we want to format this as component:stream
+            List<String> lk = (List<String>)key;
+            return lk.get(0) + ":" + lk.get(1);
+        }
+        return key.toString();
+    }
+
+    @Override
+    public Object getValueAndReset() {
+        Map<String, Long> ret = new HashMap<String, Long>();
+        for (Map.Entry<T, CountStatAndMetric> entry: _counts.entrySet()) {
+            String key = keyToString(entry.getKey());
+            //There could be collisions if keyToString returns only part of a result.
+            Long val = (Long)entry.getValue().getValueAndReset();
+            Long other = ret.get(key);
+            val += other == null ? 0l : other;
+            ret.put(key, val);
+        }
+        return ret;
+    }
+
+    public Map<String, Map<T, Long>> getTimeCounts() {
+        Map<String, Map<T, Long>> ret = new HashMap<>();
+        for (Map.Entry<T, CountStatAndMetric> entry: _counts.entrySet()) {
+            T key = entry.getKey();
+            Map<String, Long> toFlip = entry.getValue().getTimeCounts();
+            for (Map.Entry<String, Long> subEntry: toFlip.entrySet()) {
+                String time = subEntry.getKey();
+                Map<T, Long> tmp = ret.get(time);
+                if (tmp == null) {
+                    tmp = new HashMap<>();
+                    ret.put(time, tmp);
+                }
+                tmp.put(key, subEntry.getValue());
+            }
+        }
+        return ret;
+    }
+
+    public void close() {
+        for (CountStatAndMetric cc: _counts.values()) {
+            cc.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/jvm/backtype/storm/metric/internal/MultiLatencyStatAndMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/internal/MultiLatencyStatAndMetric.java b/storm-core/src/jvm/backtype/storm/metric/internal/MultiLatencyStatAndMetric.java
new file mode 100644
index 0000000..032eef1
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/internal/MultiLatencyStatAndMetric.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import java.util.Map;
+import java.util.List;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import backtype.storm.metric.api.IMetric;
+
+/**
+ * Acts as a Latency Metric for multiple keys, but also keeps track of approximate
+ * average latencies for the last 10 mins, 3 hours, 1 day, and all time, for the same keys.
+ */
+public class MultiLatencyStatAndMetric<T> implements IMetric {
+    private ConcurrentHashMap<T, LatencyStatAndMetric> _lat = new ConcurrentHashMap<>();
+    private final int _numBuckets;
+
+    /**
+     * @param numBuckets the number of buckets to divide the time periods into.
+     */
+    public MultiLatencyStatAndMetric(int numBuckets) {
+        _numBuckets = numBuckets;
+    }
+
+    LatencyStatAndMetric get(T key) {
+        LatencyStatAndMetric c = _lat.get(key);
+        if (c == null) {
+            synchronized(this) {
+                c = _lat.get(key);
+                if (c == null) {
+                    c = new LatencyStatAndMetric(_numBuckets);
+                    _lat.put(key, c);
+                }
+            }
+        }
+        return c;
+    }
+
+    /**
+     * Record a latency value
+     *
+     * @param latency the measurement to record
+     */
+    public void record(T key, long latency) {
+        get(key).record(latency);
+    }
+
+    protected String keyToString(T key) {
+        if (key instanceof List) {
+            //This is a bit of a hack.  If it is a list, then it is [component, stream]
+            //we want to format this as component:stream
+            List<String> lk = (List<String>)key;
+            return lk.get(0) + ":" + lk.get(1);
+        }
+        return key.toString();
+    }
+
+    @Override
+    public Object getValueAndReset() {
+        Map<String, Double> ret = new HashMap<String, Double>();
+        for (Map.Entry<T, LatencyStatAndMetric> entry: _lat.entrySet()) {
+            String key = keyToString(entry.getKey());
+            Double val = (Double)entry.getValue().getValueAndReset();
+            ret.put(key, val);
+        }
+        return ret;
+    }
+
+    public Map<String, Map<T, Double>> getTimeLatAvg() {
+        Map<String, Map<T, Double>> ret = new HashMap<>();
+        for (Map.Entry<T, LatencyStatAndMetric> entry: _lat.entrySet()) {
+            T key = entry.getKey();
+            Map<String, Double> toFlip = entry.getValue().getTimeLatAvg();
+            for (Map.Entry<String, Double> subEntry: toFlip.entrySet()) {
+                String time = subEntry.getKey();
+                Map<T, Double> tmp = ret.get(time);
+                if (tmp == null) {
+                    tmp = new HashMap<>();
+                    ret.put(time, tmp);
+                }
+                tmp.put(key, subEntry.getValue());
+            }
+        }
+        return ret;
+    }
+
+    public void close() {
+        for (LatencyStatAndMetric l: _lat.values()) {
+            l.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/jvm/backtype/storm/metric/internal/RateTracker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/internal/RateTracker.java b/storm-core/src/jvm/backtype/storm/metric/internal/RateTracker.java
new file mode 100644
index 0000000..f3eb6ae
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/metric/internal/RateTracker.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class is a utility to track the rate of something.
+ */
+public class RateTracker{
+    private final int _bucketSizeMillis;
+    //Old Buckets and their length are only touched when rotating or gathering the metrics, which should not be that frequent
+    // As such all access to them should be protected by synchronizing with the RateTracker instance
+    private final long[] _bucketTime;
+    private final long[] _oldBuckets;
+    
+    private final AtomicLong _bucketStart;
+    private final AtomicLong _currentBucket;
+    
+    private final TimerTask _task;
+
+    /**
+     * @param validTimeWindowInMils events that happened more than validTimeWindowInMils ago are not considered
+     *                        when reporting the rate.
+     * @param numBuckets the number of time slices to divide validTimeWindowInMils into. The more buckets,
+     *                    the smoother the reported results will be.
+     */
+    public RateTracker(int validTimeWindowInMils, int numBuckets) {
+        this(validTimeWindowInMils, numBuckets, -1);
+    }
+
+    /**
+     * Constructor
+     * @param validTimeWindowInMils events that happened more than validTimeWindowInMils ago are not considered
+     *                        when reporting the rate.
+     * @param numBuckets the number of time slices to divide validTimeWindowInMils into. The more buckets,
+     *                    the smoother the reported results will be.
+     * @param startTime if non-negative, the simulated time to start the first bucket at; if negative, the system clock is used and a periodic rotation is scheduled.
+     */
+    RateTracker(int validTimeWindowInMils, int numBuckets, long startTime){
+        numBuckets = Math.max(numBuckets, 1);
+        _bucketSizeMillis = validTimeWindowInMils / numBuckets;
+        if (_bucketSizeMillis < 1 ) {
+            throw new IllegalArgumentException("validTimeWindowInMilis and numOfSildes cause each slide to have a window that is too small");
+        }
+        _bucketTime = new long[numBuckets - 1];
+        _oldBuckets = new long[numBuckets - 1];
+
+        _bucketStart = new AtomicLong(startTime >= 0 ? startTime : System.currentTimeMillis());
+        _currentBucket = new AtomicLong(0);
+        if (startTime < 0) {
+            _task = new Fresher();
+            MetricStatTimer._timer.scheduleAtFixedRate(_task, _bucketSizeMillis, _bucketSizeMillis);
+        } else {
+            _task = null;
+        }
+    }
+
+    /**
+     * Notify the tracker upon new arrivals
+     *
+     * @param count number of arrivals
+     */
+    public void notify(long count) {
+        _currentBucket.addAndGet(count);
+    }
+
+    /**
+     * @return the approximate average rate per second.
+     */
+    public synchronized double reportRate() {
+        return reportRate(System.currentTimeMillis());
+    }
+
+    synchronized double reportRate(long currentTime) {
+        long duration = Math.max(1l, currentTime - _bucketStart.get());
+        long events = _currentBucket.get();
+        for (int i = 0; i < _oldBuckets.length; i++) {
+            events += _oldBuckets[i];
+            duration += _bucketTime[i];
+        }
+
+        return events * 1000.0 / duration;
+    }
+
+    public void close() {
+        if (_task != null) {
+            _task.cancel();
+        }
+    }
+
+    /**
+     * Rotate the buckets a set number of times for testing purposes.
+     * @param numToEclipse the number of rotations to perform.
+     */
+    final void forceRotate(int numToEclipse, long interval) {
+        long time = _bucketStart.get();
+        for (int i = 0; i < numToEclipse; i++) {
+            time += interval;
+            rotateBuckets(time);
+        }
+    }
+
+    private synchronized void rotateBuckets(long time) {
+        long timeSpent = time - _bucketStart.getAndSet(time); 
+        long currentVal = _currentBucket.getAndSet(0);
+        for (int i = 0; i < _oldBuckets.length; i++) {
+            long tmpTime = _bucketTime[i];
+            _bucketTime[i] = timeSpent;
+            timeSpent = tmpTime;
+
+            long cnt = _oldBuckets[i];
+            _oldBuckets[i] = currentVal;
+            currentVal = cnt;
+        }
+    }
+
+    private class Fresher extends TimerTask {
+        public void run () {
+            rotateBuckets(System.currentTimeMillis());
+        }
+    }
+
+    public static void main (String args[]) throws Exception {
+        final int number = (args.length >= 1) ? Integer.parseInt(args[0]) : 100000000;
+        for (int i = 0; i < 10; i++) {
+            testRate(number);
+        }
+    }
+
+    private static void testRate(int number) {
+        RateTracker rt = new RateTracker(10000, 10);
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < number; i++) {
+            rt.notify(1);
+            if ((i % 1000000) == 0) {
+                //There is an issue with some JVM versions where an integer for loop that takes a long time
+                // can starve other threads resulting in the timer thread not getting called.
+                // This is a work around for that, and we still get the same results.
+                Thread.yield();
+            }
+        }
+        long end = System.currentTimeMillis();
+        double rate = rt.reportRate();
+        rt.close();
+        System.out.printf("time %,8d count %,8d rate %,15.2f reported rate %,15.2f\n", end-start,number, ((number * 1000.0)/(end-start)), rate);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 3e71bca..fa073e7 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -37,6 +37,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import backtype.storm.metric.api.IStatefulObject;
+import backtype.storm.metric.internal.RateTracker;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/RateTracker.java b/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
deleted file mode 100644
index a490ecc..0000000
--- a/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.utils;
-
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * This class is a utility to track the rate of something.
- */
-public class RateTracker{
-    private final int _bucketSizeMillis;
-    //Old Buckets and their length are only touched when rotating or gathering the metrics, which should not be that frequent
-    // As such all access to them should be protected by synchronizing with the RateTracker instance
-    private final long[] _bucketTime;
-    private final long[] _oldBuckets;
-    
-    private final AtomicLong _bucketStart;
-    private final AtomicLong _currentBucket;
-    
-    private final TimerTask _task;
-    private static Timer _timer = new Timer("rate tracker timer", true);
-
-    /**
-     * @param validTimeWindowInMils events that happened before validTimeWindowInMils are not considered
-     *                        when reporting the rate.
-     * @param numBuckets the number of time sildes to divide validTimeWindows. The more buckets,
-     *                    the smother the reported results will be.
-     */
-    public RateTracker(int validTimeWindowInMils, int numBuckets) {
-        this(validTimeWindowInMils, numBuckets, -1);
-    }
-
-    /**
-     * Constructor
-     * @param validTimeWindowInMils events that happened before validTimeWindow are not considered
-     *                        when reporting the rate.
-     * @param numBuckets the number of time sildes to divide validTimeWindows. The more buckets,
-     *                    the smother the reported results will be.
-     * @param startTime if positive the simulated time to start the first bucket at.
-     */
-    RateTracker(int validTimeWindowInMils, int numBuckets, long startTime){
-        numBuckets = Math.max(numBuckets, 1);
-        _bucketSizeMillis = validTimeWindowInMils / numBuckets;
-        if (_bucketSizeMillis < 1 ) {
-            throw new IllegalArgumentException("validTimeWindowInMilis and numOfSildes cause each slide to have a window that is too small");
-        }
-        _bucketTime = new long[numBuckets - 1];
-        _oldBuckets = new long[numBuckets - 1];
-
-        _bucketStart = new AtomicLong(startTime >= 0 ? startTime : System.currentTimeMillis());
-        _currentBucket = new AtomicLong(0);
-        if (startTime < 0) {
-            _task = new Fresher();
-            _timer.scheduleAtFixedRate(_task, _bucketSizeMillis, _bucketSizeMillis);
-        } else {
-            _task = null;
-        }
-    }
-
-    /**
-     * Notify the tracker upon new arrivals
-     *
-     * @param count number of arrivals
-     */
-    public void notify(long count) {
-        _currentBucket.addAndGet(count);
-    }
-
-    /**
-     * @return the approximate average rate per second.
-     */
-    public synchronized double reportRate() {
-        return reportRate(System.currentTimeMillis());
-    }
-
-    synchronized double reportRate(long currentTime) {
-        long duration = Math.max(1l, currentTime - _bucketStart.get());
-        long events = _currentBucket.get();
-        for (int i = 0; i < _oldBuckets.length; i++) {
-            events += _oldBuckets[i];
-            duration += _bucketTime[i];
-        }
-
-        return events * 1000.0 / duration;
-    }
-
-    public void close() {
-        if (_task != null) {
-            _task.cancel();
-        }
-    }
-
-    /**
-     * Rotate the buckets a set number of times for testing purposes.
-     * @param numToEclipse the number of rotations to perform.
-     */
-    final void forceRotate(int numToEclipse, long interval) {
-        long time = _bucketStart.get();
-        for (int i = 0; i < numToEclipse; i++) {
-            time += interval;
-            rotateBuckets(time);
-        }
-    }
-
-    private synchronized void rotateBuckets(long time) {
-        long timeSpent = time - _bucketStart.getAndSet(time); 
-        long currentVal = _currentBucket.getAndSet(0);
-        for (int i = 0; i < _oldBuckets.length; i++) {
-            long tmpTime = _bucketTime[i];
-            _bucketTime[i] = timeSpent;
-            timeSpent = tmpTime;
-
-            long cnt = _oldBuckets[i];
-            _oldBuckets[i] = currentVal;
-            currentVal = cnt;
-        }
-    }
-
-    private class Fresher extends TimerTask {
-        public void run () {
-            rotateBuckets(System.currentTimeMillis());
-        }
-    }
-
-    public static void main (String args[]) throws Exception {
-        final int number = (args.length >= 1) ? Integer.parseInt(args[0]) : 100000000;
-        for (int i = 0; i < 10; i++) {
-            testRate(number);
-        }
-    }
-
-    private static void testRate(int number) {
-        RateTracker rt = new RateTracker(10000, 10);
-        long start = System.currentTimeMillis();
-        for (int i = 0; i < number; i++) {
-            rt.notify(1);
-            if ((i % 1000000) == 0) {
-                //There is an issue with some JVM versions where an integer for loop that takes a long time
-                // can starve other threads resulting in  the timer thread not getting called.
-                // This is a work around for that, and we still get the same results.
-                Thread.yield();
-            }
-        }
-        long end = System.currentTimeMillis();
-        double rate = rt.reportRate();
-        rt.close();
-        System.out.printf("time %,8d count %,8d rate %,15.2f reported rate %,15.2f\n", end-start,number, ((number * 1000.0)/(end-start)), rate);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/test/jvm/backtype/storm/metric/internal/CountStatAndMetricTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/metric/internal/CountStatAndMetricTest.java b/storm-core/test/jvm/backtype/storm/metric/internal/CountStatAndMetricTest.java
new file mode 100644
index 0000000..7fa8087
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/metric/internal/CountStatAndMetricTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.junit.Test;
+import junit.framework.TestCase;
+import static org.junit.Assert.*;
+
+/**
+ * Unit test for CountStatAndMetric
+ */
+public class CountStatAndMetricTest extends TestCase {
+    final long TEN_MIN = 10 * 60 * 1000;
+    final long THIRTY_SEC = 30 * 1000;
+    final long THREE_HOUR = 3 * 60 * 60 * 1000;
+    final long ONE_DAY = 24 * 60 * 60 * 1000;
+
+    @Test
+    public void testBasic() {
+        long time = 0l;
+        CountStatAndMetric count = new CountStatAndMetric(10, time);
+        while (time < TEN_MIN) {
+            //For this part of the test we interleave the different rotation types.
+            count.incBy(50);
+            time += THIRTY_SEC/2;
+            count.rotateSched(time);
+            count.incBy(50);
+            time += THIRTY_SEC/2;
+            assertEquals(100l, ((Long)count.getValueAndReset(time)).longValue());
+        }
+
+        long val = 100 * TEN_MIN/THIRTY_SEC;
+        Map<String, Long> expected = new HashMap<String, Long>();
+        expected.put("600", val);
+        expected.put("10800", val);
+        expected.put("86400", val);
+        expected.put(":all-time", val);
+        assertEquals(expected, count.getTimeCounts(time));
+
+        while (time < THREE_HOUR) {
+            count.incBy(100);
+            time += THIRTY_SEC;
+            assertEquals(100l, ((Long)count.getValueAndReset(time)).longValue());
+        }
+
+        val = 100 * THREE_HOUR/THIRTY_SEC;
+        expected = new HashMap<String, Long>();
+        expected.put("600", 100 * TEN_MIN/THIRTY_SEC);
+        expected.put("10800", val);
+        expected.put("86400", val);
+        expected.put(":all-time", val);
+        assertEquals(expected, count.getTimeCounts(time));
+
+        while (time < ONE_DAY) {
+            count.incBy(100);
+            time += THIRTY_SEC;
+            assertEquals(100l, ((Long)count.getValueAndReset(time)).longValue());
+        }
+
+        val = 100 * ONE_DAY/THIRTY_SEC;
+        expected = new HashMap<String, Long>();
+        expected.put("600", 100 * TEN_MIN/THIRTY_SEC);
+        expected.put("10800", 100 * THREE_HOUR/THIRTY_SEC);
+        expected.put("86400", val);
+        expected.put(":all-time", val);
+        assertEquals(expected, count.getTimeCounts(time));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/test/jvm/backtype/storm/metric/internal/LatencyStatAndMetricTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/metric/internal/LatencyStatAndMetricTest.java b/storm-core/test/jvm/backtype/storm/metric/internal/LatencyStatAndMetricTest.java
new file mode 100644
index 0000000..3b83d95
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/metric/internal/LatencyStatAndMetricTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.junit.Test;
+import junit.framework.TestCase;
+import static org.junit.Assert.*;
+
+/**
+ * Unit test for LatencyStatAndMetric
+ */
+public class LatencyStatAndMetricTest extends TestCase {
+    final long TEN_MIN = 10 * 60 * 1000;
+    final long THIRTY_SEC = 30 * 1000;
+    final long THREE_HOUR = 3 * 60 * 60 * 1000;
+    final long ONE_DAY = 24 * 60 * 60 * 1000;
+
+    @Test
+    public void testBasic() {
+        long time = 0l;
+        LatencyStatAndMetric lat = new LatencyStatAndMetric(10, time);
+        while (time < TEN_MIN) {
+            lat.record(100);
+            time += THIRTY_SEC;
+            assertEquals(100.0, ((Double)lat.getValueAndReset(time)).doubleValue(), 0.01);
+        }
+
+        Map<String, Double> found = lat.getTimeLatAvg(time);
+        assertEquals(4, found.size());
+        assertEquals(100.0, found.get("600").doubleValue(), 0.01);
+        assertEquals(100.0, found.get("10800").doubleValue(), 0.01);
+        assertEquals(100.0, found.get("86400").doubleValue(), 0.01);
+        assertEquals(100.0, found.get(":all-time").doubleValue(), 0.01);
+
+        while (time < THREE_HOUR) {
+            lat.record(200);
+            time += THIRTY_SEC;
+            assertEquals(200.0, ((Double)lat.getValueAndReset(time)).doubleValue(), 0.01);
+        }
+
+        double expected = ((100.0 * TEN_MIN/THIRTY_SEC) + (200.0 * (THREE_HOUR - TEN_MIN)/THIRTY_SEC)) /
+                          (THREE_HOUR/THIRTY_SEC);
+        found = lat.getTimeLatAvg(time);
+        assertEquals(4, found.size());
+        assertEquals(200.0, found.get("600").doubleValue(), 0.01); //flushed the buffers completely
+        assertEquals(expected, found.get("10800").doubleValue(), 0.01);
+        assertEquals(expected, found.get("86400").doubleValue(), 0.01);
+        assertEquals(expected, found.get(":all-time").doubleValue(), 0.01);
+
+        while (time < ONE_DAY) {
+            lat.record(300);
+            time += THIRTY_SEC;
+            assertEquals(300.0, ((Double)lat.getValueAndReset(time)).doubleValue(), 0.01);
+        }
+
+        expected = ((100.0 * TEN_MIN/THIRTY_SEC) + (200.0 * (THREE_HOUR - TEN_MIN)/THIRTY_SEC) + (300.0 * (ONE_DAY - THREE_HOUR)/THIRTY_SEC)) /
+                          (ONE_DAY/THIRTY_SEC);
+        found = lat.getTimeLatAvg(time);
+        assertEquals(4, found.size());
+        assertEquals(300.0, found.get("600").doubleValue(), 0.01); //flushed the buffers completely
+        assertEquals(300.0, found.get("10800").doubleValue(), 0.01);
+        assertEquals(expected, found.get("86400").doubleValue(), 0.01);
+        assertEquals(expected, found.get(":all-time").doubleValue(), 0.01);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/test/jvm/backtype/storm/metric/internal/RateTrackerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/metric/internal/RateTrackerTest.java b/storm-core/test/jvm/backtype/storm/metric/internal/RateTrackerTest.java
new file mode 100644
index 0000000..debb922
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/metric/internal/RateTrackerTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.metric.internal;
+
+import org.junit.Test;
+import junit.framework.TestCase;
+import static org.junit.Assert.*;
+
+/**
+ * Unit test for RateTracker
+ */
+public class RateTrackerTest extends TestCase {
+
+    @Test
+    public void testExactRate() {
+        //This test is in two phases.  The first phase fills up the 10 buckets with 10 tuples each
+        // We purposely simulate a 1 second bucket size so the rate will always be 10 per second.
+        final long interval = 1000l;
+        long time = 0l;
+        RateTracker rt = new RateTracker(10000, 10, time);
+        double [] expected = new double[] {10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0};
+        for (int i = 0; i < expected.length; i++) {
+            double exp = expected[i];
+            rt.notify(10);
+            time += interval;
+            double actual = rt.reportRate(time);
+            rt.forceRotate(1, interval);
+            assertEquals("Expected rate on iteration "+i+" is wrong.", exp, actual, 0.00001);
+        }
+        //In the second part of the test the rate doubles to 20 per second but the rate tracker
+        // increases its result slowly as we push the 10 tuples per second buckets out and replace them
+        // with 20 tuples per second. 
+        expected = new double[] {11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0};
+        for (int i = 0; i < expected.length; i++) {
+            double exp = expected[i];
+            rt.notify(20);
+            time += interval;
+            double actual = rt.reportRate(time);
+            rt.forceRotate(1, interval);
+            assertEquals("Expected rate on iteration "+i+" is wrong.", exp, actual, 0.00001);
+        }
+    }
+
+
+    @Test
+    public void testEclipsedAllWindows() {
+        long time = 0;
+        RateTracker rt = new RateTracker(10000, 10, time);
+        rt.notify(10);
+        rt.forceRotate(10, 1000l);
+        assertEquals(0.0, rt.reportRate(10000l), 0.00001);
+    }
+
+    @Test
+    public void testEclipsedOneWindow() {
+        long time = 0;
+        RateTracker rt = new RateTracker(10000, 10, time);
+        rt.notify(1);
+        double r1 = rt.reportRate(1000l);
+        rt.forceRotate(1, 1000l);
+        rt.notify(1);
+        double r2 = rt.reportRate(2000l);
+
+        assertEquals(r1, r2, 0.00001);
+    }
+
+    @Test
+    public void testEclipsedNineWindows() {
+        long time = 0;
+        RateTracker rt = new RateTracker(10000, 10, time);
+        rt.notify(1);
+        double r1 = rt.reportRate(1000);
+        rt.forceRotate(9, 1000);
+        rt.notify(9);
+        double r2 = rt.reportRate(10000);
+
+        assertEquals(r1, r2, 0.00001);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e75219b5/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
deleted file mode 100644
index 4f5eacb..0000000
--- a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.utils;
-
-import org.junit.Test;
-import junit.framework.TestCase;
-import static org.junit.Assert.*;
-
-/**
- * Unit test for RateTracker
- */
-public class RateTrackerTest extends TestCase {
-
-    @Test
-    public void testExactRate() {
-        //This test is in two phases.  The first phase fills up the 10 buckets with 10 tuples each
-        // We purposely simulate a 1 second bucket size so the rate will always be 10 per second.
-        final long interval = 1000l;
-        long time = 0l;
-        RateTracker rt = new RateTracker(10000, 10, time);
-        double [] expected = new double[] {10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0};
-        for (int i = 0; i < expected.length; i++) {
-            double exp = expected[i];
-            rt.notify(10);
-            time += interval;
-            double actual = rt.reportRate(time);
-            rt.forceRotate(1, interval);
-            assertEquals("Expected rate on iteration "+i+" is wrong.", exp, actual, 0.00001);
-        }
-        //In the second part of the test the rate doubles to 20 per second but the rate tracker
-        // increases its result slowly as we push the 10 tuples per second buckets out and relpace them
-        // with 20 tuples per second. 
-        expected = new double[] {11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0};
-        for (int i = 0; i < expected.length; i++) {
-            double exp = expected[i];
-            rt.notify(20);
-            time += interval;
-            double actual = rt.reportRate(time);
-            rt.forceRotate(1, interval);
-            assertEquals("Expected rate on iteration "+i+" is wrong.", exp, actual, 0.00001);
-        }
-    }
-
-
-    @Test
-    public void testEclipsedAllWindows() {
-        long time = 0;
-        RateTracker rt = new RateTracker(10000, 10, time);
-        rt.notify(10);
-        rt.forceRotate(10, 1000l);
-        assertEquals(0.0, rt.reportRate(10000l), 0.00001);
-    }
-
-    @Test
-    public void testEclipsedOneWindow() {
-        long time = 0;
-        RateTracker rt = new RateTracker(10000, 10, time);
-        rt.notify(1);
-        double r1 = rt.reportRate(1000l);
-        rt.forceRotate(1, 1000l);
-        rt.notify(1);
-        double r2 = rt.reportRate(2000l);
-
-        assertEquals(r1, r2, 0.00001);
-    }
-
-    @Test
-    public void testEclipsedNineWindows() {
-        long time = 0;
-        RateTracker rt = new RateTracker(10000, 10, time);
-        rt.notify(1);
-        double r1 = rt.reportRate(1000);
-        rt.forceRotate(9, 1000);
-        rt.notify(9);
-        double r2 = rt.reportRate(10000);
-
-        assertEquals(r1, r2, 0.00001);
-    }
-}


[2/4] storm git commit: Removed reflection from the critical path

Posted by sr...@apache.org.
Removed reflection from the critical path


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/16c33efa
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/16c33efa
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/16c33efa

Branch: refs/heads/master
Commit: 16c33efac39edea7e2d77df319d0ee3fd407383a
Parents: e75219b
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Sat Oct 24 10:27:11 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Sat Oct 24 10:27:45 2015 -0500

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/daemon/executor.clj | 2 +-
 storm-core/src/clj/backtype/storm/daemon/worker.clj   | 4 ++--
 storm-core/src/clj/backtype/storm/stats.clj           | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/16c33efa/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index a398b8a..fd8b886 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -568,7 +568,7 @@
                                                          (tasks-fn out-stream-id values))
                                              rooted? (and message-id has-ackers?)
                                              root-id (if rooted? (MessageId/generateId rand))
-                                             out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
+                                             ^List out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
                                          (fast-list-iter [out-task out-tasks id out-ids]
                                                          (let [tuple-id (if rooted?
                                                                           (MessageId/makeRootId root-id id)

http://git-wip-us.apache.org/repos/asf/storm/blob/16c33efa/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index e349a53..2b74f69 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -169,9 +169,9 @@
                   (do
                     (when (not (.get remoteMap task))
                       (.put remoteMap task (ArrayList.)))
-                    (let [remote (.get remoteMap task)]
+                    (let [^ArrayList remote (.get remoteMap task)]
                       (if (not-nil? task)
-                        (.add remote (TaskMessage. task (.serialize serializer tuple)))
+                        (.add remote (TaskMessage. ^int task ^bytes (.serialize serializer tuple)))
                         (log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
                      ))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/16c33efa/storm-core/src/clj/backtype/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/stats.clj b/storm-core/src/clj/backtype/storm/stats.clj
index efc4965..16a00ec 100644
--- a/storm-core/src/clj/backtype/storm/stats.clj
+++ b/storm-core/src/clj/backtype/storm/stats.clj
@@ -116,11 +116,11 @@
 
 (defn emitted-tuple!
   [stats stream]
-  (.incBy ^MultiCountStatAndMetric (stats-emitted stats) stream (stats-rate stats)))
+  (.incBy ^MultiCountStatAndMetric (stats-emitted stats) ^Object stream ^long (stats-rate stats)))
 
 (defn transferred-tuples!
   [stats stream amt]
-  (.incBy ^MultiCountStatAndMetric (stats-transferred stats) stream (* (stats-rate stats) amt)))
+  (.incBy ^MultiCountStatAndMetric (stats-transferred stats) ^Object stream ^long (* (stats-rate stats) amt)))
 
 (defn bolt-execute-tuple!
   [^BoltExecutorStats stats component stream latency-ms]


[4/4] storm git commit: Added STORM-1128 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1128 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3bc375e7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3bc375e7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3bc375e7

Branch: refs/heads/master
Commit: 3bc375e73b05b0ff1b390ce5957882a108068c00
Parents: 46b8078
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Sat Oct 24 18:17:14 2015 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sat Oct 24 18:17:14 2015 -0700

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3bc375e7/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 431148a..f069844 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1128: Make metrics fast
  * STORM-1122: Fix the format issue in Utils.java
  * STORM-1111: Fix Validation for lots of different configs
  * STORM-1125: Adding separate ZK client for read in Nimbus ZK State


[3/4] storm git commit: Fixed config validation

Posted by sr...@apache.org.
Fixed config validation


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/46b80788
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/46b80788
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/46b80788

Branch: refs/heads/master
Commit: 46b807881379270617ec56c298a2a687a16f45f1
Parents: 16c33ef
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Sat Oct 24 10:52:49 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Sat Oct 24 10:52:49 2015 -0500

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/46b80788/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
index 1b0cd7f..9aa1205 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
@@ -467,7 +467,7 @@ public class ConfigValidation {
             }
 
             SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class"));
-            SimpleTypeValidator.validateField(name, long.class, ((Map) o).get("parallelism.hint"));
+            SimpleTypeValidator.validateField(name, Long.class, ((Map) o).get("parallelism.hint"));
         }
     }