You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/12/09 21:24:08 UTC

[2/4] storm git commit: add disruptor queue metrics

add disruptor queue metrics


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9f632b64
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9f632b64
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9f632b64

Branch: refs/heads/metrics_v2
Commit: 9f632b64a1583b3953753cc47756dc801457e65c
Parents: 6c93a0d
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Nov 18 15:18:57 2016 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Nov 18 15:18:57 2016 -0500

----------------------------------------------------------------------
 .../storm/starter/ExclamationTopology.java      |  2 +-
 .../apache/storm/starter/WordCountTopology.java |  2 +-
 .../hdfs/avro/ConfluentAvroSerializer.java      |  2 +-
 pom.xml                                         |  5 ++
 storm-core/pom.xml                              |  4 ++
 .../clj/org/apache/storm/daemon/executor.clj    |  2 +
 .../src/clj/org/apache/storm/daemon/worker.clj  |  6 +-
 .../src/clj/org/apache/storm/disruptor.clj      |  5 +-
 .../apache/storm/metrics2/DisruptorMetrics.java | 76 ++++++++++++++++++++
 .../org/apache/storm/metrics2/SimpleGauge.java  | 21 ++++++
 .../storm/metrics2/StormMetricRegistry.java     | 43 +++++++++--
 .../org/apache/storm/utils/DisruptorQueue.java  | 43 +++++++----
 .../utils/DisruptorQueueBackpressureTest.java   |  2 +-
 .../apache/storm/utils/DisruptorQueueTest.java  |  4 +-
 14 files changed, 187 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
index 26e0430..9284b52 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
@@ -79,7 +79,7 @@ public class ExclamationTopology {
 
       LocalCluster cluster = new LocalCluster();
       cluster.submitTopology("test", conf, builder.createTopology());
-      Utils.sleep(10000);
+      Utils.sleep(100000);
       cluster.killTopology("test");
       cluster.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
index e4a5711..0611894 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
@@ -98,7 +98,7 @@ public class WordCountTopology {
       LocalCluster cluster = new LocalCluster();
       cluster.submitTopology("word-count", conf, builder.createTopology());
 
-      Thread.sleep(10000);
+      Thread.sleep(60000);
 
       cluster.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
index 2008a3e..087aec5 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
@@ -27,7 +27,7 @@ import java.io.IOException;
 import java.util.Map;
 
 /**
- * This class provides a mechanism to utilize the Confluent Schema Registry (https://github.com/confluentinc/schema-registry)
+ * This class provides a mechanism to utilize the Confluent Schema StormMetricRegistry (https://github.com/confluentinc/schema-registry)
  * for Storm to (de)serialize Avro generic records across a topology.  It assumes the schema registry is up and running
  * completely independent of Storm.
  */

http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a2aec4d..d040099 100644
--- a/pom.xml
+++ b/pom.xml
@@ -840,6 +840,11 @@
                 <version>${metrics.version}</version>
             </dependency>
             <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-graphite</artifactId>
+                <version>${metrics.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>metrics-clojure</groupId>
                 <artifactId>metrics-clojure</artifactId>
                 <version>${metrics-clojure.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 38a0811..21b50af 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -281,6 +281,10 @@
             <artifactId>metrics-core</artifactId>
         </dependency>
         <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-graphite</artifactId>
+        </dependency>
+        <dependency>
             <groupId>metrics-clojure</groupId>
             <artifactId>metrics-clojure</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index f1af8e7..cdb9c7f 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -234,6 +234,8 @@
                                   (str "executor"  executor-id "-send-queue")
                                   (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
                                   (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+                                  (.getStormId worker-context)
+                                  (.getThisWorkerPort worker-context)
                                   :producer-type :single-threaded
                                   :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
                                   :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))

http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index cca0011..3031513 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -205,12 +205,13 @@
           (transfer-fn serializer tuple-batch)))
       transfer-fn)))
 
