You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:24:46 UTC
[05/42] incubator-streams git commit: STREAMS-440: custom
checkstyle.xml, address compliance
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java
index 60df89c..ad4aa28 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java
@@ -17,299 +17,304 @@
*/
package org.apache.streams.local.queues;
+import org.apache.streams.util.ComponentUtils;
+
import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
-import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import javax.management.InstanceNotFoundException;
import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.util.concurrent.*;
/**
* MultiThread unit tests for {@link org.apache.streams.local.queues.ThroughputQueue}
*/
public class ThroughputQueueMultiThreadTest extends RandomizedTest {
- private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueueMultiThreadTest.class);
- private static final String MBEAN_ID = "testQueue";
- private static final String STREAM_ID = "test_stream";
- private static long STREAM_START_TIME = (new DateTime()).getMillis();
-
- /**
- * Remove registered mbeans from previous tests
- * @throws Exception
- */
- @After
- public void unregisterMXBean() throws Exception {
- try {
- ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
- } catch (InstanceNotFoundException ife) {
- //No-op
- }
+ private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueueMultiThreadTest.class);
+ private static final String MBEAN_ID = "testQueue";
+ private static final String STREAM_ID = "test_stream";
+ private static long STREAM_START_TIME = (new DateTime()).getMillis();
+
+ /**
+ * Remove registered mbeans from previous tests
+ * @throws Exception
+ */
+ @After
+ public void unregisterMXBean() throws Exception {
+ try {
+ ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
+ } catch (InstanceNotFoundException ife) {
+ //No-op
}
-
- @After
- public void removeLocalMBeans() {
- try {
- ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
- } catch (Exception e) {
- //No op. proceed to next test
- }
+ }
+
+ @After
+ public void removeLocalMBeans() {
+ try {
+ ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+ } catch (Exception e) {
+ //No op. proceed to next test
}
-
-
- /**
- * Test that queue will block on puts when the queue is full
- * @throws InterruptedException
- */
- @Test
- public void testBlockOnFullQueue() throws InterruptedException {
- int queueSize = randomIntBetween(1, 3000);
- ExecutorService executor = Executors.newSingleThreadExecutor();
- CountDownLatch full = new CountDownLatch(1);
- CountDownLatch finished = new CountDownLatch(1);
- ThroughputQueue queue = new ThroughputQueue(queueSize);
- BlocksOnFullQueue testThread = new BlocksOnFullQueue(full, finished, queue, queueSize);
- executor.submit(testThread);
- full.await();
- assertEquals(queueSize, queue.size());
- assertEquals(queueSize, queue.getCurrentSize());
- assertFalse(testThread.isComplete()); //test that it is blocked
- safeSleep(1000);
- assertFalse(testThread.isComplete()); //still blocked
- queue.take();
- finished.await();
- assertEquals(queueSize, queue.size());
- assertEquals(queueSize, queue.getCurrentSize());
- assertTrue(testThread.isComplete());
- executor.shutdownNow();
- executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+ }
+
+
+ /**
+ * Test that queue will block on puts when the queue is full
+ * @throws InterruptedException
+ */
+ @Test
+ public void testBlockOnFullQueue() throws InterruptedException {
+ int queueSize = randomIntBetween(1, 3000);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ CountDownLatch full = new CountDownLatch(1);
+ CountDownLatch finished = new CountDownLatch(1);
+ ThroughputQueue queue = new ThroughputQueue(queueSize);
+ BlocksOnFullQueue testThread = new BlocksOnFullQueue(full, finished, queue, queueSize);
+ executor.submit(testThread);
+ full.await();
+ assertEquals(queueSize, queue.size());
+ assertEquals(queueSize, queue.getCurrentSize());
+ assertFalse(testThread.isComplete()); //test that it is blocked
+ safeSleep(1000);
+ assertFalse(testThread.isComplete()); //still blocked
+ queue.take();
+ finished.await();
+ assertEquals(queueSize, queue.size());
+ assertEquals(queueSize, queue.getCurrentSize());
+ assertTrue(testThread.isComplete());
+ executor.shutdownNow();
+ executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Test that queue will block on Take when queue is empty
+ * @throws InterruptedException
+ */
+ @Test
+ public void testBlockOnEmptyQueue() throws InterruptedException {
+ int queueSize = randomIntBetween(1, 3000);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ CountDownLatch empty = new CountDownLatch(1);
+ CountDownLatch finished = new CountDownLatch(1);
+ ThroughputQueue queue = new ThroughputQueue();
+ BlocksOnEmptyQueue testThread = new BlocksOnEmptyQueue(empty, finished, queueSize, queue);
+ for(int i=0; i < queueSize; ++i) {
+ queue.put(i);
}
-
- /**
- * Test that queue will block on Take when queue is empty
- * @throws InterruptedException
- */
- @Test
- public void testBlockOnEmptyQueue() throws InterruptedException {
- int queueSize = randomIntBetween(1, 3000);
- ExecutorService executor = Executors.newSingleThreadExecutor();
- CountDownLatch empty = new CountDownLatch(1);
- CountDownLatch finished = new CountDownLatch(1);
- ThroughputQueue queue = new ThroughputQueue();
- BlocksOnEmptyQueue testThread = new BlocksOnEmptyQueue(empty, finished, queueSize, queue);
- for(int i=0; i < queueSize; ++i) {
- queue.put(i);
- }
- executor.submit(testThread);
- empty.await();
- assertEquals(0, queue.size());
- assertEquals(0, queue.getCurrentSize());
- assertFalse(testThread.isComplete());
- safeSleep(1000);
- assertFalse(testThread.isComplete());
- queue.put(1);
- finished.await();
- assertEquals(0, queue.size());
- assertEquals(0, queue.getCurrentSize());
- assertTrue(testThread.isComplete());
- executor.shutdownNow();
- executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+ executor.submit(testThread);
+ empty.await();
+ assertEquals(0, queue.size());
+ assertEquals(0, queue.getCurrentSize());
+ assertFalse(testThread.isComplete());
+ safeSleep(1000);
+ assertFalse(testThread.isComplete());
+ queue.put(1);
+ finished.await();
+ assertEquals(0, queue.size());
+ assertEquals(0, queue.getCurrentSize());
+ assertTrue(testThread.isComplete());
+ executor.shutdownNow();
+ executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+ }
+
+
+ /**
+ * Test multiple threads putting and taking from the queue while
+ * this thread repeatedly calls the MXBean measurement methods.
+ * Should hammer the queue with request from multiple threads
+ * of all request types. Purpose is to expose current modification exceptions
+ * and/or dead locks.
+ */
+ @Test
+ @Repeat(iterations = 3)
+ public void testMultiThreadAccessAndInteruptResponse() throws Exception {
+ int putTakeThreadCount = randomIntBetween(1, 10);
+ int dataCount = randomIntBetween(1, 2000000);
+ int pollCount = randomIntBetween(1, 2000000);
+ int maxSize = randomIntBetween(1, 1000);
+ CountDownLatch finished = new CountDownLatch(putTakeThreadCount);
+ ThroughputQueue queue = new ThroughputQueue(maxSize, MBEAN_ID);
+ ExecutorService executor = Executors.newFixedThreadPool(putTakeThreadCount * 2);
+ for(int i=0; i < putTakeThreadCount; ++i) {
+ executor.submit(new PutData(finished, queue, dataCount));
+ executor.submit(new TakeData(queue));
}
-
-
- /**
- * Test multiple threads putting and taking from the queue while
- * this thread repeatedly calls the MXBean measurement methods.
- * Should hammer the queue with request from multiple threads
- * of all request types. Purpose is to expose current modification exceptions
- * and/or dead locks.
- */
- @Test
- @Repeat(iterations = 3)
- public void testMultiThreadAccessAndInteruptResponse() throws Exception {
- int putTakeThreadCount = randomIntBetween(1, 10);
- int dataCount = randomIntBetween(1, 2000000);
- int pollCount = randomIntBetween(1, 2000000);
- int maxSize = randomIntBetween(1, 1000);
- CountDownLatch finished = new CountDownLatch(putTakeThreadCount);
- ThroughputQueue queue = new ThroughputQueue(maxSize, MBEAN_ID);
- ExecutorService executor = Executors.newFixedThreadPool(putTakeThreadCount * 2);
- for(int i=0; i < putTakeThreadCount; ++i) {
- executor.submit(new PutData(finished, queue, dataCount));
- executor.submit(new TakeData(queue));
- }
- for(int i=0; i < pollCount; ++i) {
- queue.getAvgWait();
- queue.getAdded();
- queue.getCurrentSize();
- queue.getMaxWait();
- queue.getRemoved();
- queue.getThroughput();
- }
- finished.await();
- while(!queue.isEmpty()) {
- LOGGER.info("Waiting for queue to be emptied...");
- safeSleep(500);
- }
- long totalData = ((long) dataCount) * putTakeThreadCount;
- assertEquals(totalData, queue.getAdded());
- assertEquals(totalData, queue.getRemoved());
- executor.shutdown();
- executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown puts
- executor.shutdownNow();
- executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown takes
- //Randomized should not report thread leak
+ for(int i=0; i < pollCount; ++i) {
+ queue.getAvgWait();
+ queue.getAdded();
+ queue.getCurrentSize();
+ queue.getMaxWait();
+ queue.getRemoved();
+ queue.getThroughput();
}
-
-
-
- private void safeSleep(long sleep) {
- try {
- Thread.sleep(sleep);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
+ finished.await();
+ while(!queue.isEmpty()) {
+ LOGGER.info("Waiting for queue to be emptied...");
+ safeSleep(500);
}
+ long totalData = ((long) dataCount) * putTakeThreadCount;
+ assertEquals(totalData, queue.getAdded());
+ assertEquals(totalData, queue.getRemoved());
+ executor.shutdown();
+ executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown puts
+ executor.shutdownNow();
+ executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown takes
+ //Randomized should not report thread leak
+ }
+
+
+
+ private void safeSleep(long sleep) {
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
- /**
- * Helper runnable for test {@link ThroughputQueueMultiThreadTest#testBlockOnFullQueue()}
- */
- private class BlocksOnFullQueue implements Runnable {
-
- private CountDownLatch full;
- volatile private boolean complete;
- private int queueSize;
- private CountDownLatch finished;
- private BlockingQueue queue;
+ /**
+ * Helper runnable for test {@link ThroughputQueueMultiThreadTest#testBlockOnFullQueue()}
+ */
+ private class BlocksOnFullQueue implements Runnable {
- public BlocksOnFullQueue(CountDownLatch latch, CountDownLatch finished, BlockingQueue queue, int queueSize) {
- this.full = latch;
- this.complete = false;
- this.queueSize = queueSize;
- this.finished = finished;
- this.queue = queue;
- }
+ private CountDownLatch full;
+ volatile private boolean complete;
+ private int queueSize;
+ private CountDownLatch finished;
+ private BlockingQueue queue;
- @Override
- public void run() {
- try {
- for (int i = 0; i < this.queueSize; ++i) {
- this.queue.put(i);
- }
- this.full.countDown();
- this.queue.put(0);
- this.complete = true;
- this.finished.countDown();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- }
+ public BlocksOnFullQueue(CountDownLatch latch, CountDownLatch finished, BlockingQueue queue, int queueSize) {
+ this.full = latch;
+ this.complete = false;
+ this.queueSize = queueSize;
+ this.finished = finished;
+ this.queue = queue;
+ }
- public boolean isComplete() {
- return this.complete;
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < this.queueSize; ++i) {
+ this.queue.put(i);
}
+ this.full.countDown();
+ this.queue.put(0);
+ this.complete = true;
+ this.finished.countDown();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
}
-
- /**
- * Helper runnable class for test {@link ThroughputQueueMultiThreadTest#testBlockOnEmptyQueue()}
- */
- private class BlocksOnEmptyQueue implements Runnable {
-
- private CountDownLatch full;
- volatile private boolean complete;
- private int queueSize;
- private CountDownLatch finished;
- private BlockingQueue queue;
-
- public BlocksOnEmptyQueue(CountDownLatch full, CountDownLatch finished, int queueSize, BlockingQueue queue) {
- this.full = full;
- this.finished = finished;
- this.queueSize = queueSize;
- this.queue = queue;
- this.complete = false;
- }
+ public boolean isComplete() {
+ return this.complete;
+ }
+ }
+
+
+ /**
+ * Helper runnable class for test {@link ThroughputQueueMultiThreadTest#testBlockOnEmptyQueue()}
+ */
+ private class BlocksOnEmptyQueue implements Runnable {
+
+ private CountDownLatch full;
+ volatile private boolean complete;
+ private int queueSize;
+ private CountDownLatch finished;
+ private BlockingQueue queue;
+
+ public BlocksOnEmptyQueue(CountDownLatch full, CountDownLatch finished, int queueSize, BlockingQueue queue) {
+ this.full = full;
+ this.finished = finished;
+ this.queueSize = queueSize;
+ this.queue = queue;
+ this.complete = false;
+ }
- @Override
- public void run() {
- try {
- for(int i=0; i < this.queueSize; ++i) {
- this.queue.take();
- }
- this.full.countDown();
- this.queue.take();
- this.complete = true;
- this.finished.countDown();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
+ @Override
+ public void run() {
+ try {
+ for(int i=0; i < this.queueSize; ++i) {
+ this.queue.take();
}
+ this.full.countDown();
+ this.queue.take();
+ this.complete = true;
+ this.finished.countDown();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
- public boolean isComplete() {
- return this.complete;
- }
+ public boolean isComplete() {
+ return this.complete;
}
+ }
- private class PutData implements Runnable {
+ private class PutData implements Runnable {
- private BlockingQueue queue;
- private int dataCount;
- private CountDownLatch finished;
+ private BlockingQueue queue;
+ private int dataCount;
+ private CountDownLatch finished;
- public PutData(CountDownLatch finished, BlockingQueue queue, int dataCount) {
- this.queue = queue;
- this.dataCount = dataCount;
- this.finished = finished;
- }
+ public PutData(CountDownLatch finished, BlockingQueue queue, int dataCount) {
+ this.queue = queue;
+ this.dataCount = dataCount;
+ this.finished = finished;
+ }
- @Override
- public void run() {
- try {
- for(int i=0; i < this.dataCount; ++i) {
- this.queue.put(i);
- }
- } catch (InterruptedException ie) {
- LOGGER.error("PUT DATA interupted !");
- Thread.currentThread().interrupt();
- }
- this.finished.countDown();
+ @Override
+ public void run() {
+ try {
+ for(int i=0; i < this.dataCount; ++i) {
+ this.queue.put(i);
}
+ } catch (InterruptedException ie) {
+ LOGGER.error("PUT DATA interupted !");
+ Thread.currentThread().interrupt();
+ }
+ this.finished.countDown();
}
+ }
- private class TakeData implements Runnable {
+ private class TakeData implements Runnable {
- private BlockingQueue queue;
+ private BlockingQueue queue;
- public TakeData(BlockingQueue queue) {
- this.queue = queue;
- }
+ public TakeData(BlockingQueue queue) {
+ this.queue = queue;
+ }
- @Override
- public void run() {
- try {
- while(true) {
- this.queue.take();
- }
- } catch (InterruptedException ie) {
- LOGGER.error("PUT DATA interupted !");
- Thread.currentThread().interrupt();
- }
+ @Override
+ public void run() {
+ try {
+ while(true) {
+ this.queue.take();
}
+ } catch (InterruptedException ie) {
+ LOGGER.error("PUT DATA interupted !");
+ Thread.currentThread().interrupt();
+ }
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
index 8c7f5c5..afe1911 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
@@ -17,234 +17,235 @@
*/
package org.apache.streams.local.queues;
+import org.apache.streams.util.ComponentUtils;
+
import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
-import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Test;
+import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
/**
* Single thread unit tests for {@link org.apache.streams.local.queues.ThroughputQueue}
*/
public class ThroughputQueueSingleThreadTest extends RandomizedTest {
- private static final String MBEAN_ID = "test_id";
- private static final String STREAM_ID = "test_stream";
- private static long STREAM_START_TIME = (new DateTime()).getMillis();
-
- @After
- public void removeLocalMBeans() {
- try {
- ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
- } catch (Exception e) {
- //No op. proceed to next test
- }
+ private static final String MBEAN_ID = "test_id";
+ private static final String STREAM_ID = "test_stream";
+ private static long STREAM_START_TIME = (new DateTime()).getMillis();
+
+ @After
+ public void removeLocalMBeans() {
+ try {
+ ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+ } catch (Exception e) {
+ //No op. proceed to next test
}
-
- /**
- * Test that take and put queue and dequeue data as expected and all
- * measurements form the queue are returning data.
- * @throws Exception
- */
- @Test
- @Repeat(iterations = 3)
- public void testTakeAndPut() throws Exception {
- ThroughputQueue<Integer> queue = new ThroughputQueue<>();
- int putCount = randomIntBetween(1, 1000);
- for(int i=0; i < putCount; ++i) {
- queue.put(i);
- assertEquals(i+1, queue.size());
- assertEquals(queue.size(), queue.getCurrentSize());
- }
- safeSleep(100); //ensure measurable wait time
- int takeCount = randomIntBetween(1, putCount);
- for(int i=0; i < takeCount; ++i) {
- Integer element = queue.take();
- assertNotNull(element);
- assertEquals(i, element.intValue());
- assertEquals(putCount - (1+i), queue.size());
- assertEquals(queue.size(), queue.getCurrentSize());
- }
- assertEquals(putCount-takeCount, queue.size());
- assertEquals(queue.size(), queue.getCurrentSize());
- assertTrue(0.0 < queue.getMaxWait());
- assertTrue(0.0 < queue.getAvgWait());
- assertTrue(0.0 < queue.getThroughput());
- assertEquals(putCount, queue.getAdded());
- assertEquals(takeCount, queue.getRemoved());
+ }
+
+ /**
+ * Test that take and put queue and dequeue data as expected and all
+ * measurements form the queue are returning data.
+ * @throws Exception
+ */
+ @Test
+ @Repeat(iterations = 3)
+ public void testTakeAndPut() throws Exception {
+ ThroughputQueue<Integer> queue = new ThroughputQueue<>();
+ int putCount = randomIntBetween(1, 1000);
+ for(int i=0; i < putCount; ++i) {
+ queue.put(i);
+ assertEquals(i+1, queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
}
-
- /**
- * Test that add and remove queue and dequeue data as expected
- * and all measurements from the queue are returning data
- */
- @Test
- @Repeat(iterations = 3)
- public void testAddAndRemove() {
- ThroughputQueue<Integer> queue = new ThroughputQueue<>();
- int putCount = randomIntBetween(1, 1000);
- for(int i=0; i < putCount; ++i) {
- queue.add(i);
- assertEquals(i+1, queue.size());
- assertEquals(queue.size(), queue.getCurrentSize());
- }
- safeSleep(100); //ensure measurable wait time
- int takeCount = randomIntBetween(1, putCount);
- for(int i=0; i < takeCount; ++i) {
- Integer element = queue.remove();
- assertNotNull(element);
- assertEquals(i, element.intValue());
- assertEquals(putCount - (1+i), queue.size());
- assertEquals(queue.size(), queue.getCurrentSize());
- }
- assertEquals(putCount-takeCount, queue.size());
- assertEquals(queue.size(), queue.getCurrentSize());
- assertTrue(0.0 < queue.getMaxWait());
- assertTrue(0.0 < queue.getAvgWait());
- assertTrue(0.0 < queue.getThroughput());
- assertEquals(putCount, queue.getAdded());
- assertEquals(takeCount, queue.getRemoved());
+ safeSleep(100); //ensure measurable wait time
+ int takeCount = randomIntBetween(1, putCount);
+ for(int i=0; i < takeCount; ++i) {
+ Integer element = queue.take();
+ assertNotNull(element);
+ assertEquals(i, element.intValue());
+ assertEquals(putCount - (1+i), queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
}
-
- /**
- * Test that offer and poll queue and dequeue data as expected
- * and all measurements from the queue are returning data
- */
- @Test
- @Repeat(iterations = 3)
- public void testOfferAndPoll() {
- ThroughputQueue<Integer> queue = new ThroughputQueue<>();
- int putCount = randomIntBetween(1, 1000);
- for(int i=0; i < putCount; ++i) {
- queue.offer(i);
- assertEquals(i+1, queue.size());
- assertEquals(queue.size(), queue.getCurrentSize());
- }
- safeSleep(100); //ensure measurable wait time
- int takeCount = randomIntBetween(1, putCount);
- for(int i=0; i < takeCount; ++i) {
- Integer element = queue.poll();
- assertNotNull(element);
- assertEquals(i, element.intValue());
- assertEquals(putCount - (1+i), queue.size());
- assertEquals(queue.size(), queue.getCurrentSize());
- }
- assertEquals(putCount-takeCount, queue.size());
- assertEquals(queue.size(), queue.getCurrentSize());
- assertTrue(0.0 < queue.getMaxWait());
- assertTrue(0.0 < queue.getAvgWait());
- assertTrue(0.0 < queue.getThroughput());
- assertEquals(putCount, queue.getAdded());
- assertEquals(takeCount, queue.getRemoved());
+ assertEquals(putCount-takeCount, queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ assertTrue(0.0 < queue.getMaxWait());
+ assertTrue(0.0 < queue.getAvgWait());
+ assertTrue(0.0 < queue.getThroughput());
+ assertEquals(putCount, queue.getAdded());
+ assertEquals(takeCount, queue.getRemoved());
+ }
+
+ /**
+ * Test that add and remove queue and dequeue data as expected
+ * and all measurements from the queue are returning data
+ */
+ @Test
+ @Repeat(iterations = 3)
+ public void testAddAndRemove() {
+ ThroughputQueue<Integer> queue = new ThroughputQueue<>();
+ int putCount = randomIntBetween(1, 1000);
+ for(int i=0; i < putCount; ++i) {
+ queue.add(i);
+ assertEquals(i+1, queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
}
-
-
-
- /**
- * Test that max wait and avg wait return expected values
- * @throws Exception
- */
- @Test
- public void testWait() throws Exception {
- ThroughputQueue queue = new ThroughputQueue();
- int wait = 1000;
-
- for(int i=0; i < 3; ++i) {
- queue.put(1);
- safeSleep(wait);
- queue.take();
- assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 2));//can't calculate exactly, making sure its close.
- assertTrue(queue.getAvgWait() >= wait && queue.getAvgWait() <= (wait * 2));
- }
- queue.put(1);
- queue.take();
- assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 2));//can't calculate exactly, making sure its close.
- assertTrue(queue.getAvgWait() <= 5000 );
- assertTrue(queue.getAvgWait() >= 500);
+ safeSleep(100); //ensure measurable wait time
+ int takeCount = randomIntBetween(1, putCount);
+ for(int i=0; i < takeCount; ++i) {
+ Integer element = queue.remove();
+ assertNotNull(element);
+ assertEquals(i, element.intValue());
+ assertEquals(putCount - (1+i), queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
}
-
- /**
- * Test that throughput returns expected values.
- * @throws Exception
- */
- @Test
- public void testThroughput() throws Exception {
- ThroughputQueue queue = new ThroughputQueue();
- int wait = 100;
- for(int i=0; i < 10; ++i) {
- queue.put(1);
- safeSleep(wait);
- queue.take();
- }
- double throughput = queue.getThroughput();
- assertTrue(throughput <= 15 ); //can't calculate exactly, making sure its close.
- assertTrue(throughput >= 5);
-
- queue = new ThroughputQueue();
- wait = 1000;
- for(int i=0; i < 10; ++i) {
- queue.put(1);
- }
- for(int i=0; i < 10; ++i) {
- queue.take();
- }
- safeSleep(wait);
- throughput = queue.getThroughput();
- assertTrue(throughput <= 15 ); //can't calculate exactly, making sure its close.
- assertTrue(throughput >= 5);
+ assertEquals(putCount-takeCount, queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ assertTrue(0.0 < queue.getMaxWait());
+ assertTrue(0.0 < queue.getAvgWait());
+ assertTrue(0.0 < queue.getThroughput());
+ assertEquals(putCount, queue.getAdded());
+ assertEquals(takeCount, queue.getRemoved());
+ }
+
+ /**
+ * Test that offer and poll queue and dequeue data as expected
+ * and all measurements from the queue are returning data
+ */
+ @Test
+ @Repeat(iterations = 3)
+ public void testOfferAndPoll() {
+ ThroughputQueue<Integer> queue = new ThroughputQueue<>();
+ int putCount = randomIntBetween(1, 1000);
+ for(int i=0; i < putCount; ++i) {
+ queue.offer(i);
+ assertEquals(i+1, queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
}
-
-
- /**
- * Test that the mbean registers
- */
- @Test
- public void testMBeanRegistration() {
- try {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- Integer beanCount = mbs.getMBeanCount();
- ThroughputQueue queue = new ThroughputQueue(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
- assertEquals("Expected bean to be registered", new Integer(beanCount+1), mbs.getMBeanCount());
- ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
- assertNotNull(mBean);
- } catch (Exception e) {
- fail("Failed to register MXBean : "+e.getMessage());
- }
+ safeSleep(100); //ensure measurable wait time
+ int takeCount = randomIntBetween(1, putCount);
+ for(int i=0; i < takeCount; ++i) {
+ Integer element = queue.poll();
+ assertNotNull(element);
+ assertEquals(i, element.intValue());
+ assertEquals(putCount - (1+i), queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
}
-
- /**
- * Test that mulitple mbeans of the same type with a different name can be registered
- */
- @Test
- public void testMultipleMBeanRegistrations() {
- try {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- Integer beanCount = mbs.getMBeanCount();
- int numReg = randomIntBetween(2, 100);
- for(int i=0; i < numReg; ++i) {
- ThroughputQueue queue = new ThroughputQueue(MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME);
- assertEquals("Expected bean to be registered", new Integer(beanCount + (i+1)), mbs.getMBeanCount());
- ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME)));
- assertNotNull(mBean);
- }
- } catch (Exception e) {
- fail("Failed to register MXBean : "+e.getMessage());
- }
+ assertEquals(putCount-takeCount, queue.size());
+ assertEquals(queue.size(), queue.getCurrentSize());
+ assertTrue(0.0 < queue.getMaxWait());
+ assertTrue(0.0 < queue.getAvgWait());
+ assertTrue(0.0 < queue.getThroughput());
+ assertEquals(putCount, queue.getAdded());
+ assertEquals(takeCount, queue.getRemoved());
+ }
+
+
+
+ /**
+ * Test that max wait and avg wait return expected values
+ * @throws Exception
+ */
+ @Test
+ public void testWait() throws Exception {
+ ThroughputQueue queue = new ThroughputQueue();
+ int wait = 1000;
+
+ for(int i=0; i < 3; ++i) {
+ queue.put(1);
+ safeSleep(wait);
+ queue.take();
+ assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 2));//can't calculate exactly, making sure its close.
+ assertTrue(queue.getAvgWait() >= wait && queue.getAvgWait() <= (wait * 2));
+ }
+ queue.put(1);
+ queue.take();
+ assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 2));//can't calculate exactly, making sure its close.
+ assertTrue(queue.getAvgWait() <= 5000 );
+ assertTrue(queue.getAvgWait() >= 500);
+ }
+
+ /**
+ * Test that throughput returns expected values.
+ * @throws Exception
+ */
+ @Test
+ public void testThroughput() throws Exception {
+ ThroughputQueue queue = new ThroughputQueue();
+ int wait = 100;
+ for(int i=0; i < 10; ++i) {
+ queue.put(1);
+ safeSleep(wait);
+ queue.take();
+ }
+ double throughput = queue.getThroughput();
+ assertTrue(throughput <= 15 ); //can't calculate exactly, making sure its close.
+ assertTrue(throughput >= 5);
+
+ queue = new ThroughputQueue();
+ wait = 1000;
+ for(int i=0; i < 10; ++i) {
+ queue.put(1);
+ }
+ for(int i=0; i < 10; ++i) {
+ queue.take();
+ }
+ safeSleep(wait);
+ throughput = queue.getThroughput();
+ assertTrue(throughput <= 15 ); //can't calculate exactly, making sure its close.
+ assertTrue(throughput >= 5);
+ }
+
+
+ /**
+ * Test that the mbean registers
+ */
+ @Test
+ public void testMBeanRegistration() {
+ try {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ Integer beanCount = mbs.getMBeanCount();
+ ThroughputQueue queue = new ThroughputQueue(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+ assertEquals("Expected bean to be registered", new Integer(beanCount+1), mbs.getMBeanCount());
+ ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
+ assertNotNull(mBean);
+ } catch (Exception e) {
+ fail("Failed to register MXBean : "+e.getMessage());
+ }
+ }
+
+ /**
+ * Test that mulitple mbeans of the same type with a different name can be registered
+ */
+ @Test
+ public void testMultipleMBeanRegistrations() {
+ try {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ Integer beanCount = mbs.getMBeanCount();
+ int numReg = randomIntBetween(2, 100);
+ for(int i=0; i < numReg; ++i) {
+ ThroughputQueue queue = new ThroughputQueue(MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME);
+ assertEquals("Expected bean to be registered", new Integer(beanCount + (i+1)), mbs.getMBeanCount());
+ ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME)));
+ assertNotNull(mBean);
+ }
+ } catch (Exception e) {
+ fail("Failed to register MXBean : "+e.getMessage());
}
+ }
- private void safeSleep(long sleep) {
- try {
- Thread.sleep(sleep);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
+ private void safeSleep(long sleep) {
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
}
+ }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
index 38e948e..2a67550 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
@@ -18,25 +18,28 @@
package org.apache.streams.local.tasks;
-import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.local.counters.DatumStatusCounter;
import org.apache.streams.local.counters.StreamsTaskCounter;
-import org.apache.streams.local.queues.ThroughputQueue;
import org.apache.streams.local.test.processors.PassthroughDatumCounterProcessor;
import org.apache.streams.local.test.providers.NumericMessageProvider;
import org.apache.streams.local.test.writer.DatumCounterWriter;
import org.apache.streams.util.ComponentUtils;
+
+import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.After;
import org.junit.Test;
-import javax.management.InstanceNotFoundException;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
*
@@ -44,264 +47,264 @@ import static org.junit.Assert.*;
public class BasicTasksTest {
- private static final String MBEAN_ID = "test_bean";
- @After
- public void removeLocalMBeans() {
- try {
- ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
- } catch (Exception e) {
- //No op. proceed to next test
- }
+ private static final String MBEAN_ID = "test_bean";
+ @After
+ public void removeLocalMBeans() {
+ try {
+ ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+ } catch (Exception e) {
+ //No op. proceed to next test
}
+ }
- @Test
- public void testProviderTask() {
- int numMessages = 100;
- NumericMessageProvider provider = new NumericMessageProvider(numMessages);
- StreamsProviderTask task = new StreamsProviderTask(provider, false, null);
- BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
- task.addOutputQueue(outQueue);
- //Test that adding input queues to providers is not valid
- BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
- Exception exp = null;
- try {
- task.addInputQueue(inQueue);
- } catch (UnsupportedOperationException uoe) {
- exp = uoe;
- }
- assertNotNull(exp);
+ @Test
+ public void testProviderTask() {
+ int numMessages = 100;
+ NumericMessageProvider provider = new NumericMessageProvider(numMessages);
+ StreamsProviderTask task = new StreamsProviderTask(provider, false, null);
+ BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
+ task.addOutputQueue(outQueue);
+ //Test that adding input queues to providers is not valid
+ BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
+ Exception exp = null;
+ try {
+ task.addInputQueue(inQueue);
+ } catch (UnsupportedOperationException uoe) {
+ exp = uoe;
+ }
+ assertNotNull(exp);
- ExecutorService service = Executors.newFixedThreadPool(1);
- service.submit(task);
- int attempts = 0;
- while(outQueue.size() != numMessages) {
- Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
- if(attempts == 10) {
- fail("Provider task failed to output "+numMessages+" in a timely fashion.");
- }
- }
- service.shutdown();
- try {
- if(!service.awaitTermination(10, TimeUnit.SECONDS)){
- service.shutdownNow();
- fail("Service did not terminate.");
- }
- assertTrue("Task should have completed running in allotted time.", service.isTerminated());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ ExecutorService service = Executors.newFixedThreadPool(1);
+ service.submit(task);
+ int attempts = 0;
+ while(outQueue.size() != numMessages) {
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ if(attempts == 10) {
+ fail("Provider task failed to output "+numMessages+" in a timely fashion.");
+ }
+ }
+ service.shutdown();
+ try {
+ if(!service.awaitTermination(10, TimeUnit.SECONDS)){
+ service.shutdownNow();
+ fail("Service did not terminate.");
+ }
+ assertTrue("Task should have completed running in allotted time.", service.isTerminated());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
+ }
- @Test
- public void testProcessorTask() {
- int numMessages = 100;
- PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
- StreamsProcessorTask task = new StreamsProcessorTask(processor);
- StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
- task.setStreamsTaskCounter(counter);
- BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
- BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
- task.addOutputQueue(outQueue);
- task.addInputQueue(inQueue);
- assertEquals(numMessages, task.getInputQueues().get(0).size());
- ExecutorService service = Executors.newFixedThreadPool(1);
- service.submit(task);
- int attempts = 0;
- while(inQueue.size() != 0 && outQueue.size() != numMessages) {
- Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
- ++attempts;
- if(attempts == 10) {
- fail("Processor task failed to output "+numMessages+" in a timely fashion.");
- }
- }
- task.stopTask();;
- service.shutdown();
- try {
- if(!service.awaitTermination(5, TimeUnit.SECONDS)){
- service.shutdownNow();
- fail("Service did not terminate.");
- }
- assertTrue("Task should have completed running in allotted time.", service.isTerminated());
- } catch (InterruptedException e) {
- fail("Test Interrupted.");
- }
- assertEquals(numMessages, processor.getMessageCount());
- assertEquals(numMessages, counter.getNumReceived());
- assertEquals(numMessages, counter.getNumEmitted());
- assertEquals(0, counter.getNumUnhandledErrors());
- assertEquals(0.0, counter.getErrorRate(), 0.0);
+ @Test
+ public void testProcessorTask() {
+ int numMessages = 100;
+ PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
+ StreamsProcessorTask task = new StreamsProcessorTask(processor);
+ StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
+ task.setStreamsTaskCounter(counter);
+ BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
+ BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
+ task.addOutputQueue(outQueue);
+ task.addInputQueue(inQueue);
+ assertEquals(numMessages, task.getInputQueues().get(0).size());
+ ExecutorService service = Executors.newFixedThreadPool(1);
+ service.submit(task);
+ int attempts = 0;
+ while(inQueue.size() != 0 && outQueue.size() != numMessages) {
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ ++attempts;
+ if(attempts == 10) {
+ fail("Processor task failed to output "+numMessages+" in a timely fashion.");
+ }
+ }
+ task.stopTask();;
+ service.shutdown();
+ try {
+ if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+ service.shutdownNow();
+ fail("Service did not terminate.");
+ }
+ assertTrue("Task should have completed running in allotted time.", service.isTerminated());
+ } catch (InterruptedException e) {
+ fail("Test Interrupted.");
}
+ assertEquals(numMessages, processor.getMessageCount());
+ assertEquals(numMessages, counter.getNumReceived());
+ assertEquals(numMessages, counter.getNumEmitted());
+ assertEquals(0, counter.getNumUnhandledErrors());
+ assertEquals(0.0, counter.getErrorRate(), 0.0);
+ }
- @Test
- public void testWriterTask() {
- int numMessages = 100;
- DatumCounterWriter writer = new DatumCounterWriter("");
- StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer);
- StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
- task.setStreamsTaskCounter(counter);
- BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
- BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
+ @Test
+ public void testWriterTask() {
+ int numMessages = 100;
+ DatumCounterWriter writer = new DatumCounterWriter("");
+ StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer);
+ StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
+ task.setStreamsTaskCounter(counter);
+ BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
+ BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
- Exception exp = null;
- try {
- task.addOutputQueue(outQueue);
- } catch (UnsupportedOperationException uoe) {
- exp = uoe;
- }
- assertNotNull(exp);
- task.addInputQueue(inQueue);
- assertEquals(numMessages, task.getInputQueues().get(0).size());
- ExecutorService service = Executors.newFixedThreadPool(1);
- service.submit(task);
- int attempts = 0;
- while(inQueue.size() != 0 ) {
- Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
- ++attempts;
- if(attempts == 10) {
- fail("Processor task failed to output "+numMessages+" in a timely fashion.");
- }
- }
- task.stopTask();
- service.shutdown();
- try {
- if(!service.awaitTermination(15, TimeUnit.SECONDS)){
- service.shutdownNow();
- fail("Service did not terminate.");
- }
- assertTrue("Task should have completed running in allotted time.", service.isTerminated());
- } catch (InterruptedException e) {
- fail("Test Interrupted.");
- }
- assertEquals(numMessages, writer.getDatumsCounted());
- assertEquals(numMessages, counter.getNumReceived());
- assertEquals(0, counter.getNumEmitted());
- assertEquals(0, counter.getNumUnhandledErrors());
- assertEquals(0.0, counter.getErrorRate(), 0.0);
+ Exception exp = null;
+ try {
+ task.addOutputQueue(outQueue);
+ } catch (UnsupportedOperationException uoe) {
+ exp = uoe;
}
+ assertNotNull(exp);
+ task.addInputQueue(inQueue);
+ assertEquals(numMessages, task.getInputQueues().get(0).size());
+ ExecutorService service = Executors.newFixedThreadPool(1);
+ service.submit(task);
+ int attempts = 0;
+ while(inQueue.size() != 0 ) {
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ ++attempts;
+ if(attempts == 10) {
+ fail("Processor task failed to output "+numMessages+" in a timely fashion.");
+ }
+ }
+ task.stopTask();
+ service.shutdown();
+ try {
+ if(!service.awaitTermination(15, TimeUnit.SECONDS)){
+ service.shutdownNow();
+ fail("Service did not terminate.");
+ }
+ assertTrue("Task should have completed running in allotted time.", service.isTerminated());
+ } catch (InterruptedException e) {
+ fail("Test Interrupted.");
+ }
+ assertEquals(numMessages, writer.getDatumsCounted());
+ assertEquals(numMessages, counter.getNumReceived());
+ assertEquals(0, counter.getNumEmitted());
+ assertEquals(0, counter.getNumUnhandledErrors());
+ assertEquals(0.0, counter.getErrorRate(), 0.0);
+ }
- @Test
- public void testMergeTask() {
- int numMessages = 100;
- int incoming = 5;
- StreamsMergeTask task = new StreamsMergeTask();
- BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
- task.addOutputQueue(outQueue);
- for(int i=0; i < incoming; ++i) {
- task.addInputQueue(createInputQueue(numMessages));
- }
- ExecutorService service = Executors.newFixedThreadPool(1);
- service.submit(task);
- int attempts = 0;
- while(outQueue.size() != incoming * numMessages ) {
- Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
- ++attempts;
- if(attempts == 10) {
- assertEquals("Processor task failed to output " + (numMessages * incoming) + " in a timely fashion.", (numMessages * incoming), outQueue.size());
- }
- }
- task.stopTask();
- service.shutdown();
- try {
- if(!service.awaitTermination(5, TimeUnit.SECONDS)){
- service.shutdownNow();
- fail("Service did not terminate.");
- }
- assertTrue("Task should have completed running in allotted time.", service.isTerminated());
- } catch (InterruptedException e) {
- fail("Test Interrupted.");
- }
+ @Test
+ public void testMergeTask() {
+ int numMessages = 100;
+ int incoming = 5;
+ StreamsMergeTask task = new StreamsMergeTask();
+ BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
+ task.addOutputQueue(outQueue);
+ for(int i=0; i < incoming; ++i) {
+ task.addInputQueue(createInputQueue(numMessages));
+ }
+ ExecutorService service = Executors.newFixedThreadPool(1);
+ service.submit(task);
+ int attempts = 0;
+ while(outQueue.size() != incoming * numMessages ) {
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ ++attempts;
+ if(attempts == 10) {
+ assertEquals("Processor task failed to output " + (numMessages * incoming) + " in a timely fashion.", (numMessages * incoming), outQueue.size());
+ }
}
+ task.stopTask();
+ service.shutdown();
+ try {
+ if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+ service.shutdownNow();
+ fail("Service did not terminate.");
+ }
+ assertTrue("Task should have completed running in allotted time.", service.isTerminated());
+ } catch (InterruptedException e) {
+ fail("Test Interrupted.");
+ }
+ }
- @Test
- public void testBranching() {
- int numMessages = 100;
- PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
- StreamsProcessorTask task = new StreamsProcessorTask(processor);
- BlockingQueue<StreamsDatum> outQueue1 = new LinkedBlockingQueue<>();
- BlockingQueue<StreamsDatum> outQueue2 = new LinkedBlockingQueue<>();
- BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
- task.addOutputQueue(outQueue1);
- task.addOutputQueue(outQueue2);
- task.addInputQueue(inQueue);
- assertEquals(numMessages, task.getInputQueues().get(0).size());
- ExecutorService service = Executors.newFixedThreadPool(1);
- service.submit(task);
- int attempts = 0;
- while(inQueue.size() != 0 ) {
- Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
- ++attempts;
- if(attempts == 10) {
- assertEquals("Processor task failed to output "+(numMessages)+" in a timely fashion.", 0, inQueue.size());
- }
- }
- task.stopTask();
+ @Test
+ public void testBranching() {
+ int numMessages = 100;
+ PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
+ StreamsProcessorTask task = new StreamsProcessorTask(processor);
+ BlockingQueue<StreamsDatum> outQueue1 = new LinkedBlockingQueue<>();
+ BlockingQueue<StreamsDatum> outQueue2 = new LinkedBlockingQueue<>();
+ BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
+ task.addOutputQueue(outQueue1);
+ task.addOutputQueue(outQueue2);
+ task.addInputQueue(inQueue);
+ assertEquals(numMessages, task.getInputQueues().get(0).size());
+ ExecutorService service = Executors.newFixedThreadPool(1);
+ service.submit(task);
+ int attempts = 0;
+ while(inQueue.size() != 0 ) {
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ ++attempts;
+ if(attempts == 10) {
+ assertEquals("Processor task failed to output "+(numMessages)+" in a timely fashion.", 0, inQueue.size());
+ }
+ }
+ task.stopTask();
- service.shutdown();
- try {
- if(!service.awaitTermination(5, TimeUnit.SECONDS)){
- service.shutdownNow();
- fail("Service did not terminate.");
- }
- assertTrue("Task should have completed running in allotted time.", service.isTerminated());
- } catch (InterruptedException e) {
- fail("Test Interrupted.");
- }
- assertEquals(numMessages, processor.getMessageCount());
- assertEquals(numMessages, outQueue1.size());
- assertEquals(numMessages, outQueue2.size());
+ service.shutdown();
+ try {
+ if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+ service.shutdownNow();
+ fail("Service did not terminate.");
+ }
+ assertTrue("Task should have completed running in allotted time.", service.isTerminated());
+ } catch (InterruptedException e) {
+ fail("Test Interrupted.");
}
+ assertEquals(numMessages, processor.getMessageCount());
+ assertEquals(numMessages, outQueue1.size());
+ assertEquals(numMessages, outQueue2.size());
+ }
- @Test
- public void testBranchingSerialization() {
- int numMessages = 1;
- PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
- StreamsProcessorTask task = new StreamsProcessorTask(processor);
- BlockingQueue<StreamsDatum> outQueue1 = new LinkedBlockingQueue<>();
- BlockingQueue<StreamsDatum> outQueue2 = new LinkedBlockingQueue<>();
- BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
- task.addOutputQueue(outQueue1);
- task.addOutputQueue(outQueue2);
- task.addInputQueue(inQueue);
- ExecutorService service = Executors.newFixedThreadPool(1);
- service.submit(task);
- int attempts = 0;
- while(inQueue.size() != 0 ) {
- Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
- ++attempts;
- if(attempts == 10) {
- assertEquals("Processor task failed to output "+(numMessages)+" in a timely fashion.", 0, inQueue.size());
- }
- }
- task.stopTask();
+ @Test
+ public void testBranchingSerialization() {
+ int numMessages = 1;
+ PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
+ StreamsProcessorTask task = new StreamsProcessorTask(processor);
+ BlockingQueue<StreamsDatum> outQueue1 = new LinkedBlockingQueue<>();
+ BlockingQueue<StreamsDatum> outQueue2 = new LinkedBlockingQueue<>();
+ BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
+ task.addOutputQueue(outQueue1);
+ task.addOutputQueue(outQueue2);
+ task.addInputQueue(inQueue);
+ ExecutorService service = Executors.newFixedThreadPool(1);
+ service.submit(task);
+ int attempts = 0;
+ while(inQueue.size() != 0 ) {
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ ++attempts;
+ if(attempts == 10) {
+ assertEquals("Processor task failed to output "+(numMessages)+" in a timely fashion.", 0, inQueue.size());
+ }
+ }
+ task.stopTask();
- service.shutdown();
- try {
- if(!service.awaitTermination(5, TimeUnit.SECONDS)){
- service.shutdownNow();
- fail("Service did not terminate.");
- }
- assertTrue("Task should have completed running in allotted time.", service.isTerminated());
- } catch (InterruptedException e) {
- fail("Test Interrupted.");
- }
- assertEquals(numMessages, processor.getMessageCount());
- assertEquals(numMessages, outQueue1.size());
- assertEquals(numMessages, outQueue2.size());
- StreamsDatum datum1 = outQueue1.poll();
- StreamsDatum datum2 = outQueue2.poll();
- assertNotNull(datum1);
- assertEquals(datum1, datum2);
- datum1.setDocument("a");
- assertNotEquals(datum1, datum2);
+ service.shutdown();
+ try {
+ if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+ service.shutdownNow();
+ fail("Service did not terminate.");
+ }
+ assertTrue("Task should have completed running in allotted time.", service.isTerminated());
+ } catch (InterruptedException e) {
+ fail("Test Interrupted.");
}
+ assertEquals(numMessages, processor.getMessageCount());
+ assertEquals(numMessages, outQueue1.size());
+ assertEquals(numMessages, outQueue2.size());
+ StreamsDatum datum1 = outQueue1.poll();
+ StreamsDatum datum2 = outQueue2.poll();
+ assertNotNull(datum1);
+ assertEquals(datum1, datum2);
+ datum1.setDocument("a");
+ assertNotEquals(datum1, datum2);
+ }
- private BlockingQueue<StreamsDatum> createInputQueue(int numDatums) {
- BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
- for(int i=0; i < numDatums; ++i) {
- queue.add(new StreamsDatum(i));
- }
- return queue;
+ private BlockingQueue<StreamsDatum> createInputQueue(int numDatums) {
+ BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
+ for(int i=0; i < numDatums; ++i) {
+ queue.add(new StreamsDatum(i));
}
+ return queue;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
index 222566d..782e232 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
@@ -23,132 +23,142 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.util.ComponentUtils;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Tests the StreamsProviderTask.
*/
public class StreamsProviderTaskTest {
- protected StreamsProvider mockProvider;
- protected ExecutorService pool;
-
- @Before
- public void setup() {
- mockProvider = mock(StreamsProvider.class);
- pool = Executors.newFixedThreadPool(1);
- }
-
- @After
- public void removeLocalMBeans() {
- try {
- ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
- } catch (Exception e) {
- //No op. proceed to next test
- }
- }
-
- @Test
- public void runPerpetual() {
- StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
- when(mockProvider.isRunning()).thenReturn(true);
- when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
- task.setTimeout(500);
- task.setSleepTime(10);
- task.run();
- //Setting this to at least 2 means that it was correctly set to perpetual mode
- verify(mockProvider, atLeast(2)).readCurrent();
- verify(mockProvider, atMost(1)).prepare(null);
- }
-
- @Test
- public void flushes() {
- BlockingQueue<StreamsDatum> out = new LinkedBlockingQueue<>();
- StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
- when(mockProvider.isRunning()).thenReturn(true);
- when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(getQueue(3)));
- task.setTimeout(100);
- task.setSleepTime(10);
- task.getOutputQueues().add(out);
- task.run();
- assertThat(out.size(), is(equalTo(3)));
+ protected StreamsProvider mockProvider;
+ protected ExecutorService pool;
+
+ @Before
+ public void setup() {
+ mockProvider = mock(StreamsProvider.class);
+ pool = Executors.newFixedThreadPool(1);
+ }
+
+ @After
+ public void removeLocalMBeans() {
+ try {
+ ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+ } catch (Exception e) {
+ //No op. proceed to next test
}
-
- protected Queue<StreamsDatum> getQueue(int numElems) {
- Queue<StreamsDatum> results = new LinkedBlockingQueue<>();
- for(int i=0; i<numElems; i++) {
- results.add(new StreamsDatum(Math.random()));
- }
- return results;
- }
-
- @Test
- public void runNonPerpetual() {
- StreamsProviderTask task = new StreamsProviderTask(mockProvider, false, null);
- when(mockProvider.isRunning()).thenReturn(true);
- when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
- task.setTimeout(500);
- task.setSleepTime(10);
- task.run();
- //In read current mode, this should only be called 1 time
- verify(mockProvider, atLeast(1)).readCurrent();
- verify(mockProvider, atMost(1)).prepare(null);
- }
-
- @Test
- public void stoppable() throws InterruptedException {
- StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
- when(mockProvider.isRunning()).thenReturn(true);
- when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
- task.setTimeout(-1);
- task.setSleepTime(10);
- Future<?> taskResult = pool.submit(task);
-
- //After a few milliseconds, tell the task that it is to stop and wait until it says it isn't or a timeout happens
- int count = 0;
- do {
- Thread.sleep(100);
- if(count == 0) {
- task.stopTask();
- }
- } while(++count < 10 && !taskResult.isDone());
- verifyNotRunning(task, taskResult);
-
+ }
+
+ @Test
+ public void runPerpetual() {
+ StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
+ when(mockProvider.isRunning()).thenReturn(true);
+ when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
+ task.setTimeout(500);
+ task.setSleepTime(10);
+ task.run();
+ //Setting this to at least 2 means that it was correctly set to perpetual mode
+ verify(mockProvider, atLeast(2)).readCurrent();
+ verify(mockProvider, atMost(1)).prepare(null);
+ }
+
+ @Test
+ public void flushes() {
+ BlockingQueue<StreamsDatum> out = new LinkedBlockingQueue<>();
+ StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
+ when(mockProvider.isRunning()).thenReturn(true);
+ when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(getQueue(3)));
+ task.setTimeout(100);
+ task.setSleepTime(10);
+ task.getOutputQueues().add(out);
+ task.run();
+ assertThat(out.size(), is(equalTo(3)));
+ }
+
+ protected Queue<StreamsDatum> getQueue(int numElems) {
+ Queue<StreamsDatum> results = new LinkedBlockingQueue<>();
+ for(int i=0; i<numElems; i++) {
+ results.add(new StreamsDatum(Math.random()));
}
-
- @Test
- public void earlyException() throws InterruptedException {
- StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
- when(mockProvider.isRunning()).thenReturn(true);
- doThrow(new RuntimeException()).when(mockProvider).prepare(null);
- task.setTimeout(-1);
- task.setSleepTime(10);
- Future<?> taskResult = pool.submit(task);
- int count = 0;
- while(++count < 10 && !taskResult.isDone()) {
- Thread.sleep(100);
- }
- verifyNotRunning(task, taskResult);
+ return results;
+ }
+
+ @Test
+ public void runNonPerpetual() {
+ StreamsProviderTask task = new StreamsProviderTask(mockProvider, false, null);
+ when(mockProvider.isRunning()).thenReturn(true);
+ when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
+ task.setTimeout(500);
+ task.setSleepTime(10);
+ task.run();
+ //In read current mode, this should only be called 1 time
+ verify(mockProvider, atLeast(1)).readCurrent();
+ verify(mockProvider, atMost(1)).prepare(null);
+ }
+
+ @Test
+ public void stoppable() throws InterruptedException {
+ StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
+ when(mockProvider.isRunning()).thenReturn(true);
+ when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
+ task.setTimeout(-1);
+ task.setSleepTime(10);
+ Future<?> taskResult = pool.submit(task);
+
+ //After a few milliseconds, tell the task that it is to stop and wait until it says it isn't or a timeout happens
+ int count = 0;
+ do {
+ Thread.sleep(100);
+ if(count == 0) {
+ task.stopTask();
+ }
+ } while(++count < 10 && !taskResult.isDone());
+ verifyNotRunning(task, taskResult);
+
+ }
+
+ @Test
+ public void earlyException() throws InterruptedException {
+ StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
+ when(mockProvider.isRunning()).thenReturn(true);
+ doThrow(new RuntimeException()).when(mockProvider).prepare(null);
+ task.setTimeout(-1);
+ task.setSleepTime(10);
+ Future<?> taskResult = pool.submit(task);
+ int count = 0;
+ while(++count < 10 && !taskResult.isDone()) {
+ Thread.sleep(100);
}
-
- protected void verifyNotRunning(StreamsProviderTask task, Future<?> taskResult) {
- //Make sure the task is reporting that it is complete and that the run method returned
- if(taskResult.isDone()) {
- assertThat(task.isRunning(), is(false));
- } else {
- ComponentUtils.shutdownExecutor(pool, 0, 10);
- fail();
- }
+ verifyNotRunning(task, taskResult);
+ }
+
+ protected void verifyNotRunning(StreamsProviderTask task, Future<?> taskResult) {
+ //Make sure the task is reporting that it is complete and that the run method returned
+ if(taskResult.isDone()) {
+ assertThat(task.isRunning(), is(false));
+ } else {
+ ComponentUtils.shutdownExecutor(pool, 0, 10);
+ fail();
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java
index cad7873..31a83ec 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.test.processors;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,34 +32,34 @@ import java.util.List;
*/
public class DoNothingProcessor implements StreamsProcessor {
- private final static Logger LOGGER = LoggerFactory.getLogger(DoNothingProcessor.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(DoNothingProcessor.class);
- public final static String STREAMS_ID = "DoNothingProcessor";
+ public final static String STREAMS_ID = "DoNothingProcessor";
- List<StreamsDatum> result;
+ List<StreamsDatum> result;
- public DoNothingProcessor() {
- }
+ public DoNothingProcessor() {
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
- this.result = new LinkedList<StreamsDatum>();
- result.add(entry);
- return result;
- }
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ this.result = new LinkedList<StreamsDatum>();
+ result.add(entry);
+ return result;
+ }
- @Override
- public void prepare(Object configurationObject) {
+ @Override
+ public void prepare(Object configurationObject) {
- }
+ }
- @Override
- public void cleanUp() {
- LOGGER.debug("Processor clean up!");
- }
+ @Override
+ public void cleanUp() {
+ LOGGER.debug("Processor clean up!");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java
index 970a8dc..43343e5 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java
@@ -20,10 +20,15 @@ package org.apache.streams.local.test.processors;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -32,76 +37,76 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class PassthroughDatumCounterProcessor implements StreamsProcessor {
- private final static Logger LOGGER = LoggerFactory.getLogger(PassthroughDatumCounterProcessor.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(PassthroughDatumCounterProcessor.class);
- public final static String STREAMS_ID = "PassthroughDatumCounterProcessor";
+ public final static String STREAMS_ID = "PassthroughDatumCounterProcessor";
- /**
- * Set of all ids that have been claimed. Ensures all instances are assigned unique ids
- */
- public static Set<Integer> CLAIMED_ID = new HashSet<Integer>();
- /**
- * Random instance to generate ids
- */
- public static final Random RAND = new Random();
- /**
- * Set of instance ids that received data. Usefully for testing parrallelization is actually working.
- */
- public final static Set<Integer> SEEN_DATA = new HashSet<Integer>();
- /**
- * The total count of data seen by a all instances of a processor.
- */
- public static final ConcurrentHashMap<String, AtomicLong> COUNTS = new ConcurrentHashMap<>();
+ /**
+ * Set of all ids that have been claimed. Ensures all instances are assigned unique ids
+ */
+ public static Set<Integer> CLAIMED_ID = new HashSet<Integer>();
+ /**
+ * Random instance to generate ids
+ */
+ public static final Random RAND = new Random();
+ /**
+ * Set of instance ids that received data. Usefully for testing parrallelization is actually working.
+ */
+ public final static Set<Integer> SEEN_DATA = new HashSet<Integer>();
+ /**
+ * The total count of data seen by a all instances of a processor.
+ */
+ public static final ConcurrentHashMap<String, AtomicLong> COUNTS = new ConcurrentHashMap<>();
- private int count = 0;
- private int id;
- private String procId;
+ private int count = 0;
+ private int id;
+ private String procId;
- public PassthroughDatumCounterProcessor(String procId) {
- this.procId = procId;
- }
+ public PassthroughDatumCounterProcessor(String procId) {
+ this.procId = procId;
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
- ++this.count;
- List<StreamsDatum> result = new LinkedList<StreamsDatum>();
- result.add(entry);
- synchronized (SEEN_DATA) {
- SEEN_DATA.add(this.id);
- }
- return result;
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ ++this.count;
+ List<StreamsDatum> result = new LinkedList<StreamsDatum>();
+ result.add(entry);
+ synchronized (SEEN_DATA) {
+ SEEN_DATA.add(this.id);
}
+ return result;
+ }
- @Override
- public void prepare(Object configurationObject) {
- synchronized (CLAIMED_ID) {
- this.id = RAND.nextInt();
- while(!CLAIMED_ID.add(this.id)) {
- this.id = RAND.nextInt();
- }
- }
+ @Override
+ public void prepare(Object configurationObject) {
+ synchronized (CLAIMED_ID) {
+ this.id = RAND.nextInt();
+ while(!CLAIMED_ID.add(this.id)) {
+ this.id = RAND.nextInt();
+ }
}
+ }
- @Override
- public void cleanUp() {
- LOGGER.debug("Clean up {}", this.procId);
- synchronized (COUNTS) {
- AtomicLong count = COUNTS.get(this.procId);
- if(count == null) {
- COUNTS.put(this.procId, new AtomicLong(this.count));
- } else {
- count.addAndGet(this.count);
- }
- }
- LOGGER.debug("{}\t{}", this.procId, this.count);
+ @Override
+ public void cleanUp() {
+ LOGGER.debug("Clean up {}", this.procId);
+ synchronized (COUNTS) {
+ AtomicLong count = COUNTS.get(this.procId);
+ if(count == null) {
+ COUNTS.put(this.procId, new AtomicLong(this.count));
+ } else {
+ count.addAndGet(this.count);
+ }
}
+ LOGGER.debug("{}\t{}", this.procId, this.count);
+ }
- public int getMessageCount() {
- return this.count;
- }
+ public int getMessageCount() {
+ return this.count;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java
index 2b172cd..227e0f8 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java
@@ -19,40 +19,41 @@
package org.apache.streams.local.test.processors;
-import com.google.common.collect.Lists;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
+import com.google.common.collect.Lists;
+
import java.util.List;
/**
*/
public class SlowProcessor implements StreamsProcessor {
- public final static String STREAMS_ID = "DoNothingProcessor";
+ public final static String STREAMS_ID = "DoNothingProcessor";
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- return Lists.newArrayList(entry);
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
+ return Lists.newArrayList(entry);
+ }
- @Override
- public void prepare(Object configurationObject) {
+ @Override
+ public void prepare(Object configurationObject) {
- }
+ }
- @Override
- public void cleanUp() {
+ @Override
+ public void cleanUp() {
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
index bdbc9ec..571c0fc 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
@@ -19,10 +19,11 @@
package org.apache.streams.local.test.providers;
-import com.google.common.collect.Queues;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
+
+import com.google.common.collect.Queues;
import org.joda.time.DateTime;
import java.math.BigInteger;
@@ -32,43 +33,43 @@ import java.math.BigInteger;
*/
public class EmptyResultSetProvider implements StreamsProvider {
- @Override
- public String getId() {
- return "EmptyResultSetProvider";
- }
+ @Override
+ public String getId() {
+ return "EmptyResultSetProvider";
+ }
- @Override
- public void startStream() {
- //NOP
- }
+ @Override
+ public void startStream() {
+ //NOP
+ }
- @Override
- public StreamsResultSet readCurrent() {
- return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
- }
+ @Override
+ public StreamsResultSet readCurrent() {
+ return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+ }
- @Override
- public StreamsResultSet readNew(BigInteger sequence) {
- return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
- }
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+ }
- @Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
- }
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+ }
- @Override
- public boolean isRunning() {
- return true;
- }
+ @Override
+ public boolean isRunning() {
+ return true;
+ }
- @Override
- public void prepare(Object configurationObject) {
- //NOP
- }
+ @Override
+ public void prepare(Object configurationObject) {
+ //NOP
+ }
- @Override
- public void cleanUp() {
- //NOP
- }
+ @Override
+ public void cleanUp() {
+ //NOP
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
index d7c1568..88494a8 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
@@ -18,93 +18,91 @@
package org.apache.streams.local.test.providers;
-import com.google.common.collect.Queues;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
+
+import com.google.common.collect.Queues;
import org.joda.time.DateTime;
import java.math.BigInteger;
-import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* Test StreamsProvider that sends out StreamsDatums numbered from 0 to numMessages.
*/
public class NumericMessageProvider implements StreamsProvider {
- @Override
- public String getId() {
- return "NumericMessageProvider";
- }
-
- private static final int DEFAULT_BATCH_SIZE = 100;
-
- private int numMessages;
- private BlockingQueue<StreamsDatum> data;
- private volatile boolean complete = false;
-
- public NumericMessageProvider(int numMessages) {
- this.numMessages = numMessages;
- }
-
- @Override
- public void startStream() {
- this.data = constructQueue();
+ @Override
+ public String getId() {
+ return "NumericMessageProvider";
+ }
+
+ private static final int DEFAULT_BATCH_SIZE = 100;
+
+ private int numMessages;
+ private BlockingQueue<StreamsDatum> data;
+ private volatile boolean complete = false;
+
+ public NumericMessageProvider(int numMessages) {
+ this.numMessages = numMessages;
+ }
+
+ @Override
+ public void startStream() {
+ this.data = constructQueue();
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+ int batchSize = 0;
+ Queue<StreamsDatum> batch = Queues.newLinkedBlockingQueue();
+ try {
+ while (!this.data.isEmpty() && batchSize < DEFAULT_BATCH_SIZE) {
+ batch.add(this.data.take());
+ ++batchSize;
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
}
-
- @Override
- public StreamsResultSet readCurrent() {
- int batchSize = 0;
- Queue<StreamsDatum> batch = Queues.newLinkedBlockingQueue();
- try {
- while (!this.data.isEmpty() && batchSize < DEFAULT_BATCH_SIZE) {
- batch.add(this.data.take());
- ++batchSize;
- }
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
// System.out.println("******************\n**\tBatchSize="+batch.size()+"\n******************");
- this.complete = batch.isEmpty() && this.data.isEmpty();
- return new StreamsResultSet(batch);
- }
-
- @Override
- public StreamsResultSet readNew(BigInteger sequence) {
- return new StreamsResultSet(constructQueue());
- }
-
- @Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- return new StreamsResultSet(constructQueue());
- }
-
- @Override
- public boolean isRunning() {
- return !this.complete;
- }
-
- @Override
- public void prepare(Object configurationObject) {
- this.data = constructQueue();
- }
-
- @Override
- public void cleanUp() {
-
- }
-
- private BlockingQueue<StreamsDatum> constructQueue() {
- BlockingQueue<StreamsDatum> datums = Queues.newArrayBlockingQueue(numMessages);
- for(int i=0;i<numMessages;i++) {
- datums.add(new StreamsDatum(i));
- }
- return datums;
+ this.complete = batch.isEmpty() && this.data.isEmpty();
+ return new StreamsResultSet(batch);
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return new StreamsResultSet(constructQueue());
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return new StreamsResultSet(constructQueue());
+ }
+
+ @Override
+ public boolean isRunning() {
+ return !this.complete;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ this.data = constructQueue();
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
+
+ private BlockingQueue<StreamsDatum> constructQueue() {
+ BlockingQueue<StreamsDatum> datums = Queues.newArrayBlockingQueue(numMessages);
+ for(int i=0;i<numMessages;i++) {
+ datums.add(new StreamsDatum(i));
}
+ return datums;
+ }
}