You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/13 06:10:07 UTC
[40/43] incubator-streams git commit: STREAMS-210 | Removed accidental changes to runtime files
STREAMS-210 | Removed accidental changes to runtime files
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/48ad6009
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/48ad6009
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/48ad6009
Branch: refs/heads/master
Commit: 48ad60093dfbbc528e4b06f80c0ad35c9e977b92
Parents: 743f3bd
Author: Robert Douglas <rd...@w2ogroup.com>
Authored: Mon Nov 10 09:42:07 2014 -0600
Committer: Robert Douglas <rd...@w2ogroup.com>
Committed: Mon Nov 10 09:42:07 2014 -0600
----------------------------------------------------------------------
.../streams/local/queues/ThroughputQueue.java | 16 +---------
.../streams/local/tasks/StreamsMergeTask.java | 7 +++++
.../local/tasks/StreamsPersistWriterTask.java | 18 ++++++++---
.../local/tasks/StreamsProcessorTask.java | 20 +++++++++---
.../local/tasks/StreamsProviderTask.java | 15 +++++++++
.../apache/streams/local/tasks/StreamsTask.java | 4 +++
.../local/builders/LocalStreamBuilderTest.java | 32 +++++++++++++++++---
.../streams/local/tasks/BasicTasksTest.java | 26 +++++++++++++---
8 files changed, 106 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48ad6009/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index d3fc71e..de1add3 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -41,7 +41,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* Only the necessary methods for the local streams runtime are implemented. All other methods throw a
* {@link sun.reflect.generics.reflectiveObjects.NotImplementedException}.
*/
-public class ThroughputQueue<E> extends NotificationBroadcasterSupport implements BlockingQueue<E>, ThroughputQueueMXBean {
+public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBean {
public static final String NAME_TEMPLATE = "org.apache.streams.local:type=ThroughputQueue,name=%s";
@@ -105,14 +105,6 @@ public class ThroughputQueue<E> extends NotificationBroadcasterSupport implement
ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
mbs.registerMBean(this, name);
-
- /*addNotificationListener(new NotificationListener() {
- @Override
- public void handleNotification(Notification notification, Object handback) {
- LOGGER.debug("Notification!");
- }
- }, null, null);*/
-
} catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
LOGGER.error("Failed to register MXBean : {}", e);
throw new RuntimeException(e);
@@ -124,10 +116,6 @@ public class ThroughputQueue<E> extends NotificationBroadcasterSupport implement
public boolean add(E e) {
if (this.underlyingQueue.add(new ThroughputElement<E>(e))) {
internalAddElement();
-
- Notification n = new AttributeChangeNotification(this, 1, System.currentTimeMillis(), "Added element to queue", "Added", "String", null, e);
- sendNotification(n);
-
return true;
}
return false;
@@ -151,8 +139,6 @@ public class ThroughputQueue<E> extends NotificationBroadcasterSupport implement
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (this.underlyingQueue.offer(new ThroughputElement<E>(e), timeout, unit)) {
- Notification n = new AttributeChangeNotification(this, 1, System.currentTimeMillis(), "Added element to queue", "Added", "String", null, e);
- sendNotification(n);
internalAddElement();
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48ad6009/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
index 7a4c806..8280f29 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
@@ -19,6 +19,8 @@
package org.apache.streams.local.tasks;
import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.local.counters.StreamsTaskCounter;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -78,4 +80,9 @@ public class StreamsMergeTask extends BaseStreamsTask {
}
}
}
+
+ @Override
+ public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+ throw new NotImplementedException();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48ad6009/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index cab46b8..003ab9e 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -20,13 +20,11 @@ package org.apache.streams.local.tasks;
import org.apache.streams.core.*;
import org.apache.streams.core.util.DatumUtils;
+import org.apache.streams.local.counters.StreamsTaskCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
+import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,6 +43,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
private BlockingQueue<StreamsDatum> inQueue;
private AtomicBoolean isRunning;
private AtomicBoolean blocked;
+ private StreamsTaskCounter counter;
private DatumStatusCounter statusCounter = new DatumStatusCounter();
@@ -99,6 +98,9 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
public void run() {
try {
this.writer.prepare(this.streamConfig);
+ if(this.counter == null) {
+ this.counter = new StreamsTaskCounter(this.writer.getClass().getName()+ UUID.randomUUID().toString());
+ }
while(this.keepRunning.get()) {
StreamsDatum datum = null;
try {
@@ -111,14 +113,18 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
Thread.currentThread().interrupt();
}
if(datum != null) {
+ this.counter.incrementReceivedCount();
try {
+ long startTime = System.currentTimeMillis();
this.writer.write(datum);
+ this.counter.addTime(System.currentTimeMillis() - startTime);
statusCounter.incrementStatus(DatumStatus.SUCCESS);
} catch (Exception e) {
LOGGER.error("Error writing to persist writer {}", this.writer.getClass().getSimpleName(), e);
this.keepRunning.set(false); // why do we shutdown on a failed write ?
statusCounter.incrementStatus(DatumStatus.FAIL);
DatumUtils.addErrorToMetadata(datum, e, this.writer.getClass());
+ this.counter.incrementErrorCount();
}
} else { //datums should never be null
LOGGER.debug("Received null StreamsDatum @ writer : {}", this.writer.getClass().getName());
@@ -151,4 +157,8 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
return queues;
}
+ @Override
+ public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+ this.counter = counter;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48ad6009/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index 1bb565d..b6ab498 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -21,13 +21,11 @@ package org.apache.streams.local.tasks;
import com.google.common.collect.Maps;
import org.apache.streams.core.*;
import org.apache.streams.core.util.DatumUtils;
+import org.apache.streams.local.counters.StreamsTaskCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
+import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -47,6 +45,7 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
private BlockingQueue<StreamsDatum> inQueue;
private AtomicBoolean isRunning;
private AtomicBoolean blocked;
+ private StreamsTaskCounter counter;
private DatumStatusCounter statusCounter = new DatumStatusCounter();
@@ -105,6 +104,9 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
public void run() {
try {
this.processor.prepare(this.streamConfig);
+ if(this.counter == null) {
+ this.counter = new StreamsTaskCounter(this.processor.getClass().getName()+ UUID.randomUUID().toString());
+ }
while(this.keepRunning.get()) {
StreamsDatum datum = null;
try {
@@ -117,11 +119,15 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
Thread.currentThread().interrupt();
}
if(datum != null) {
+ this.counter.incrementReceivedCount();
try {
+ long startTime = System.currentTimeMillis();
List<StreamsDatum> output = this.processor.process(datum);
+ this.counter.addTime(System.currentTimeMillis() - startTime);
if(output != null) {
for(StreamsDatum outDatum : output) {
super.addToOutgoingQueue(outDatum);
+ this.counter.incrementEmittedCount();
statusCounter.incrementStatus(DatumStatus.SUCCESS);
}
}
@@ -130,6 +136,7 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
this.keepRunning.set(false);
Thread.currentThread().interrupt();
} catch (Throwable t) {
+ this.counter.incrementErrorCount();
LOGGER.warn("Caught Throwable in processor, {} : {}", this.processor.getClass().getName(), t.getMessage());
statusCounter.incrementStatus(DatumStatus.FAIL);
//Add the error to the metadata, but keep processing
@@ -151,4 +158,9 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
queues.add(this.inQueue);
return queues;
}
+
+ @Override
+ public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+ this.counter = counter;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48ad6009/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index c16f64d..2475780 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.tasks;
import org.apache.streams.core.*;
import org.apache.streams.core.util.DatumUtils;
+import org.apache.streams.local.counters.StreamsTaskCounter;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory;
import java.math.BigInteger;
import java.util.Map;
import java.util.Queue;
+import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -64,6 +66,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
private long sleepTime;
private int zeros = 0;
private DatumStatusCounter statusCounter = new DatumStatusCounter();
+ private StreamsTaskCounter counter;
/**
* Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()}
@@ -145,13 +148,18 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
StreamsResultSet resultSet = null;
//Negative values mean we want to run forever
long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / sleepTime);
+ if(this.counter == null) { //should never be null
+ this.counter = new StreamsTaskCounter(this.provider.getClass().getName()+ UUID.randomUUID().toString());
+ }
switch(this.type) {
case PERPETUAL: {
provider.startStream();
this.started.set(true);
while(this.isRunning()) {
try {
+ long startTime = System.currentTimeMillis();
resultSet = provider.readCurrent();
+ this.counter.addTime(System.currentTimeMillis() - startTime);
if( resultSet.size() == 0 )
zeros++;
else {
@@ -164,6 +172,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
if(zeros > 0)
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
+ this.counter.incrementErrorCount();
LOGGER.warn("Thread interrupted");
this.keepRunning.set(false);
}
@@ -219,8 +228,10 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
if(datum != null) {
try {
super.addToOutgoingQueue(datum);
+ this.counter.incrementEmittedCount();
statusCounter.incrementStatus(DatumStatus.SUCCESS);
} catch( Exception e ) {
+ this.counter.incrementErrorCount();
statusCounter.incrementStatus(DatumStatus.FAIL);
DatumUtils.addErrorToMetadata(datum, e, this.provider.getClass());
}
@@ -229,4 +240,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
this.flushing.set(false);
}
+ @Override
+ public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+ this.counter = counter;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48ad6009/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
index 7513631..8423095 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
@@ -19,6 +19,7 @@
package org.apache.streams.local.tasks;
import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.local.counters.StreamsTaskCounter;
import java.util.List;
import java.util.Map;
@@ -87,4 +88,7 @@ public interface StreamsTask extends Runnable{
*/
public List<BlockingQueue<StreamsDatum>> getOutputQueues();
+
+ public void setStreamsTaskCounter(StreamsTaskCounter counter);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48ad6009/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
index 6513032..a675d87 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
@@ -43,6 +43,7 @@ import org.apache.streams.core.StreamBuilder;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.local.counters.StreamsTaskCounter;
import org.apache.streams.local.queues.ThroughputQueue;
import org.apache.streams.local.test.processors.PassthroughDatumCounterProcessor;
import org.apache.streams.local.test.processors.SlowProcessor;
@@ -93,10 +94,17 @@ public class LocalStreamBuilderTest extends RandomizedTest {
} catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
//No-op
}
+ try {
+ mbs.unregisterMBean(new ObjectName((String.format(StreamsTaskCounter.NAME_TEMPLATE, id))));
+ } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
+ //No-op
+ }
}
}
+
+
@Test
public void testStreamIdValidations() {
StreamBuilder builder = new LocalStreamBuilder();
@@ -116,7 +124,7 @@ public class LocalStreamBuilderTest extends RandomizedTest {
exp = e;
}
assertNotNull(exp);
- removeRegisteredMBeans("1", "2");
+ removeRegisteredMBeans("1", "2", "id");
}
@Test
@@ -172,9 +180,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
}
} finally {
for(int i=0; i < numProcessors; ++i) {
- removeRegisteredMBeans(processorId+i);
+ removeRegisteredMBeans(processorId+i, processorId+i+"-"+PassthroughDatumCounterProcessor.class.getCanonicalName());
}
- removeRegisteredMBeans("writer");
+ removeRegisteredMBeans("writer", "numeric_provider");
}
}
@@ -211,7 +219,7 @@ public class LocalStreamBuilderTest extends RandomizedTest {
for(int i=0; i < numProcessors; ++i) {
removeRegisteredMBeans(processorId+i);
}
- removeRegisteredMBeans("writer");
+ removeRegisteredMBeans("writer", "numeric_provider");
}
}
@@ -233,7 +241,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
assertEquals(numDatums2, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
assertEquals(numDatums1+numDatums2, DatumCounterWriter.COUNTS.get("writer").get());
} finally {
- removeRegisteredMBeans("proc1", "proc2", "writer1");
+ String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+ String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
+ removeRegisteredMBeans("proc1", "proc2", "writer1", "sp1", "sp2");
}
}
@@ -251,6 +261,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
assertEquals(numDatums*2, DatumCounterWriter.COUNTS.get("writer").get());
} finally {
+ String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+ String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+ String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
}
}
@@ -269,6 +282,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
builder.start();
assertEquals(numDatums, DatumCounterWriter.COUNTS.get("writer").get());
} finally {
+ String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+ String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+ String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
removeRegisteredMBeans("prov1", "proc1", "w1");
}
}
@@ -290,6 +306,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
//We care mostly that it doesn't terminate too early. With thread shutdowns, etc, the actual time is indeterminate. Just make sure there is an upper bound
assertThat((int) (end - start), is(allOf(greaterThanOrEqualTo(timeout), lessThanOrEqualTo(4 * timeout))));
} finally {
+ String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+ String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+ String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
}
}
@@ -317,6 +336,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
service.awaitTermination(30000, TimeUnit.MILLISECONDS);
assertThat(Thread.activeCount(), is(equalTo(before)));
} finally {
+ String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+ String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+ String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
removeRegisteredMBeans("prov1", "proc1", "w1");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48ad6009/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
index 2d28602..a0e28cd 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
@@ -19,6 +19,8 @@
package org.apache.streams.local.tasks;
import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.local.counters.DatumStatusCounter;
+import org.apache.streams.local.counters.StreamsTaskCounter;
import org.apache.streams.local.queues.ThroughputQueue;
import org.apache.streams.local.test.processors.PassthroughDatumCounterProcessor;
import org.apache.streams.local.test.providers.NumericMessageProvider;
@@ -27,6 +29,9 @@ import org.apache.streams.util.ComponentUtils;
import org.junit.After;
import org.junit.Test;
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
import java.util.Queue;
import java.util.concurrent.*;
@@ -38,6 +43,7 @@ import static org.junit.Assert.*;
public class BasicTasksTest {
+ private static final String MBEAN_ID = "test_bean";
@After
public void removeLocalMBeans() {
try {
@@ -87,7 +93,7 @@ public class BasicTasksTest {
assertTrue("Task should have completed running in aloted time.", service.isTerminated());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- };
+ }
}
@Test
@@ -95,6 +101,8 @@ public class BasicTasksTest {
int numMessages = 100;
PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
StreamsProcessorTask task = new StreamsProcessorTask(processor);
+ StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+ task.setStreamsTaskCounter(counter);
BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
task.addOutputQueue(outQueue);
@@ -114,8 +122,7 @@ public class BasicTasksTest {
fail("Processor task failed to output "+numMessages+" in a timely fashion.");
}
}
- task.stopTask();
- assertEquals(numMessages, processor.getMessageCount());
+ task.stopTask();;
service.shutdown();
try {
if(!service.awaitTermination(5, TimeUnit.SECONDS)){
@@ -126,6 +133,11 @@ public class BasicTasksTest {
} catch (InterruptedException e) {
fail("Test Interupted.");
}
+ assertEquals(numMessages, processor.getMessageCount());
+ assertEquals(numMessages, counter.getNumReceived());
+ assertEquals(numMessages, counter.getNumEmitted());
+ assertEquals(0, counter.getNumUnhandledErrors());
+ assertEquals(0.0, counter.getErrorRate(), 0.0);
}
@Test
@@ -133,6 +145,8 @@ public class BasicTasksTest {
int numMessages = 100;
DatumCounterWriter writer = new DatumCounterWriter("");
StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer);
+ StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+ task.setStreamsTaskCounter(counter);
BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
@@ -160,7 +174,6 @@ public class BasicTasksTest {
}
}
task.stopTask();
- assertEquals(numMessages, writer.getDatumsCounted());
service.shutdown();
try {
if(!service.awaitTermination(5, TimeUnit.SECONDS)){
@@ -171,6 +184,11 @@ public class BasicTasksTest {
} catch (InterruptedException e) {
fail("Test Interupted.");
}
+ assertEquals(numMessages, writer.getDatumsCounted());
+ assertEquals(numMessages, counter.getNumReceived());
+ assertEquals(0, counter.getNumEmitted());
+ assertEquals(0, counter.getNumUnhandledErrors());
+ assertEquals(0.0, counter.getErrorRate(), 0.0);
}
@Test