You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/12/09 21:24:08 UTC
[2/4] storm git commit: add disruptor queue metrics
add disruptor queue metrics
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9f632b64
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9f632b64
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9f632b64
Branch: refs/heads/metrics_v2
Commit: 9f632b64a1583b3953753cc47756dc801457e65c
Parents: 6c93a0d
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Nov 18 15:18:57 2016 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Nov 18 15:18:57 2016 -0500
----------------------------------------------------------------------
.../storm/starter/ExclamationTopology.java | 2 +-
.../apache/storm/starter/WordCountTopology.java | 2 +-
.../hdfs/avro/ConfluentAvroSerializer.java | 2 +-
pom.xml | 5 ++
storm-core/pom.xml | 4 ++
.../clj/org/apache/storm/daemon/executor.clj | 2 +
.../src/clj/org/apache/storm/daemon/worker.clj | 6 +-
.../src/clj/org/apache/storm/disruptor.clj | 5 +-
.../apache/storm/metrics2/DisruptorMetrics.java | 76 ++++++++++++++++++++
.../org/apache/storm/metrics2/SimpleGauge.java | 21 ++++++
.../storm/metrics2/StormMetricRegistry.java | 43 +++++++++--
.../org/apache/storm/utils/DisruptorQueue.java | 43 +++++++----
.../utils/DisruptorQueueBackpressureTest.java | 2 +-
.../apache/storm/utils/DisruptorQueueTest.java | 4 +-
14 files changed, 187 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
index 26e0430..9284b52 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
@@ -79,7 +79,7 @@ public class ExclamationTopology {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
- Utils.sleep(10000);
+ Utils.sleep(100000);
cluster.killTopology("test");
cluster.shutdown();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
index e4a5711..0611894 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
@@ -98,7 +98,7 @@ public class WordCountTopology {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
- Thread.sleep(10000);
+ Thread.sleep(60000);
cluster.shutdown();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
index 2008a3e..087aec5 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
@@ -27,7 +27,7 @@ import java.io.IOException;
import java.util.Map;
/**
- * This class provides a mechanism to utilize the Confluent Schema Registry (https://github.com/confluentinc/schema-registry)
+ * This class provides a mechanism to utilize the Confluent Schema Registry (https://github.com/confluentinc/schema-registry)
* for Storm to (de)serialize Avro generic records across a topology. It assumes the schema registry is up and running
* completely independent of Storm.
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a2aec4d..d040099 100644
--- a/pom.xml
+++ b/pom.xml
@@ -840,6 +840,11 @@
<version>${metrics.version}</version>
</dependency>
<dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+ <dependency>
<groupId>metrics-clojure</groupId>
<artifactId>metrics-clojure</artifactId>
<version>${metrics-clojure.version}</version>
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 38a0811..21b50af 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -281,6 +281,10 @@
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ </dependency>
+ <dependency>
<groupId>metrics-clojure</groupId>
<artifactId>metrics-clojure</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index f1af8e7..cdb9c7f 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -234,6 +234,8 @@
(str "executor" executor-id "-send-queue")
(storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+ (.getStormId worker-context)
+ (.getThisWorkerPort worker-context)
:producer-type :single-threaded
:batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
:batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index cca0011..3031513 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -205,12 +205,13 @@
(transfer-fn serializer tuple-batch)))
transfer-fn)))
-(defn- mk-receive-queue-map [storm-conf executors]
+(defn- mk-receive-queue-map [storm-conf executors storm-id port]
(->> executors
;; TODO: this depends on the type of executor
(map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
(storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+ storm-id port
:batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
:batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))]))
(into {})
@@ -255,9 +256,10 @@
executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+ storm-id port
:batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
:batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
- executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
+ executor-receive-queue-map (mk-receive-queue-map storm-conf executors storm-id port)
receive-queue-map (->> executor-receive-queue-map
(mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue])))
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/clj/org/apache/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/disruptor.clj b/storm-core/src/clj/org/apache/storm/disruptor.clj
index 1546b3f..73a9d84 100644
--- a/storm-core/src/clj/org/apache/storm/disruptor.clj
+++ b/storm-core/src/clj/org/apache/storm/disruptor.clj
@@ -16,6 +16,7 @@
(ns org.apache.storm.disruptor
(:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback])
+ (:import [org.apache.storm.task WorkerTopologyContext])
(:import [com.lmax.disruptor.dsl ProducerType])
(:require [clojure [string :as str]])
(:require [clojure [set :as set]])
@@ -27,10 +28,10 @@
:single-threaded ProducerType/SINGLE})
(defnk disruptor-queue
- [^String queue-name buffer-size timeout :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
+ [^String queue-name buffer-size timeout ^String storm-id ^Integer worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
(DisruptorQueue. queue-name
(PRODUCER-TYPE producer-type) buffer-size
- timeout batch-size batch-timeout))
+ timeout batch-size batch-timeout storm-id worker-port))
(defn clojure-handler
[afn]
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java b/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
new file mode 100644
index 0000000..3c11f1a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java
@@ -0,0 +1,76 @@
+package org.apache.storm.metrics2;
+
+import org.apache.storm.utils.DisruptorQueue;
+
+/**
+ * Holds the eight gauges that publish the statistics of one {@link DisruptorQueue}.
+ * Each setter forwards to its gauge; {@link #set} refreshes all of them in one call.
+ */
+public class DisruptorMetrics {
+    private final SimpleGauge<Long> capacity;
+    private final SimpleGauge<Long> population;
+    private final SimpleGauge<Long> writePosition;
+    private final SimpleGauge<Long> readPosition;
+    private final SimpleGauge<Double> arrivalRate; // TODO: Change to meter
+    private final SimpleGauge<Double> sojournTime;
+    private final SimpleGauge<Long> overflow;
+    private final SimpleGauge<Float> pctFull;
+
+    /** Package-private: wraps gauges the caller has already created; none are created here. */
+    DisruptorMetrics(SimpleGauge<Long> capacity, SimpleGauge<Long> population,
+                     SimpleGauge<Long> writePosition, SimpleGauge<Long> readPosition,
+                     SimpleGauge<Double> arrivalRate, SimpleGauge<Double> sojournTime,
+                     SimpleGauge<Long> overflow, SimpleGauge<Float> pctFull) {
+        this.capacity = capacity;
+        this.population = population;
+        this.writePosition = writePosition;
+        this.readPosition = readPosition;
+        this.arrivalRate = arrivalRate;
+        this.sojournTime = sojournTime;
+        this.overflow = overflow;
+        this.pctFull = pctFull;
+    }
+    /** Stores {@code capacity} in the capacity gauge. */
+    public void setCapacity(Long capacity) {
+        this.capacity.set(capacity);
+    }
+    /** Stores {@code population} in the population gauge. */
+    public void setPopulation(Long population) {
+        this.population.set(population);
+    }
+    /** Stores {@code writePosition} in the write-position gauge. */
+    public void setWritePosition(Long writePosition) {
+        this.writePosition.set(writePosition);
+    }
+    /** Stores {@code readPosition} in the read-position gauge. */
+    public void setReadPosition(Long readPosition) {
+        this.readPosition.set(readPosition);
+    }
+    /** Stores {@code arrivalRate} in the arrival-rate gauge. */
+    public void setArrivalRate(Double arrivalRate) {
+        this.arrivalRate.set(arrivalRate);
+    }
+    /** Stores {@code sojournTime} in the sojourn-time gauge. */
+    public void setSojournTime(Double sojournTime) {
+        this.sojournTime.set(sojournTime);
+    }
+    /** Stores {@code overflow} in the overflow gauge. */
+    public void setOverflow(Long overflow) {
+        this.overflow.set(overflow);
+    }
+    /** Stores {@code pctFull} in the percent-full gauge. */
+    public void setPercentFull(Float pctFull) {
+        this.pctFull.set(pctFull);
+    }
+    /** Copies every statistic exposed by {@code metrics} into the matching gauge. */
+    public void set(DisruptorQueue.QueueMetrics metrics) {
+        this.capacity.set(metrics.capacity());
+        this.population.set(metrics.population());
+        this.writePosition.set(metrics.writePos());
+        this.readPosition.set(metrics.readPos());
+        this.arrivalRate.set(metrics.arrivalRate());
+        this.sojournTime.set(metrics.sojournTime());
+        this.overflow.set(metrics.overflow());
+        this.pctFull.set(metrics.pctFull());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java b/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
new file mode 100644
index 0000000..b88cc7f
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java
@@ -0,0 +1,21 @@
+package org.apache.storm.metrics2;
+
+
+import com.codahale.metrics.Gauge;
+
+/** A {@link Gauge} that reports whichever value was most recently stored in it. */
+public class SimpleGauge<T> implements Gauge<T> {
+    private volatile T value; // volatile: set() and getValue() run on different threads (metrics timer vs. reporter)
+    public SimpleGauge(T value) {
+        this.value = value;
+    }
+    /** Returns the value most recently passed to the constructor or to {@link #set}. */
+    @Override
+    public T getValue() {
+        return this.value;
+    }
+    /** Replaces the stored value; later {@link #getValue()} calls return it. */
+    public void set(T value) {
+        this.value = value;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 02dfac3..dd430ac 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -1,13 +1,14 @@
package org.apache.storm.metrics2;
-import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.*;
+import com.codahale.metrics.graphite.Graphite;
+import com.codahale.metrics.graphite.GraphiteReporter;
import org.apache.storm.task.WorkerTopologyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
public class StormMetricRegistry {
@@ -16,21 +17,49 @@ public class StormMetricRegistry {
private static final MetricRegistry REGISTRY = new MetricRegistry();
- private static ConsoleReporter REPORTER;
+ private static ScheduledReporter REPORTER;
static {
- REPORTER = ConsoleReporter.forRegistry(REGISTRY)
+// REPORTER = ConsoleReporter.forRegistry(REGISTRY)
+// .convertRatesTo(TimeUnit.SECONDS)
+// .convertDurationsTo(TimeUnit.MILLISECONDS)
+// .build();
+
+
+ final Graphite graphite = new Graphite(new InetSocketAddress("graphite", 2003));
+ REPORTER = GraphiteReporter.forRegistry(REGISTRY)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
- .build();
+ .filter(MetricFilter.ALL)
+ .build(graphite);
+
REPORTER.start(15, TimeUnit.SECONDS);
}
+    /** Registers a new {@link SimpleGauge} as "storm.worker.{topologyId}.{port}-{name}" and returns it. */
+    public static <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId, Integer port){
+        // NOTE(review): MetricRegistry.register rejects a name that is already registered - confirm names are unique.
+        return REGISTRY.register(String.format("storm.worker.%s.%s-%s", topologyId, port, name), new SimpleGauge<>(initialValue));
+    }
+
+    /** Registers the eight per-queue gauges, each named {@code name} plus a suffix, and wraps them. */
+    public static DisruptorMetrics disruptorMetrics(String name, String topologyId, Integer port){
+        return new DisruptorMetrics(
+                gauge(0L, name + "-capacity", topologyId, port),
+                gauge(0L, name + "-population", topologyId, port),
+                gauge(0L, name + "-write-position", topologyId, port),
+                gauge(0L, name + "-read-position", topologyId, port),
+                gauge(0.0, name + "-arrival-rate", topologyId, port),
+                gauge(0.0, name + "-sojourn-time-ms", topologyId, port),
+                gauge(0L, name + "-overflow", topologyId, port),
+                gauge(0.0F, name + "-percent-full", topologyId, port));
+    }
+
+
public static Meter meter(String name, WorkerTopologyContext context, String componentId){
// storm.worker.{topology}.{host}.{port}
// TODO: hostname
String metricName = String.format("storm.worker.%s.%s.%s-%s", context.getStormId(), componentId, context.getThisWorkerPort(), name);
-
return REGISTRY.meter(metricName);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index fbae1d1..09c0e0b 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -33,6 +33,9 @@ import com.lmax.disruptor.dsl.ProducerType;
import org.apache.storm.metric.api.IStatefulObject;
import org.apache.storm.metric.internal.RateTracker;
+import org.apache.storm.metrics2.DisruptorMetrics;
+import org.apache.storm.metrics2.StormMetricRegistry;
+import org.apache.storm.task.WorkerTopologyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +65,7 @@ public class DisruptorQueue implements IStatefulObject {
private static final Object INTERRUPT = new Object();
private static final String PREFIX = "disruptor-";
private static final FlusherPool FLUSHER = new FlusherPool();
+ private static final Timer METRICS_TIMER = new Timer("disruptor-metrics-timer", true);
private static class FlusherPool {
private static final String THREAD_PREFIX = "disruptor-flush";
@@ -326,27 +330,31 @@ public class DisruptorQueue implements IStatefulObject {
return (1.0F * population() / capacity());
}
- public Object getState() {
- Map state = new HashMap<String, Object>();
+ public double arrivalRate(){
+ return _rateTracker.reportRate();
+ }
+ public double sojournTime(){
// get readPos then writePos so it's never an under-estimate
long rp = readPos();
long wp = writePos();
-
- final double arrivalRateInSecs = _rateTracker.reportRate();
+ final double arrivalRateInSecs = arrivalRate();
//Assume the queue is stable, in which the arrival rate is equal to the consumption rate.
// If this assumption does not hold, the calculation of sojourn time should also consider
// departure rate according to Queuing Theory.
- final double sojournTime = (wp - rp) / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
+ return (wp - rp) / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
+ }
+ public Object getState() {
+ Map state = new HashMap<String, Object>();
state.put("capacity", capacity());
- state.put("population", wp - rp);
- state.put("write_pos", wp);
- state.put("read_pos", rp);
- state.put("arrival_rate_secs", arrivalRateInSecs);
- state.put("sojourn_time_ms", sojournTime); //element sojourn time in milliseconds
- state.put("overflow", _overflowCount.get());
+ state.put("population", population());
+ state.put("write_pos", writePos());
+ state.put("read_pos", readPos());
+ state.put("arrival_rate_secs", arrivalRate());
+ state.put("sojourn_time_ms", sojournTime()); //element sojourn time in milliseconds
+ state.put("overflow", overflow());
return state;
}
@@ -366,7 +374,8 @@ public class DisruptorQueue implements IStatefulObject {
private final int _inputBatchSize;
private final ConcurrentHashMap<Long, ThreadLocalInserter> _batchers = new ConcurrentHashMap<Long, ThreadLocalInserter>();
private final Flusher _flusher;
- private final QueueMetrics _metrics;
+ private final QueueMetrics _metrics; // old metrics API
+ private final DisruptorMetrics _disruptorMetrics;
private String _queueName = "";
private DisruptorBackpressureCallback _cb = null;
@@ -376,7 +385,7 @@ public class DisruptorQueue implements IStatefulObject {
private final AtomicLong _overflowCount = new AtomicLong(0);
private volatile boolean _throttleOn = false;
- public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval) {
+ public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval, String topologyId, int port) {
this._queueName = PREFIX + queueName;
WaitStrategy wait;
if (readTimeout <= 0) {
@@ -390,12 +399,20 @@ public class DisruptorQueue implements IStatefulObject {
_barrier = _buffer.newBarrier();
_buffer.addGatingSequences(_consumer);
_metrics = new QueueMetrics();
+ _disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, port);
//The batch size can be no larger than half the full queue size.
//This is mostly to avoid contention issues.
_inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2));
_flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
_flusher.start();
+
+ METRICS_TIMER.schedule(new TimerTask(){
+ @Override
+ public void run() {
+ _disruptorMetrics.set(_metrics);
+ }
+ }, 15000, 15000); // TODO: Configurable interval
}
public String getName() {
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
index 7072e55..110fe88 100644
--- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
+++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
@@ -105,6 +105,6 @@ public class DisruptorQueueBackpressureTest extends TestCase {
}
private static DisruptorQueue createQueue(String name, int queueSize) {
- return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L);
+ return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", 1000);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9f632b64/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
index e7ac54e..c834cbb 100644
--- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
+++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
@@ -178,10 +178,10 @@ public class DisruptorQueueTest extends TestCase {
}
private static DisruptorQueue createQueue(String name, int queueSize) {
- return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L);
+ return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", 1000);
}
private static DisruptorQueue createQueue(String name, int batchSize, int queueSize) {
- return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, batchSize, 1L);
+ return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, batchSize, 1L, "test", 1000);
}
}