-(defn- mk-receive-queue-map [storm-conf executors]
+(defn- mk-receive-queue-map [storm-conf executors storm-id port]
   (->> executors
        ;; TODO: this depends on the type of executor
        (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
                                                   (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
                                                   (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+                                                  storm-id port
                                                   :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
                                                   :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))]))
        (into {})
@@ -255,9 +256,10 @@
         executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
         transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
                                                   (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+                                                  storm-id port
                                                   :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
                                                   :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
-        executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
+        executor-receive-queue-map (mk-receive-queue-map storm-conf executors storm-id port)
 
         receive-queue-map (->> executor-receive-queue-map
                                (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue])))

http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/clj/org/apache/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/disruptor.clj b/storm-core/src/clj/org/apache/storm/disruptor.clj
index 1546b3f..73a9d84 100644
--- a/storm-core/src/clj/org/apache/storm/disruptor.clj
+++ b/storm-core/src/clj/org/apache/storm/disruptor.clj
@@ -16,6 +16,7 @@
 
 (ns org.apache.storm.disruptor
   (:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback])
+  (:import [org.apache.storm.task WorkerTopologyContext])
   (:import [com.lmax.disruptor.dsl ProducerType])
   (:require [clojure [string :as str]])
   (:require [clojure [set :as set]])
