You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/19 19:58:13 UTC
[4/5] incubator-streams git commit: STREAMS-216 | All JMX monitoring beans now include identifying information and the time that the stream was started
STREAMS-216 | All JMX monitoring beans now include identifying information and the time that the stream was started
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/fb8f9d20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/fb8f9d20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/fb8f9d20
Branch: refs/heads/STREAMS-216
Commit: fb8f9d209bf604a3b2127e22441e54b8e3259ad7
Parents: 9a77a8f
Author: Robert Douglas <rd...@w2ogroup.com>
Authored: Fri Nov 14 11:17:23 2014 -0600
Committer: Robert Douglas <rd...@w2ogroup.com>
Committed: Fri Nov 14 11:17:23 2014 -0600
----------------------------------------------------------------------
.../tasks/BroadcastMonitorThread.java | 14 ------
.../org/apache/streams/pojo/json/Broadcast.json | 4 ++
.../local/builders/LocalStreamBuilder.java | 35 ++++++++++---
.../streams/local/builders/StreamComponent.java | 31 +++++++-----
.../local/counters/DatumStatusCounter.java | 9 +++-
.../local/counters/StreamsTaskCounter.java | 16 ++++--
.../streams/local/queues/ThroughputQueue.java | 52 +++++++++++++++++---
.../streams/local/tasks/BaseStreamsTask.java | 39 ++++++++++++++-
.../streams/local/tasks/StreamsMergeTask.java | 7 ++-
.../local/tasks/StreamsPersistWriterTask.java | 11 +++--
.../local/tasks/StreamsProcessorTask.java | 14 ++++--
.../local/tasks/StreamsProviderTask.java | 11 +++--
.../local/counters/StreamsTaskCounterTest.java | 16 +++---
.../streams/local/tasks/BasicTasksTest.java | 6 +--
.../local/tasks/StreamsProviderTaskTest.java | 10 ++--
15 files changed, 202 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index 25a5030..10b60b1 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@ -46,7 +46,6 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
private ObjectMapper objectMapper;
private Map<String, Object> streamConfig;
private String broadcastURI = null;
- private String streamName = null;
private MessagePersister messagePersister;
private volatile boolean keepRunning;
@@ -56,7 +55,6 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
server = ManagementFactory.getPlatformMBeanServer();
setBroadcastURI();
- setStreamName();
setWaitTime();
messagePersister = new BroadcastMessagePersister(broadcastURI);
@@ -108,7 +106,6 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
}
if(broadcast != null) {
- broadcast.setStreamIdentifier(streamName);
messages.add(objectMapper.writeValueAsString(broadcast));
}
}
@@ -136,17 +133,6 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
}
}
- private void setStreamName() {
- if (streamConfig != null &&
- streamConfig.containsKey("streamsID") &&
- streamConfig.get("streamsID") != null &&
- streamConfig.get("streamsID") instanceof String) {
- streamName = streamConfig.get("streamsID").toString();
- } else {
- streamName = "{\"streamName\":\"Unknown Stream\"}";
- }
- }
-
/**
* Go through streams config and set the thread's wait time (if present)
*/
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
index 94ec147..687ef9c 100644
--- a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
+++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json
@@ -12,6 +12,10 @@
"streamIdentifier": {
"type": "string",
"description": "The name of the Stream that is currently executing"
+ },
+ "startedAt": {
+ "type": "integer",
+ "description": "Milliseconds since epoch when this Stream was started"
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index e3e42ff..19d50e1 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -51,6 +51,8 @@ public class LocalStreamBuilder implements StreamBuilder {
public static final String BROADCAST_KEY = "broadcastURI";
public static final String STREAM_IDENTIFIER_KEY = "streamsID";
public static final String BROADCAST_INTERVAL_KEY = "monitoring_broadcast_interval_ms";
+ public static final String DEFAULT_STREAM_IDENTIFIER = "Unknown_Stream";
+ public static final String DEFAULT_STARTED_AT_KEY = "startedAt";
private Map<String, StreamComponent> providers;
private Map<String, StreamComponent> components;
@@ -65,6 +67,8 @@ public class LocalStreamBuilder implements StreamBuilder {
private Thread shutdownHook;
private BroadcastMonitorThread broadcastMonitor;
private int maxQueueCapacity;
+ private String streamIdentifier = DEFAULT_STREAM_IDENTIFIER;
+ private DateTime startedAt = new DateTime();
/**
* Creates a local stream builder with no config object and default maximum internal queue size of 500
@@ -112,6 +116,11 @@ public class LocalStreamBuilder implements StreamBuilder {
}
};
+ setStreamIdentifier();
+ if(this.streamConfig != null) {
+ this.streamConfig.put(DEFAULT_STARTED_AT_KEY, startedAt.getMillis());
+ }
+
this.broadcastMonitor = new BroadcastMonitorThread(this.streamConfig);
this.futures = new HashMap<>();
@@ -120,7 +129,7 @@ public class LocalStreamBuilder implements StreamBuilder {
@Override
public StreamBuilder newPerpetualStream(String id, StreamsProvider provider) {
validateId(id);
- this.providers.put(id, new StreamComponent(id, provider, true));
+ this.providers.put(id, new StreamComponent(id, provider, true, streamConfig));
++this.totalTasks;
if( provider instanceof DatumStatusCountable )
++this.monitorTasks;
@@ -130,7 +139,7 @@ public class LocalStreamBuilder implements StreamBuilder {
@Override
public StreamBuilder newReadCurrentStream(String id, StreamsProvider provider) {
validateId(id);
- this.providers.put(id, new StreamComponent(id, provider, false));
+ this.providers.put(id, new StreamComponent(id, provider, false, streamConfig));
++this.totalTasks;
if( provider instanceof DatumStatusCountable )
++this.monitorTasks;
@@ -140,7 +149,7 @@ public class LocalStreamBuilder implements StreamBuilder {
@Override
public StreamBuilder newReadNewStream(String id, StreamsProvider provider, BigInteger sequence) {
validateId(id);
- this.providers.put(id, new StreamComponent(id, provider, sequence));
+ this.providers.put(id, new StreamComponent(id, provider, sequence, streamConfig));
++this.totalTasks;
if( provider instanceof DatumStatusCountable )
++this.monitorTasks;
@@ -150,7 +159,7 @@ public class LocalStreamBuilder implements StreamBuilder {
@Override
public StreamBuilder newReadRangeStream(String id, StreamsProvider provider, DateTime start, DateTime end) {
validateId(id);
- this.providers.put(id, new StreamComponent(id, provider, start, end));
+ this.providers.put(id, new StreamComponent(id, provider, start, end, streamConfig));
++this.totalTasks;
if( provider instanceof DatumStatusCountable )
++this.monitorTasks;
@@ -160,7 +169,7 @@ public class LocalStreamBuilder implements StreamBuilder {
@Override
public StreamBuilder addStreamsProcessor(String id, StreamsProcessor processor, int numTasks, String... inBoundIds) {
validateId(id);
- StreamComponent comp = new StreamComponent(id, processor, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id), numTasks);
+ StreamComponent comp = new StreamComponent(id, processor, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id, streamIdentifier, startedAt.getMillis()), numTasks, streamConfig);
this.components.put(id, comp);
connectToOtherComponents(inBoundIds, comp);
this.totalTasks += numTasks;
@@ -172,7 +181,7 @@ public class LocalStreamBuilder implements StreamBuilder {
@Override
public StreamBuilder addStreamsPersistWriter(String id, StreamsPersistWriter writer, int numTasks, String... inBoundIds) {
validateId(id);
- StreamComponent comp = new StreamComponent(id, writer, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id), numTasks);
+ StreamComponent comp = new StreamComponent(id, writer, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id, streamIdentifier, startedAt.getMillis()), numTasks, streamConfig);
this.components.put(id, comp);
connectToOtherComponents(inBoundIds, comp);
this.totalTasks += numTasks;
@@ -281,7 +290,7 @@ public class LocalStreamBuilder implements StreamBuilder {
for(StreamComponent prov : this.providers.values()) {
StreamsTask task = prov.createConnectedTask(getTimeout());
task.setStreamConfig(this.streamConfig);
- StreamsTaskCounter counter = new StreamsTaskCounter(prov.getId());
+ StreamsTaskCounter counter = new StreamsTaskCounter(prov.getId(), streamIdentifier, startedAt.getMillis());
task.setStreamsTaskCounter(counter);
this.executor.submit(task);
provTasks.put(prov.getId(), (StreamsProviderTask) task);
@@ -296,7 +305,7 @@ public class LocalStreamBuilder implements StreamBuilder {
for(StreamComponent comp : this.components.values()) {
int tasks = comp.getNumTasks();
List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
- StreamsTaskCounter counter = new StreamsTaskCounter(comp.getId());
+ StreamsTaskCounter counter = new StreamsTaskCounter(comp.getId(), streamIdentifier, startedAt.getMillis());
for(int i=0; i < tasks; ++i) {
StreamsTask task = comp.createConnectedTask(getTimeout());
task.setStreamsTaskCounter(counter);
@@ -410,4 +419,14 @@ public class LocalStreamBuilder implements StreamBuilder {
return streamConfig != null && streamConfig.containsKey(TIMEOUT_KEY) ? (Integer)streamConfig.get(TIMEOUT_KEY) : -1;
}
+ private void setStreamIdentifier() {
+ if(streamConfig.containsKey(STREAM_IDENTIFIER_KEY) &&
+ streamConfig.get(STREAM_IDENTIFIER_KEY) != null &&
+ streamConfig.get(STREAM_IDENTIFIER_KEY).toString().length() > 0) {
+ this.streamIdentifier = streamConfig.get(STREAM_IDENTIFIER_KEY).toString();
+ } else {
+ this.streamIdentifier = DEFAULT_STREAM_IDENTIFIER;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
index 4b96c5b..0dcc4d0 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
@@ -51,15 +51,18 @@ public class StreamComponent {
private int numTasks = 1;
private boolean perpetual;
+ private Map<String, Object> streamConfig;
+
/**
*
* @param id
* @param provider
*/
- public StreamComponent(String id, StreamsProvider provider, boolean perpetual) {
+ public StreamComponent(String id, StreamsProvider provider, boolean perpetual, Map<String, Object> streamConfig) {
this.id = id;
this.provider = provider;
this.perpetual = perpetual;
+ this.streamConfig = streamConfig;
initializePrivateVariables();
}
@@ -70,12 +73,13 @@ public class StreamComponent {
* @param start
* @param end
*/
- public StreamComponent(String id, StreamsProvider provider, DateTime start, DateTime end) {
+ public StreamComponent(String id, StreamsProvider provider, DateTime start, DateTime end, Map<String, Object> streamConfig) {
this.id = id;
this.provider = provider;
this.dateRange = new DateTime[2];
this.dateRange[START] = start;
this.dateRange[END] = end;
+ this.streamConfig = streamConfig;
initializePrivateVariables();
}
@@ -86,10 +90,11 @@ public class StreamComponent {
* @param provider
* @param sequence
*/
- public StreamComponent(String id, StreamsProvider provider, BigInteger sequence) {
+ public StreamComponent(String id, StreamsProvider provider, BigInteger sequence, Map<String, Object> streamConfig) {
this.id = id;
this.provider = provider;
this.sequence = sequence;
+ this.streamConfig = streamConfig;
}
/**
@@ -99,11 +104,12 @@ public class StreamComponent {
* @param inQueue
* @param numTasks
*/
- public StreamComponent(String id, StreamsProcessor processor, BlockingQueue<StreamsDatum> inQueue, int numTasks) {
+ public StreamComponent(String id, StreamsProcessor processor, BlockingQueue<StreamsDatum> inQueue, int numTasks, Map<String, Object> streamConfig) {
this.id = id;
this.processor = processor;
this.inQueue = inQueue;
this.numTasks = numTasks;
+ this.streamConfig = streamConfig;
initializePrivateVariables();
}
@@ -114,11 +120,12 @@ public class StreamComponent {
* @param inQueue
* @param numTasks
*/
- public StreamComponent(String id, StreamsPersistWriter writer, BlockingQueue<StreamsDatum> inQueue, int numTasks) {
+ public StreamComponent(String id, StreamsPersistWriter writer, BlockingQueue<StreamsDatum> inQueue, int numTasks, Map<String, Object> streamConfig) {
this.id = id;
this.writer = writer;
this.inQueue = inQueue;
this.numTasks = numTasks;
+ this.streamConfig = streamConfig;
initializePrivateVariables();
}
@@ -187,13 +194,13 @@ public class StreamComponent {
StreamsTask task;
if(this.processor != null) {
if(this.numTasks > 1) {
- task = new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor));
+ task = new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor), streamConfig);
task.addInputQueue(this.inQueue);
for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
task.addOutputQueue(q);
}
} else {
- task = new StreamsProcessorTask(this.processor);
+ task = new StreamsProcessorTask(this.processor, streamConfig);
task.addInputQueue(this.inQueue);
for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
task.addOutputQueue(q);
@@ -202,10 +209,10 @@ public class StreamComponent {
}
else if(this.writer != null) {
if(this.numTasks > 1) {
- task = new StreamsPersistWriterTask((StreamsPersistWriter) SerializationUtil.cloneBySerialization(this.writer));
+ task = new StreamsPersistWriterTask((StreamsPersistWriter) SerializationUtil.cloneBySerialization(this.writer), streamConfig);
task.addInputQueue(this.inQueue);
} else {
- task = new StreamsPersistWriterTask(this.writer);
+ task = new StreamsPersistWriterTask(this.writer, streamConfig);
task.addInputQueue(this.inQueue);
}
}
@@ -217,11 +224,11 @@ public class StreamComponent {
prov = this.provider;
}
if(this.dateRange == null && this.sequence == null)
- task = new StreamsProviderTask(prov, this.perpetual);
+ task = new StreamsProviderTask(prov, this.perpetual, streamConfig);
else if(this.sequence != null)
- task = new StreamsProviderTask(prov, this.sequence);
+ task = new StreamsProviderTask(prov, this.sequence, streamConfig);
else
- task = new StreamsProviderTask(prov, this.dateRange[0], this.dateRange[1]);
+ task = new StreamsProviderTask(prov, this.dateRange[0], this.dateRange[1], streamConfig);
//Adjust the timeout if necessary
if(timeout != 0) {
((StreamsProviderTask)task).setTimeout(timeout);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
index acada71..34d2bcc 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
@@ -18,6 +18,7 @@
package org.apache.streams.local.counters;
import net.jcip.annotations.ThreadSafe;
+import org.apache.streams.local.builders.LocalStreamBuilder;
import org.apache.streams.util.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,16 +33,20 @@ import java.util.concurrent.atomic.AtomicLong;
@ThreadSafe
public class DatumStatusCounter implements DatumStatusCounterMXBean{
- public static final String NAME_TEMPLATE = "org.apache.streams.local:type=DatumCounter,name=%s";
+ public static final String NAME_TEMPLATE = "org.apache.streams.local:type=DatumCounter,name=%s,identifier=%s,startedAt=%s";
private static final Logger LOGGER = LoggerFactory.getLogger(DatumStatusCounter.class);
private AtomicLong failed;
private AtomicLong passed;
public DatumStatusCounter(String id) {
+ this(id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+ }
+
+ public DatumStatusCounter(String id, String streamIdentifier, long startedAt) {
this.failed = new AtomicLong(0);
this.passed = new AtomicLong(0);
- ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id), this);
+ ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id, streamIdentifier, startedAt), this);
}
public void incrementFailedCount() {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
index 68c6364..9bd5d49 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -19,7 +19,9 @@ package org.apache.streams.local.counters;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
+import org.apache.streams.local.builders.LocalStreamBuilder;
import org.apache.streams.util.ComponentUtils;
+import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,9 +33,9 @@ import java.util.concurrent.atomic.AtomicLong;
*
*/
@ThreadSafe
-public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
+public class StreamsTaskCounter implements StreamsTaskCounterMXBean {
- public static final String NAME_TEMPLATE = "org.apache.streams.local:type=StreamsTaskCounter,name=%s";
+ public static final String NAME_TEMPLATE = "org.apache.streams.local:type=StreamsTaskCounter,name=%s,identifier=%s,startedAt=%s";
private static final Logger LOGGER = LoggerFactory.getLogger(StreamsTaskCounter.class);
private AtomicLong emitted;
@@ -48,12 +50,20 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
* @param id
*/
public StreamsTaskCounter(String id) {
+ this(id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+ }
+
+ /**
+ *
+ * @param id
+ */
+ public StreamsTaskCounter(String id, String streamId, long startedAt) {
this.emitted = new AtomicLong(0);
this.received = new AtomicLong(0);
this.errors = new AtomicLong(0);
this.totalTime = new AtomicLong(0);
this.maxTime = -1;
- ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id), this);
+ ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id, streamId, startedAt), this);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index de1add3..e4a6ab9 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -17,6 +17,7 @@
*/
package org.apache.streams.local.queues;
+import org.apache.streams.local.builders.LocalStreamBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
@@ -43,7 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*/
public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBean {
- public static final String NAME_TEMPLATE = "org.apache.streams.local:type=ThroughputQueue,name=%s";
+ public static final String NAME_TEMPLATE = "org.apache.streams.local:type=ThroughputQueue,name=%s,identifier=%s,startedAt=%s";
private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class);
@@ -60,7 +61,16 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
* Creates an unbounded, unregistered {@code ThroughputQueue}
*/
public ThroughputQueue() {
- this(-1, null);
+ this(-1, null, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+ }
+
+ /**
+ *
+ * @param streamIdentifier
+ * @param startedAt
+ */
+ public ThroughputQueue(String streamIdentifier, long startedAt) {
+ this(-1, null, streamIdentifier, startedAt);
}
/**
@@ -69,7 +79,17 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
* @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
*/
public ThroughputQueue(int maxSize) {
- this(maxSize, null);
+ this(maxSize, null, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+ }
+
+ /**
+ *
+ * @param maxSize
+ * @param streamIdentifier
+ * @param startedAt
+ */
+ public ThroughputQueue(int maxSize, String streamIdentifier, long startedAt) {
+ this(maxSize, null, streamIdentifier, startedAt);
}
/**
@@ -78,7 +98,27 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
* @param id unique id for this queue to be registered with. if id == NULL then not registered
*/
public ThroughputQueue(String id) {
- this(-1, id);
+ this(-1, id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+ }
+
+ /**
+ *
+ * @param id
+ * @param streamIdentifier
+ * @param startedAt
+ */
+ public ThroughputQueue(String id, String streamIdentifier, long startedAt) {
+ this(-1, id, streamIdentifier, startedAt);
+ }
+
+ /**
+ *
+ * @param maxSize
+ * @param id
+ */
+ public ThroughputQueue(int maxSize, String id) {
+ this(maxSize, id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1);
+
}
/**
@@ -87,7 +127,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
* @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded
* @param id unique id for this queue to be registered with. if id == NULL then not registered
*/
- public ThroughputQueue(int maxSize, String id) {
+ public ThroughputQueue(int maxSize, String id, String streamIdentifier, long startedAt) {
if (maxSize < 1) {
this.underlyingQueue = new LinkedBlockingQueue<>();
} else {
@@ -102,7 +142,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
this.totalQueueTime = new AtomicLong(0);
if (id != null) {
try {
- ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
+ ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id, streamIdentifier, startedAt));
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
mbs.registerMBean(this, name);
} catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 6755d77..cfb231d 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.builders.LocalStreamBuilder;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.util.ComponentUtils;
import org.apache.streams.util.SerializationUtil;
@@ -45,10 +46,18 @@ public abstract class BaseStreamsTask implements StreamsTask {
private List<BlockingQueue<StreamsDatum>> outQueues = new LinkedList<BlockingQueue<StreamsDatum>>();
private int inIndex = 0;
private ObjectMapper mapper;
+ protected Map<String, Object> streamConfig;
- public BaseStreamsTask() {
+ private long startedAt;
+ private String streamIdentifier;
+
+ public BaseStreamsTask(Map<String, Object> config) {
this.mapper = new StreamsJacksonMapper();
this.mapper.registerSubtypes(Activity.class);
+ this.streamConfig = config;
+
+ setStreamIdentifier();
+ setStartedAt();
}
@@ -190,4 +199,32 @@ public abstract class BaseStreamsTask implements StreamsTask {
}
return copyTo;
}
+
+ public long getStartedAt() {
+ return startedAt;
+ }
+
+ public void setStartedAt() {
+ if(streamConfig.containsKey(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) &&
+ streamConfig.get(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) != null &&
+ streamConfig.get(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) instanceof Long) {
+ this.startedAt = Long.parseLong(streamConfig.get(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY).toString());
+ } else {
+ this.startedAt = -1;
+ }
+ }
+
+ public String getStreamIdentifier() {
+ return streamIdentifier;
+ }
+
+ public void setStreamIdentifier() {
+ if(streamConfig.containsKey(LocalStreamBuilder.STREAM_IDENTIFIER_KEY) &&
+ streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY) != null &&
+ streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY).toString().length() > 0) {
+ this.streamIdentifier = streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY).toString();
+ } else {
+ this.streamIdentifier = LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
index 8280f29..27d1c6e 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
@@ -37,10 +37,15 @@ public class StreamsMergeTask extends BaseStreamsTask {
private long sleepTime;
public StreamsMergeTask() {
- this(DEFAULT_SLEEP_TIME_MS);
+ this(DEFAULT_SLEEP_TIME_MS, null);
}
public StreamsMergeTask(long sleepTime) {
+ this(sleepTime, null);
+ }
+
+ public StreamsMergeTask(long sleepTime, Map<String, Object> streamConfig) {
+ super(streamConfig);
this.sleepTime = sleepTime;
this.keepRunning = new AtomicBoolean(true);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index 003ab9e..235ee92 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -58,7 +58,11 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
* @param writer writer to execute in task
*/
public StreamsPersistWriterTask(StreamsPersistWriter writer) {
- this(writer, DEFAULT_SLEEP_TIME_MS);
+ this(writer, DEFAULT_SLEEP_TIME_MS, null);
+ }
+
+ public StreamsPersistWriterTask(StreamsPersistWriter writer, Map<String, Object> streamConfig) {
+ this(writer, DEFAULT_SLEEP_TIME_MS, streamConfig);
}
/**
@@ -66,7 +70,8 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
* @param writer writer to execute in task
* @param sleepTime time to sleep when inbound queue is empty.
*/
- public StreamsPersistWriterTask(StreamsPersistWriter writer, long sleepTime) {
+ public StreamsPersistWriterTask(StreamsPersistWriter writer, long sleepTime, Map<String, Object> streamConfig) {
+ super(streamConfig);
this.writer = writer;
this.sleepTime = sleepTime;
this.keepRunning = new AtomicBoolean(true);
@@ -99,7 +104,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
try {
this.writer.prepare(this.streamConfig);
if(this.counter == null) {
- this.counter = new StreamsTaskCounter(this.writer.getClass().getName()+ UUID.randomUUID().toString());
+ this.counter = new StreamsTaskCounter(this.writer.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
}
while(this.keepRunning.get()) {
StreamsDatum datum = null;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index b6ab498..c470d0b 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -59,15 +59,23 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
* @param processor process to run in task
*/
public StreamsProcessorTask(StreamsProcessor processor) {
- this(processor, DEFAULT_SLEEP_TIME_MS);
+ this(processor, DEFAULT_SLEEP_TIME_MS, null);
}
/**
*
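+ * @param processor processor to run in task
+ * @param streamConfig stream configuration map, passed through to the three-argument constructor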
+ */
+ public StreamsProcessorTask(StreamsProcessor processor, Map<String, Object> streamConfig) { this(processor, DEFAULT_SLEEP_TIME_MS, streamConfig); }
+
+ /**
+ *
* @param processor processor to run in task
* @param sleepTime time to sleep when incoming queue is empty
*/
- public StreamsProcessorTask(StreamsProcessor processor, long sleepTime) {
+ public StreamsProcessorTask(StreamsProcessor processor, long sleepTime, Map<String, Object> streamConfig) {
+ super(streamConfig);
this.processor = processor;
this.sleepTime = sleepTime;
this.keepRunning = new AtomicBoolean(true);
@@ -105,7 +113,7 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
try {
this.processor.prepare(this.streamConfig);
if(this.counter == null) {
- this.counter = new StreamsTaskCounter(this.processor.getClass().getName()+ UUID.randomUUID().toString());
+ this.counter = new StreamsTaskCounter(this.processor.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
}
while(this.keepRunning.get()) {
StreamsDatum datum = null;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 2475780..8c87d7a 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -72,7 +72,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
* Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()}
* @param provider
*/
- public StreamsProviderTask(StreamsProvider provider, boolean perpetual) {
+ public StreamsProviderTask(StreamsProvider provider, boolean perpetual, Map<String, Object> streamConfig) {
+ super(streamConfig);
this.provider = provider;
if( perpetual )
this.type = Type.PERPETUAL;
@@ -87,7 +88,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
* @param provider
* @param sequence
*/
- public StreamsProviderTask(StreamsProvider provider, BigInteger sequence) {
+ public StreamsProviderTask(StreamsProvider provider, BigInteger sequence, Map<String, Object> streamConfig) {
+ super(streamConfig);
this.provider = provider;
this.type = Type.READ_NEW;
this.sequence = sequence;
@@ -101,7 +103,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
* @param start
* @param end
*/
- public StreamsProviderTask(StreamsProvider provider, DateTime start, DateTime end) {
+ public StreamsProviderTask(StreamsProvider provider, DateTime start, DateTime end, Map<String, Object> streamConfig) {
+ super(streamConfig);
this.provider = provider;
this.type = Type.READ_RANGE;
this.dateRange = new DateTime[2];
@@ -149,7 +152,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
//Negative values mean we want to run forever
long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / sleepTime);
if(this.counter == null) { //should never be null
- this.counter = new StreamsTaskCounter(this.provider.getClass().getName()+ UUID.randomUUID().toString());
+ this.counter = new StreamsTaskCounter(this.provider.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
}
switch(this.type) {
case PERPETUAL: {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
index a001845..da567fe 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
@@ -52,7 +52,7 @@ public class StreamsTaskCounterTest extends RandomizedTest {
@Test
public void testConstructor() {
try {
- new StreamsTaskCounter(MBEAN_ID);
+ new StreamsTaskCounter(MBEAN_ID, null, -1);
} catch (Throwable t) {
fail("Constructor threw error : "+t.getMessage());
}
@@ -65,7 +65,7 @@ public class StreamsTaskCounterTest extends RandomizedTest {
@Test
@Repeat(iterations = 3)
public void testEmitted() throws Exception {
- StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+ StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
int numIncrements = randomIntBetween(1, 100000);
for(int i=0; i < numIncrements; ++i) {
counter.incrementEmittedCount();
@@ -74,7 +74,7 @@ public class StreamsTaskCounterTest extends RandomizedTest {
unregisterMXBean();
- counter = new StreamsTaskCounter(MBEAN_ID);
+ counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
numIncrements = randomIntBetween(1, 100000);
long total = 0;
for(int i=0; i < numIncrements; ++i) {
@@ -92,7 +92,7 @@ public class StreamsTaskCounterTest extends RandomizedTest {
@Test
@Repeat(iterations = 3)
public void testReceived() throws Exception {
- StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+ StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
int numIncrements = randomIntBetween(1, 100000);
for(int i=0; i < numIncrements; ++i) {
counter.incrementReceivedCount();
@@ -101,7 +101,7 @@ public class StreamsTaskCounterTest extends RandomizedTest {
unregisterMXBean();
- counter = new StreamsTaskCounter(MBEAN_ID);
+ counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
numIncrements = randomIntBetween(1, 100000);
long total = 0;
for(int i=0; i < numIncrements; ++i) {
@@ -119,7 +119,7 @@ public class StreamsTaskCounterTest extends RandomizedTest {
@Test
@Repeat(iterations = 3)
public void testError() throws Exception {
- StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+ StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
int numIncrements = randomIntBetween(1, 100000);
for(int i=0; i < numIncrements; ++i) {
counter.incrementErrorCount();
@@ -128,7 +128,7 @@ public class StreamsTaskCounterTest extends RandomizedTest {
unregisterMXBean();
- counter = new StreamsTaskCounter(MBEAN_ID);
+ counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
numIncrements = randomIntBetween(1, 100000);
long total = 0;
for(int i=0; i < numIncrements; ++i) {
@@ -146,7 +146,7 @@ public class StreamsTaskCounterTest extends RandomizedTest {
@Test
@Repeat(iterations = 3)
public void testErrorRate() throws Exception {
- StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+ StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
assertEquals(0.0, counter.getErrorRate(), 0);
int failures = randomIntBetween(0, 100000);
int received = randomIntBetween(0, 100000);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
index a0e28cd..d5efc41 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
@@ -57,7 +57,7 @@ public class BasicTasksTest {
public void testProviderTask() {
int numMessages = 100;
NumericMessageProvider provider = new NumericMessageProvider(numMessages);
- StreamsProviderTask task = new StreamsProviderTask(provider, false);
+ StreamsProviderTask task = new StreamsProviderTask(provider, false, null);
BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
task.addOutputQueue(outQueue);
//Test that adding input queues to providers is not valid
@@ -101,7 +101,7 @@ public class BasicTasksTest {
int numMessages = 100;
PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
StreamsProcessorTask task = new StreamsProcessorTask(processor);
- StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+ StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
task.setStreamsTaskCounter(counter);
BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
@@ -145,7 +145,7 @@ public class BasicTasksTest {
int numMessages = 100;
DatumCounterWriter writer = new DatumCounterWriter("");
StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer);
- StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+ StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
task.setStreamsTaskCounter(counter);
BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
index 5e18650..222566d 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
@@ -61,7 +61,7 @@ public class StreamsProviderTaskTest {
@Test
public void runPerpetual() {
- StreamsProviderTask task = new StreamsProviderTask(mockProvider, true);
+ StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
when(mockProvider.isRunning()).thenReturn(true);
when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
task.setTimeout(500);
@@ -75,7 +75,7 @@ public class StreamsProviderTaskTest {
@Test
public void flushes() {
BlockingQueue<StreamsDatum> out = new LinkedBlockingQueue<>();
- StreamsProviderTask task = new StreamsProviderTask(mockProvider, true);
+ StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
when(mockProvider.isRunning()).thenReturn(true);
when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(getQueue(3)));
task.setTimeout(100);
@@ -95,7 +95,7 @@ public class StreamsProviderTaskTest {
@Test
public void runNonPerpetual() {
- StreamsProviderTask task = new StreamsProviderTask(mockProvider, false);
+ StreamsProviderTask task = new StreamsProviderTask(mockProvider, false, null);
when(mockProvider.isRunning()).thenReturn(true);
when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
task.setTimeout(500);
@@ -108,7 +108,7 @@ public class StreamsProviderTaskTest {
@Test
public void stoppable() throws InterruptedException {
- StreamsProviderTask task = new StreamsProviderTask(mockProvider, true);
+ StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
when(mockProvider.isRunning()).thenReturn(true);
when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
task.setTimeout(-1);
@@ -129,7 +129,7 @@ public class StreamsProviderTaskTest {
@Test
public void earlyException() throws InterruptedException {
- StreamsProviderTask task = new StreamsProviderTask(mockProvider, true);
+ StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
when(mockProvider.isRunning()).thenReturn(true);
doThrow(new RuntimeException()).when(mockProvider).prepare(null);
task.setTimeout(-1);