You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by re...@apache.org on 2014/10/20 19:28:57 UTC
[05/11] git commit: Implemented and improved tests,
made fixes where tests exposed bugs
Implemented and improved tests, made fixes where tests exposed bugs
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0d207736
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0d207736
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0d207736
Branch: refs/heads/master
Commit: 0d20773648d12df3d99872c4fd1d377eca976e14
Parents: 1b18eb6
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Wed Oct 8 12:50:00 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Wed Oct 8 12:50:00 2014 -0500
----------------------------------------------------------------------
.../local/builders/LocalStreamBuilder.java | 16 ++-
.../streams/local/queues/ThroughputQueue.java | 17 +++-
.../streams/local/tasks/BaseStreamsTask.java | 12 +++
.../streams/local/tasks/StreamsMergeTask.java | 1 -
.../local/tasks/StreamsPersistWriterTask.java | 14 ++-
.../local/tasks/StreamsProcessorTask.java | 15 ++-
.../local/tasks/StreamsProviderTask.java | 5 +
.../apache/streams/local/tasks/StreamsTask.java | 5 +
.../local/builders/LocalStreamBuilderTest.java | 101 +++++++++++--------
.../PassthroughDatumCounterProcessor.java | 2 +
.../test/providers/NumericMessageProvider.java | 2 +-
11 files changed, 137 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d207736/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 0b02f4e..4606aa7 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -33,6 +33,7 @@ import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
@@ -48,6 +49,7 @@ public class LocalStreamBuilder implements StreamBuilder {
private Map<String, StreamComponent> providers;
private Map<String, StreamComponent> components;
private Map<String, Object> streamConfig;
+ private Map<StreamsTask, Future> futures;
private ExecutorService executor;
private ExecutorService monitor;
private int totalTasks;
@@ -100,6 +102,7 @@ public class LocalStreamBuilder implements StreamBuilder {
self.stopInternal(true);
}
};
+ this.futures = new HashMap<>();
}
@Override
@@ -177,6 +180,7 @@ public class LocalStreamBuilder implements StreamBuilder {
this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1);
Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
tasks = new HashMap<String, List<StreamsTask>>();
+ boolean forcedShutDown = false;
try {
monitorThread = new LocalStreamProcessMonitorThread(executor, 10);
this.monitor.submit(monitorThread);
@@ -198,8 +202,9 @@ public class LocalStreamBuilder implements StreamBuilder {
LOGGER.debug("Components are no longer running or timed out");
} catch (InterruptedException e){
LOGGER.warn("Runtime interrupted. Beginning shutdown");
+ forcedShutDown = true;
} finally{
- stop();
+ stopInternal(forcedShutDown);
}
}
@@ -216,10 +221,12 @@ public class LocalStreamBuilder implements StreamBuilder {
protected void forceShutdown(Map<String, List<StreamsTask>> streamsTasks) {
LOGGER.debug("Shutdown failed. Forcing shutdown");
- //give the stream 30secs to try to shutdown gracefully, then force shutdown otherwise
for(List<StreamsTask> tasks : streamsTasks.values()) {
for(StreamsTask task : tasks) {
task.stopTask();
+ if(task.isWaiting()) {
+ this.futures.get(task).cancel(true);
+ }
}
}
this.executor.shutdown();
@@ -277,7 +284,7 @@ public class LocalStreamBuilder implements StreamBuilder {
for(int i=0; i < tasks; ++i) {
StreamsTask task = comp.createConnectedTask(getTimeout());
task.setStreamConfig(this.streamConfig);
- this.executor.submit(task);
+ this.futures.put(task, this.executor.submit(task));
compTasks.add(task);
if( comp.isOperationCountable() ) {
this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
@@ -311,6 +318,9 @@ public class LocalStreamBuilder implements StreamBuilder {
if(parentsShutDown) {
for(StreamsTask task : tasks) {
task.stopTask();
+ if(task.isWaiting()) {
+ this.futures.get(task).cancel(true); // no data to process, interrupt block queue
+ }
}
for(StreamsTask task : tasks) {
int count = 0;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d207736/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
index 2bd27a8..579914b 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java
@@ -165,7 +165,22 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- throw new NotImplementedException();
+ ThroughputElement<E> e = this.underlyingQueue.poll(timeout, unit);
+ if(e != null) {
+ try {
+ this.takeCountsLock.writeLock().lockInterruptibly();
+ ++this.elementsRemoved;
+ Long queueTime = e.getWaited();
+ this.totalQueueTime += queueTime;
+ if(this.maxQueuedTime < queueTime) {
+ this.maxQueuedTime = queueTime;
+ }
+ } finally {
+ this.takeCountsLock.writeLock().unlock();
+ }
+ return e.getElement();
+ }
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d207736/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 758a883..902a2d7 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -132,6 +132,18 @@ public abstract class BaseStreamsTask implements StreamsTask {
}
}
+ @Override
+ public boolean isWaiting() {
+ if(this.inQueues == null || this.inQueues.size() == 0) {
+ return true;
+ }
+ boolean empty = true;
+ for(Queue queue : this.inQueues) {
+ empty = empty && queue.isEmpty();
+ }
+ return empty;
+ }
+
/**
* //TODO LOCAL MODE HACK. Need to fix
* In order for our data streams to ported to other data flow frame works(Storm, Hadoop, Spark, etc) we need to be able to
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d207736/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
index 585e4dd..7a4c806 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
@@ -43,7 +43,6 @@ public class StreamsMergeTask extends BaseStreamsTask {
this.keepRunning = new AtomicBoolean(true);
}
-
@Override
public void stopTask() {
this.keepRunning.set(false);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d207736/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index bb4f78f..42b32e4 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -43,6 +44,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
private Map<String, Object> streamConfig;
private BlockingQueue<StreamsDatum> inQueue;
private AtomicBoolean isRunning;
+ private AtomicBoolean blocked;
private DatumStatusCounter statusCounter = new DatumStatusCounter();
@@ -70,6 +72,12 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
this.sleepTime = sleepTime;
this.keepRunning = new AtomicBoolean(true);
this.isRunning = new AtomicBoolean(true);
+ this.blocked = new AtomicBoolean(false);
+ }
+
+ @Override
+ public boolean isWaiting() {
+ return this.inQueue.isEmpty() && this.blocked.get();
}
@Override
@@ -94,7 +102,9 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
while(this.keepRunning.get()) {
StreamsDatum datum = null;
try {
- datum = this.inQueue.take();
+ this.blocked.set(true);
+ datum = this.inQueue.poll(5, TimeUnit.SECONDS);
+ this.blocked.set(false);
} catch (InterruptedException ie) {
LOGGER.error("Received InterruptedException. Shutting down and re-applying interrupt status.");
this.keepRunning.set(false);
@@ -111,7 +121,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
DatumUtils.addErrorToMetadata(datum, e, this.writer.getClass());
}
} else { //datums should never be null
- LOGGER.warn("Received null StreamsDatum @ writer : {}", this.writer.getClass().getName());
+ LOGGER.debug("Received null StreamsDatum @ writer : {}", this.writer.getClass().getName());
}
}
// StreamsDatum datum = this.inQueue.poll();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d207736/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index 79cbf89..a3d367f 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -45,6 +46,7 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
private Map<String, Object> streamConfig;
private BlockingQueue<StreamsDatum> inQueue;
private AtomicBoolean isRunning;
+ private AtomicBoolean blocked;
private DatumStatusCounter statusCounter = new DatumStatusCounter();
@@ -71,6 +73,12 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
this.sleepTime = sleepTime;
this.keepRunning = new AtomicBoolean(true);
this.isRunning = new AtomicBoolean(true);
+ this.blocked = new AtomicBoolean(true);
+ }
+
+ @Override
+ public boolean isWaiting() {
+ return this.inQueue.isEmpty() && this.blocked.get();
}
@Override
@@ -100,9 +108,12 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
while(this.keepRunning.get()) {
StreamsDatum datum = null;
try {
- datum = this.inQueue.take();
+ this.blocked.set(true);
+ datum = this.inQueue.poll(5, TimeUnit.SECONDS);
+ this.blocked.set(false);
} catch (InterruptedException ie) {
LOGGER.warn("Received InteruptedException, shutting down and re-applying interrupt status.");
+ this.keepRunning.set(false);
Thread.currentThread().interrupt();
}
if(datum != null) {
@@ -125,7 +136,7 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
DatumUtils.addErrorToMetadata(datum, t, this.processor.getClass());
}
} else {
- LOGGER.warn("Removed NULL datum from queue at processor : {}", this.processor.getClass().getName());
+ LOGGER.debug("Removed NULL datum from queue at processor : {}", this.processor.getClass().getName());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d207736/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index e3f89c2..c16f64d 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -117,6 +117,11 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
}
@Override
+ public boolean isWaiting() {
+ return false; //providers don't have inbound queues
+ }
+
+ @Override
public void stopTask() {
LOGGER.debug("Stopping Provider Task for {}", this.provider.getClass().getSimpleName());
this.keepRunning.set(false);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d207736/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
index 45c25f3..7513631 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
@@ -47,6 +47,11 @@ public interface StreamsTask extends Runnable{
public void stopTask();
/**
+ * Returns true if the task is waiting on more data to process
+ * @return true, if waiting on more data to process
+ */
+ public boolean isWaiting();
+ /**
* Add an input {@link java.util.Queue} for this task.
* @param inputQueue
*/
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d207736/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
index 0e20e43..e602181 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
@@ -51,6 +51,7 @@ import org.apache.streams.local.test.providers.NumericMessageProvider;
import org.apache.streams.local.test.writer.DatumCounterWriter;
import org.apache.streams.local.test.writer.SystemOutWriter;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import com.google.common.collect.Maps;
@@ -83,6 +84,7 @@ public class LocalStreamBuilderTest extends RandomizedTest {
}
}
+
@Test
public void testStreamIdValidations() {
StreamBuilder builder = new LocalStreamBuilder();
@@ -111,8 +113,18 @@ public class LocalStreamBuilderTest extends RandomizedTest {
}
@Test
- @Repeat(iterations = 3)
public void testBasicLinearStream2() {
+ linearStreamNonParallel(1004, 1);
+ }
+
+ @Test
+ public void testBasicLinearStream3() {
+ linearStreamNonParallel(1, 10);
+ }
+
+ @Test
+ @Repeat(iterations = 3)
+ public void testBasicLinearStreamRandom() {
int numDatums = randomIntBetween(1, 100000);
int numProcessors = randomIntBetween(1, 10);
linearStreamNonParallel(numDatums, numProcessors);
@@ -157,31 +169,32 @@ public class LocalStreamBuilderTest extends RandomizedTest {
@Test
public void testParallelLinearStream1() {
String processorId = "proc";
- int numProcessors = randomIntBetween(1, 100);
+ int numProcessors = randomIntBetween(1, 10);
int numDatums = randomIntBetween(1, 300000);
try {
- StreamBuilder builder = new LocalStreamBuilder();
+ StreamBuilder builder = new LocalStreamBuilder(50);
builder.newPerpetualStream("numeric_provider", new NumericMessageProvider(numDatums));
- AtomicInteger[] processorCounters = new AtomicInteger[numProcessors];
String connectTo = null;
for(int i=0; i < numProcessors; ++i) {
- processorCounters[i] = new AtomicInteger(0);
if(i == 0) {
connectTo = "numeric_provider";
} else {
connectTo = processorId+(i-1);
}
int parallelHint = randomIntBetween(1,5);
- builder.addStreamsProcessor(processorId+i, createPassThroughProcessor(processorCounters[i]), parallelHint, connectTo);
+ builder.addStreamsProcessor(processorId+i, new PassthroughDatumCounterProcessor(processorId+i), parallelHint, connectTo);
}
- Set output = Collections.newSetFromMap(new ConcurrentHashMap());
-
- builder.addStreamsPersistWriter("writer", createSetCollectingWriter(output), 1, processorId+(numProcessors-1));
+ builder.addStreamsPersistWriter("writer", new DatumCounterWriter("writer"), 1, processorId+(numProcessors-1));
builder.start();
- // can't test processors since they are serialized and deserialized when parallelized
+
+ assertEquals(numDatums, DatumCounterWriter.RECEIVED.get("writer").size());
for(int i=0; i < numDatums; ++i) {
- assertTrue("Expected writer to have received : "+i, output.contains(i));
+ assertTrue("Expected Writer to receive datum : " + i, DatumCounterWriter.RECEIVED.get("writer").contains(i));
+ }
+ for(int i=0; i < numProcessors; ++i) {
+ assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get(processorId+i).get());
}
+
} finally {
for(int i=0; i < numProcessors; ++i) {
removeRegisteredMBeans(processorId+i);
@@ -194,23 +207,19 @@ public class LocalStreamBuilderTest extends RandomizedTest {
public void testBasicMergeStream() {
try {
int numDatums1 = randomIntBetween(1, 300000);
- int numDatums2 = randomIntBetween(1, 300000);;
- AtomicInteger counter1 = new AtomicInteger(0);
- AtomicInteger counter2 = new AtomicInteger(0);
- StreamsProcessor processor1 = createPassThroughProcessor(counter1);
- StreamsProcessor processor2 = createPassThroughProcessor(counter2);
+ int numDatums2 = randomIntBetween(1, 300000);
+ StreamsProcessor processor1 = new PassthroughDatumCounterProcessor("proc1");
+ StreamsProcessor processor2 = new PassthroughDatumCounterProcessor("proc2");
StreamBuilder builder = new LocalStreamBuilder();
- Set output = Collections.newSetFromMap(new ConcurrentHashMap());
- AtomicInteger outPutcounter = new AtomicInteger(0);
- builder.newReadCurrentStream("sp1", new NumericMessageProvider(numDatums1))
- .newReadCurrentStream("sp2", new NumericMessageProvider(numDatums2))
+ builder.newPerpetualStream("sp1", new NumericMessageProvider(numDatums1))
+ .newPerpetualStream("sp2", new NumericMessageProvider(numDatums2))
.addStreamsProcessor("proc1", processor1, 1, "sp1")
.addStreamsProcessor("proc2", processor2, 1, "sp2")
- .addStreamsPersistWriter("writer1", createSetCollectingWriter(output, outPutcounter), 1, "proc1", "proc2");
+ .addStreamsPersistWriter("writer1", new DatumCounterWriter("writer"), 1, "proc1", "proc2");
builder.start();
- assertEquals(numDatums1, counter1.get());
- assertEquals(numDatums2, counter2.get());
- assertEquals(numDatums1+numDatums2, outPutcounter.get());
+ assertEquals(numDatums1, PassthroughDatumCounterProcessor.COUNTS.get("proc1").get());
+ assertEquals(numDatums2, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
+ assertEquals(numDatums1+numDatums2, DatumCounterWriter.COUNTS.get("writer").get());
} finally {
removeRegisteredMBeans("proc1", "proc2", "writer1");
}
@@ -220,19 +229,15 @@ public class LocalStreamBuilderTest extends RandomizedTest {
public void testBasicBranch() {
try {
int numDatums = randomIntBetween(1, 300000);
- StreamBuilder builder = new LocalStreamBuilder();
- AtomicInteger counter1 = new AtomicInteger(0);
- AtomicInteger counter2 = new AtomicInteger(0);
- Set output = Collections.newSetFromMap(new ConcurrentHashMap());
- AtomicInteger outputCounter = new AtomicInteger(0);
- builder.newReadCurrentStream("prov1", new NumericMessageProvider(numDatums))
- .addStreamsProcessor("proc1", createPassThroughProcessor(counter1), 1, "prov1")
- .addStreamsProcessor("proc2", createPassThroughProcessor(counter2), 1, "prov1")
- .addStreamsPersistWriter("w1", createSetCollectingWriter(output, outputCounter), 1, "proc1", "proc2");
+ StreamBuilder builder = new LocalStreamBuilder(50);
+ builder.newPerpetualStream("prov1", new NumericMessageProvider(numDatums))
+ .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor("proc1"), 1, "prov1")
+ .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor("proc2"), 1, "prov1")
+ .addStreamsPersistWriter("w1", new DatumCounterWriter("writer"), 1, "proc1", "proc2");
builder.start();
- assertEquals(numDatums, counter1.get());
- assertEquals(numDatums, counter2.get());
- assertEquals(numDatums*2, outputCounter.get());
+ assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc1").get());
+ assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
+ assertEquals(numDatums*2, DatumCounterWriter.COUNTS.get("writer").get());
} finally {
removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
}
@@ -246,13 +251,11 @@ public class LocalStreamBuilderTest extends RandomizedTest {
Map<String, Object> config = Maps.newHashMap();
config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout);
StreamBuilder builder = new LocalStreamBuilder(config);
- AtomicInteger counter = new AtomicInteger(0);
- Set output = Collections.newSetFromMap(new ConcurrentHashMap());
- builder.newReadCurrentStream("prov1", new NumericMessageProvider(numDatums))
+ builder.newPerpetualStream("prov1", new NumericMessageProvider(numDatums))
.addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1")
- .addStreamsPersistWriter("w1", createSetCollectingWriter(output, counter), 1, "proc1");
+ .addStreamsPersistWriter("w1", new DatumCounterWriter("writer"), 1, "proc1");
builder.start();
- assertEquals(numDatums, counter.get());
+ assertEquals(numDatums, DatumCounterWriter.COUNTS.get("writer").get());
} finally {
removeRegisteredMBeans("prov1", "proc1", "w1");
}
@@ -279,12 +282,13 @@ public class LocalStreamBuilderTest extends RandomizedTest {
}
}
+ @Ignore
@Test
public void ensureShutdownWithBlockedQueue() throws InterruptedException {
try {
ExecutorService service = Executors.newSingleThreadExecutor();
int before = Thread.activeCount();
- final StreamBuilder builder = new LocalStreamBuilder(1);
+ final StreamBuilder builder = new LocalStreamBuilder();
builder.newPerpetualStream("prov1", new NumericMessageProvider(30))
.addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1")
.addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
@@ -298,13 +302,24 @@ public class LocalStreamBuilderTest extends RandomizedTest {
Thread.sleep(500);
builder.stop();
service.shutdownNow();
- service.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ service.awaitTermination(30000, TimeUnit.MILLISECONDS);
assertThat(Thread.activeCount(), is(equalTo(before)));
} finally {
removeRegisteredMBeans("prov1", "proc1", "w1");
}
}
+ @Before
+ private void clearCounters() {
+ PassthroughDatumCounterProcessor.COUNTS.clear();
+ PassthroughDatumCounterProcessor.CLAIMED_ID.clear();
+ PassthroughDatumCounterProcessor.SEEN_DATA.clear();
+ DatumCounterWriter.COUNTS.clear();
+ DatumCounterWriter.CLAIMED_ID.clear();
+ DatumCounterWriter.SEEN_DATA.clear();
+ DatumCounterWriter.RECEIVED.clear();
+ }
+
/**
* Creates {@link org.apache.streams.core.StreamsProcessor} that passes any StreamsDatum it gets as an
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d207736/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java
index b8249d1..bf1c338 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java
@@ -78,6 +78,7 @@ public class PassthroughDatumCounterProcessor implements StreamsProcessor {
@Override
public void cleanUp() {
+ System.out.println("Clean up "+this.procId);
synchronized (COUNTS) {
AtomicLong count = COUNTS.get(this.procId);
if(count == null) {
@@ -86,6 +87,7 @@ public class PassthroughDatumCounterProcessor implements StreamsProcessor {
count.addAndGet(this.count);
}
}
+ System.out.println(this.procId+"\t"+this.count);
}
public int getMessageCount() {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d207736/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
index 8a5de2d..ac8b6ff 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
@@ -63,7 +63,7 @@ public class NumericMessageProvider implements StreamsProvider {
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
- System.out.println("******************\n**\tBatchSize="+batch.size()+"\n******************");
+// System.out.println("******************\n**\tBatchSize="+batch.size()+"\n******************");
this.complete = batch.isEmpty() && this.data.isEmpty();
return new StreamsResultSet(batch);
}