@@ -27,10 +28,10 @@
    :single-threaded ProducerType/SINGLE})
 
 (defnk disruptor-queue
-  [^String queue-name buffer-size timeout :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
+  [^String queue-name buffer-size timeout ^String storm-id ^Integer worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
   (DisruptorQueue. queue-name
                    (PRODUCER-TYPE producer-type) buffer-size
-                   timeout batch-size batch-timeout))
+                   timeout batch-size batch-timeout storm-id worker-port))
 
 (defn clojure-handler
   [afn]

http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java b/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
new file mode 100644
index 0000000..3c11f1a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
@@ -0,0 +1,76 @@
+package org.apache.storm.metrics2;
+
+import org.apache.storm.utils.DisruptorQueue;
+
+public class DisruptorMetrics {
+    private SimpleGauge<Long> capacity;
+    private SimpleGauge<Long> population;
+    private SimpleGauge<Long> writePosition;
+    private SimpleGauge<Long> readPosition;
+    private SimpleGauge<Double> arrivalRate; // TODO: Change to meter
+    private SimpleGauge<Double> sojournTime;
+    private SimpleGauge<Long> overflow;
+    private SimpleGauge<Float> pctFull;
+
+
+    DisruptorMetrics(SimpleGauge<Long> capacity,
+                    SimpleGauge<Long> population,
+                    SimpleGauge<Long> writePosition,
+                    SimpleGauge<Long> readPosition,
+                    SimpleGauge<Double> arrivalRate,
+                    SimpleGauge<Double> sojournTime,
+                    SimpleGauge<Long> overflow,
+                    SimpleGauge<Float> pctFull) {
+        this.capacity = capacity;
+        this.population = population;
+        this.writePosition = writePosition;
+        this.readPosition = readPosition;
+        this.arrivalRate = arrivalRate;
+        this.sojournTime = sojournTime;
+        this.overflow = overflow;
+        this.pctFull = pctFull;
+    }
+
+    public void setCapacity(Long capacity) {
+        this.capacity.set(capacity);
+    }
+
+    public void setPopulation(Long population) {
+        this.population.set(population);
+    }
+
+    public void setWritePosition(Long writePosition) {
+        this.writePosition.set(writePosition);
+    }
+
+    public void setReadPosition(Long readPosition) {
+        this.readPosition.set(readPosition);
+    }
+
+    public void setArrivalRate(Double arrivalRate) {
+        this.arrivalRate.set(arrivalRate);
+    }
+
+    public void setSojournTime(Double soujournTime) {
+        this.sojournTime.set(soujournTime);
+    }
+
+    public void setOverflow(Long overflow) {
+        this.overflow.set(overflow);
+    }
+
+    public void setPercentFull(Float pctFull){
+        this.pctFull.set(pctFull);
+    }
+
+    public void set(DisruptorQueue.QueueMetrics metrics){
+        this.capacity.set(metrics.capacity());
+        this.population.set(metrics.population());
+        this.writePosition.set(metrics.writePos());
+        this.readPosition.set(metrics.readPos());
+        this.arrivalRate.set(metrics.arrivalRate());
+        this.sojournTime.set(metrics.sojournTime());
+        this.overflow.set(metrics.overflow());
+        this.pctFull.set(metrics.pctFull());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java b/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
new file mode 100644
index 0000000..b88cc7f
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
@@ -0,0 +1,21 @@
+package org.apache.storm.metrics2;
+
+
+import com.codahale.metrics.Gauge;
+
+public class SimpleGauge<T> implements Gauge<T> {
+    private T value;
+
+    public SimpleGauge(T value){
+        this.value = value;
+    }
+
+    @Override
+    public T getValue() {
+        return this.value;
+    }
+
+    public void set(T value){
+        this.value = value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 02dfac3..dd430ac 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -1,13 +1,14 @@
 package org.apache.storm.metrics2;
 
 
-import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.*;
+import com.codahale.metrics.graphite.Graphite;
+import com.codahale.metrics.graphite.GraphiteReporter;
 import org.apache.storm.task.WorkerTopologyContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
 
 public class StormMetricRegistry {
@@ -16,21 +17,49 @@ public class StormMetricRegistry {
 
     private static final MetricRegistry REGISTRY = new MetricRegistry();
 
-    private static ConsoleReporter REPORTER;
+    private static ScheduledReporter REPORTER;
     static {
-        REPORTER = ConsoleReporter.forRegistry(REGISTRY)
+//        REPORTER = ConsoleReporter.forRegistry(REGISTRY)
+//                .convertRatesTo(TimeUnit.SECONDS)
+//                .convertDurationsTo(TimeUnit.MILLISECONDS)
+//                .build();
+
+
+        final Graphite graphite = new Graphite(new InetSocketAddress("graphite", 2003));
+        REPORTER = GraphiteReporter.forRegistry(REGISTRY)
                 .convertRatesTo(TimeUnit.SECONDS)
                 .convertDurationsTo(TimeUnit.MILLISECONDS)
-                .build();
+                .filter(MetricFilter.ALL)
+                .build(graphite);
+
         REPORTER.start(15, TimeUnit.SECONDS);
     }
 
 
+    public static <T> SimpleGauge<T>  gauge(T initialValue, String name, String topologyId, Integer port){
+        SimpleGauge<T> gauge = new SimpleGauge<>(initialValue);
+        String metricName = String.format("storm.worker.%s.%s-%s", topologyId, port, name);
+        return REGISTRY.register(metricName, gauge);
+    }
+
+    public static DisruptorMetrics disruptorMetrics(String name, String topologyId, Integer port){
+        return new DisruptorMetrics(
+                StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, port),
+                StormMetricRegistry.gauge(0L, name + "-population", topologyId, port),
+                StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, port),
+                StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, port),
+                StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, port),
+                StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, port),
+                StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, port),
+                StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, port)
+        );
+    }
+
+
     public static Meter meter(String name, WorkerTopologyContext context, String componentId){
         // storm.worker.{topology}.{host}.{port}
         // TODO: hostname
         String metricName = String.format("storm.worker.%s.%s.%s-%s", context.getStormId(), componentId, context.getThisWorkerPort(), name);
-
         return REGISTRY.meter(metricName);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index fbae1d1..09c0e0b 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -33,6 +33,9 @@ import com.lmax.disruptor.dsl.ProducerType;
 
 import org.apache.storm.metric.api.IStatefulObject;
 import org.apache.storm.metric.internal.RateTracker;
+import org.apache.storm.metrics2.DisruptorMetrics;
+import org.apache.storm.metrics2.StormMetricRegistry;
+import org.apache.storm.task.WorkerTopologyContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,6 +65,7 @@ public class DisruptorQueue implements IStatefulObject {
     private static final Object INTERRUPT = new Object();
     private static final String PREFIX = "disruptor-";
     private static final FlusherPool FLUSHER = new FlusherPool();
+    private static final Timer METRICS_TIMER = new Timer("disruptor-metrics-timer", true);
 
     private static class FlusherPool { 
     	private static final String THREAD_PREFIX = "disruptor-flush";
@@ -326,27 +330,31 @@ public class DisruptorQueue implements IStatefulObject {
             return (1.0F * population() / capacity());
         }
 
-        public Object getState() {
-            Map state = new HashMap<String, Object>();
+        public double arrivalRate(){
+            return _rateTracker.reportRate();
+        }
 
+        public double sojournTime(){
             // get readPos then writePos so it's never an under-estimate
             long rp = readPos();
             long wp = writePos();
-
-            final double arrivalRateInSecs = _rateTracker.reportRate();
+            final double arrivalRateInSecs = arrivalRate();
 
             //Assume the queue is stable, in which the arrival rate is equal to the consumption rate.
             // If this assumption does not hold, the calculation of sojourn time should also consider
             // departure rate according to Queuing Theory.
-            final double sojournTime = (wp - rp) / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
+            return (wp - rp) / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
+        }
 
+        public Object getState() {
+            Map state = new HashMap<String, Object>();
             state.put("capacity", capacity());
-            state.put("population", wp - rp);
-            state.put("write_pos", wp);
-            state.put("read_pos", rp);
-            state.put("arrival_rate_secs", arrivalRateInSecs);
-            state.put("sojourn_time_ms", sojournTime); //element sojourn time in milliseconds
-            state.put("overflow", _overflowCount.get());
+            state.put("population", population());
+            state.put("write_pos", writePos());
+            state.put("read_pos", readPos());
+            state.put("arrival_rate_secs", arrivalRate());
+            state.put("sojourn_time_ms", sojournTime()); //element sojourn time in milliseconds
+            state.put("overflow", overflow());
 
             return state;
         }
@@ -366,7 +374,8 @@ public class DisruptorQueue implements IStatefulObject {
     private final int _inputBatchSize;
     private final ConcurrentHashMap<Long, ThreadLocalInserter> _batchers = new ConcurrentHashMap<Long, ThreadLocalInserter>();
     private final Flusher _flusher;
-    private final QueueMetrics _metrics;
+    private final QueueMetrics _metrics; // old metrics API
+    private final DisruptorMetrics _disruptorMetrics;
 
     private String _queueName = "";
     private DisruptorBackpressureCallback _cb = null;
@@ -376,7 +385,7 @@ public class DisruptorQueue implements IStatefulObject {
     private final AtomicLong _overflowCount = new AtomicLong(0);
     private volatile boolean _throttleOn = false;
 
-    public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval) {
+    public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval, String topologyId, int port) {
         this._queueName = PREFIX + queueName;
         WaitStrategy wait;
         if (readTimeout <= 0) {
@@ -390,12 +399,20 @@ public class DisruptorQueue implements IStatefulObject {
         _barrier = _buffer.newBarrier();
         _buffer.addGatingSequences(_consumer);
         _metrics = new QueueMetrics();
+        _disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, port);
         //The batch size can be no larger than half the full queue size.
         //This is mostly to avoid contention issues.
         _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2));
 
         _flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
         _flusher.start();
+
+        METRICS_TIMER.schedule(new TimerTask(){
+            @Override
+            public void run() {
+                _disruptorMetrics.set(_metrics);
+            }
+        }, 15000, 15000); // TODO: Configurable interval
     }
 
     public String getName() {

http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
index 7072e55..110fe88 100644
--- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
+++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
@@ -105,6 +105,6 @@ public class DisruptorQueueBackpressureTest extends TestCase {
     }
 
     private static DisruptorQueue createQueue(String name, int queueSize) {
-        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L);
+        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", 1000);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
index e7ac54e..c834cbb 100644
--- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
+++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
@@ -178,10 +178,10 @@ public class DisruptorQueueTest extends TestCase {
     }
 
     private static DisruptorQueue createQueue(String name, int queueSize) {
-        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L);
+        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", 1000);
     }
 
     private static DisruptorQueue createQueue(String name, int batchSize, int queueSize) {
-        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, batchSize, 1L);
+        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, batchSize, 1L, "test", 1000);
     }
 }