You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:24:46 UTC

[05/42] incubator-streams git commit: STREAMS-440: custom checkstyle.xml, address compliance

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java
index 60df89c..ad4aa28 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java
@@ -17,299 +17,304 @@
  */
 package org.apache.streams.local.queues;
 
+import org.apache.streams.util.ComponentUtils;
+
 import com.carrotsearch.randomizedtesting.RandomizedTest;
 import com.carrotsearch.randomizedtesting.annotations.Repeat;
-import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import javax.management.InstanceNotFoundException;
 import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.util.concurrent.*;
 
 /**
  * MultiThread unit tests for {@link org.apache.streams.local.queues.ThroughputQueue}
  */
 public class ThroughputQueueMultiThreadTest extends RandomizedTest {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueueMultiThreadTest.class);
-    private static final String MBEAN_ID = "testQueue";
-    private static final String STREAM_ID = "test_stream";
-    private static long STREAM_START_TIME = (new DateTime()).getMillis();
-
-    /**
-     * Remove registered mbeans from previous tests
-     * @throws Exception
-     */
-    @After
-    public void unregisterMXBean() throws Exception {
-        try {
-            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
-        } catch (InstanceNotFoundException ife) {
-            //No-op
-        }
+  private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueueMultiThreadTest.class);
+  private static final String MBEAN_ID = "testQueue";
+  private static final String STREAM_ID = "test_stream";
+  private static long STREAM_START_TIME = (new DateTime()).getMillis();
+
+  /**
+   * Remove registered mbeans from previous tests
+   * @throws Exception
+   */
+  @After
+  public void unregisterMXBean() throws Exception {
+    try {
+      ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
+    } catch (InstanceNotFoundException ife) {
+      //No-op
     }
-
-    @After
-    public void removeLocalMBeans() {
-        try {
-            ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
-        } catch (Exception e) {
-            //No op.  proceed to next test
-        }
+  }
+
+  @After
+  public void removeLocalMBeans() {
+    try {
+      ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+    } catch (Exception e) {
+      //No op.  proceed to next test
     }
-
-
-    /**
-     * Test that queue will block on puts when the queue is full
-     * @throws InterruptedException
-     */
-    @Test
-    public void testBlockOnFullQueue() throws InterruptedException {
-        int queueSize = randomIntBetween(1, 3000);
-        ExecutorService executor = Executors.newSingleThreadExecutor();
-        CountDownLatch full = new CountDownLatch(1);
-        CountDownLatch finished = new CountDownLatch(1);
-        ThroughputQueue queue = new ThroughputQueue(queueSize);
-        BlocksOnFullQueue testThread = new BlocksOnFullQueue(full, finished, queue, queueSize);
-        executor.submit(testThread);
-        full.await();
-        assertEquals(queueSize, queue.size());
-        assertEquals(queueSize, queue.getCurrentSize());
-        assertFalse(testThread.isComplete()); //test that it is blocked
-        safeSleep(1000);
-        assertFalse(testThread.isComplete()); //still blocked
-        queue.take();
-        finished.await();
-        assertEquals(queueSize, queue.size());
-        assertEquals(queueSize, queue.getCurrentSize());
-        assertTrue(testThread.isComplete());
-        executor.shutdownNow();
-        executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+  }
+
+
+  /**
+   * Test that queue will block on puts when the queue is full
+   * @throws InterruptedException
+   */
+  @Test
+  public void testBlockOnFullQueue() throws InterruptedException {
+    int queueSize = randomIntBetween(1, 3000);
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    CountDownLatch full = new CountDownLatch(1);
+    CountDownLatch finished = new CountDownLatch(1);
+    ThroughputQueue queue = new ThroughputQueue(queueSize);
+    BlocksOnFullQueue testThread = new BlocksOnFullQueue(full, finished, queue, queueSize);
+    executor.submit(testThread);
+    full.await();
+    assertEquals(queueSize, queue.size());
+    assertEquals(queueSize, queue.getCurrentSize());
+    assertFalse(testThread.isComplete()); //test that it is blocked
+    safeSleep(1000);
+    assertFalse(testThread.isComplete()); //still blocked
+    queue.take();
+    finished.await();
+    assertEquals(queueSize, queue.size());
+    assertEquals(queueSize, queue.getCurrentSize());
+    assertTrue(testThread.isComplete());
+    executor.shutdownNow();
+    executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Test that queue will block on Take when queue is empty
+   * @throws InterruptedException
+   */
+  @Test
+  public void testBlockOnEmptyQueue() throws InterruptedException {
+    int queueSize = randomIntBetween(1, 3000);
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    CountDownLatch empty = new CountDownLatch(1);
+    CountDownLatch finished = new CountDownLatch(1);
+    ThroughputQueue queue = new ThroughputQueue();
+    BlocksOnEmptyQueue testThread = new BlocksOnEmptyQueue(empty, finished, queueSize, queue);
+    for(int i=0; i < queueSize; ++i) {
+      queue.put(i);
     }
-
-    /**
-     * Test that queue will block on Take when queue is empty
-     * @throws InterruptedException
-     */
-    @Test
-    public void testBlockOnEmptyQueue() throws InterruptedException {
-        int queueSize = randomIntBetween(1, 3000);
-        ExecutorService executor = Executors.newSingleThreadExecutor();
-        CountDownLatch empty = new CountDownLatch(1);
-        CountDownLatch finished = new CountDownLatch(1);
-        ThroughputQueue queue = new ThroughputQueue();
-        BlocksOnEmptyQueue testThread = new BlocksOnEmptyQueue(empty, finished, queueSize, queue);
-        for(int i=0; i < queueSize; ++i) {
-            queue.put(i);
-        }
-        executor.submit(testThread);
-        empty.await();
-        assertEquals(0, queue.size());
-        assertEquals(0, queue.getCurrentSize());
-        assertFalse(testThread.isComplete());
-        safeSleep(1000);
-        assertFalse(testThread.isComplete());
-        queue.put(1);
-        finished.await();
-        assertEquals(0, queue.size());
-        assertEquals(0, queue.getCurrentSize());
-        assertTrue(testThread.isComplete());
-        executor.shutdownNow();
-        executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+    executor.submit(testThread);
+    empty.await();
+    assertEquals(0, queue.size());
+    assertEquals(0, queue.getCurrentSize());
+    assertFalse(testThread.isComplete());
+    safeSleep(1000);
+    assertFalse(testThread.isComplete());
+    queue.put(1);
+    finished.await();
+    assertEquals(0, queue.size());
+    assertEquals(0, queue.getCurrentSize());
+    assertTrue(testThread.isComplete());
+    executor.shutdownNow();
+    executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+  }
+
+
+  /**
+   * Test multiple threads putting and taking from the queue while
+   * this thread repeatedly calls the MXBean measurement methods.
+   * Should hammer the queue with requests from multiple threads
+   * of all request types.  Purpose is to expose concurrent modification exceptions
+   * and/or deadlocks.
+   */
+  @Test
+  @Repeat(iterations = 3)
+  public void testMultiThreadAccessAndInteruptResponse() throws Exception {
+    int putTakeThreadCount = randomIntBetween(1, 10);
+    int dataCount = randomIntBetween(1, 2000000);
+    int pollCount = randomIntBetween(1, 2000000);
+    int maxSize = randomIntBetween(1, 1000);
+    CountDownLatch finished = new CountDownLatch(putTakeThreadCount);
+    ThroughputQueue queue = new ThroughputQueue(maxSize, MBEAN_ID);
+    ExecutorService executor = Executors.newFixedThreadPool(putTakeThreadCount * 2);
+    for(int i=0; i < putTakeThreadCount; ++i) {
+      executor.submit(new PutData(finished, queue, dataCount));
+      executor.submit(new TakeData(queue));
     }
-
-
-    /**
-     * Test multiple threads putting and taking from the queue while
-     * this thread repeatedly calls the MXBean measurement methods.
-     * Should hammer the queue with request from multiple threads
-     * of all request types.  Purpose is to expose current modification exceptions
-     * and/or dead locks.
-     */
-    @Test
-    @Repeat(iterations = 3)
-    public void testMultiThreadAccessAndInteruptResponse() throws Exception {
-        int putTakeThreadCount = randomIntBetween(1, 10);
-        int dataCount = randomIntBetween(1, 2000000);
-        int pollCount = randomIntBetween(1, 2000000);
-        int maxSize = randomIntBetween(1, 1000);
-        CountDownLatch finished = new CountDownLatch(putTakeThreadCount);
-        ThroughputQueue queue = new ThroughputQueue(maxSize, MBEAN_ID);
-        ExecutorService executor = Executors.newFixedThreadPool(putTakeThreadCount * 2);
-        for(int i=0; i < putTakeThreadCount; ++i) {
-            executor.submit(new PutData(finished, queue, dataCount));
-            executor.submit(new TakeData(queue));
-        }
-        for(int i=0; i < pollCount; ++i) {
-            queue.getAvgWait();
-            queue.getAdded();
-            queue.getCurrentSize();
-            queue.getMaxWait();
-            queue.getRemoved();
-            queue.getThroughput();
-        }
-        finished.await();
-        while(!queue.isEmpty()) {
-            LOGGER.info("Waiting for queue to be emptied...");
-            safeSleep(500);
-        }
-        long totalData = ((long) dataCount) * putTakeThreadCount;
-        assertEquals(totalData, queue.getAdded());
-        assertEquals(totalData, queue.getRemoved());
-        executor.shutdown();
-        executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown puts
-        executor.shutdownNow();
-        executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown takes
-        //Randomized should not report thread leak
+    for(int i=0; i < pollCount; ++i) {
+      queue.getAvgWait();
+      queue.getAdded();
+      queue.getCurrentSize();
+      queue.getMaxWait();
+      queue.getRemoved();
+      queue.getThroughput();
     }
-
-
-
-    private void safeSleep(long sleep) {
-        try {
-            Thread.sleep(sleep);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-        }
+    finished.await();
+    while(!queue.isEmpty()) {
+      LOGGER.info("Waiting for queue to be emptied...");
+      safeSleep(500);
     }
+    long totalData = ((long) dataCount) * putTakeThreadCount;
+    assertEquals(totalData, queue.getAdded());
+    assertEquals(totalData, queue.getRemoved());
+    executor.shutdown();
+    executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown puts
+    executor.shutdownNow();
+    executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown takes
+    //Randomized should not report thread leak
+  }
+
+
+
+  private void safeSleep(long sleep) {
+    try {
+      Thread.sleep(sleep);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    }
+  }
 
 
 
 
-    /**
-     * Helper runnable for test {@link ThroughputQueueMultiThreadTest#testBlockOnFullQueue()}
-     */
-    private class BlocksOnFullQueue implements Runnable {
-
-        private CountDownLatch full;
-        volatile private boolean complete;
-        private int queueSize;
-        private CountDownLatch finished;
-        private BlockingQueue queue;
+  /**
+   * Helper runnable for test {@link ThroughputQueueMultiThreadTest#testBlockOnFullQueue()}
+   */
+  private class BlocksOnFullQueue implements Runnable {
 
-        public BlocksOnFullQueue(CountDownLatch latch, CountDownLatch finished, BlockingQueue queue, int queueSize) {
-            this.full = latch;
-            this.complete = false;
-            this.queueSize = queueSize;
-            this.finished = finished;
-            this.queue = queue;
-        }
+    private CountDownLatch full;
+    volatile private boolean complete;
+    private int queueSize;
+    private CountDownLatch finished;
+    private BlockingQueue queue;
 
-        @Override
-        public void run() {
-            try {
-                for (int i = 0; i < this.queueSize; ++i) {
-                    this.queue.put(i);
-                }
-                this.full.countDown();
-                this.queue.put(0);
-                this.complete = true;
-                this.finished.countDown();
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-            }
-        }
+    public BlocksOnFullQueue(CountDownLatch latch, CountDownLatch finished, BlockingQueue queue, int queueSize) {
+      this.full = latch;
+      this.complete = false;
+      this.queueSize = queueSize;
+      this.finished = finished;
+      this.queue = queue;
+    }
 
-        public boolean isComplete() {
-            return this.complete;
+    @Override
+    public void run() {
+      try {
+        for (int i = 0; i < this.queueSize; ++i) {
+          this.queue.put(i);
         }
+        this.full.countDown();
+        this.queue.put(0);
+        this.complete = true;
+        this.finished.countDown();
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
     }
 
-
-    /**
-     * Helper runnable class for test {@link ThroughputQueueMultiThreadTest#testBlockOnEmptyQueue()}
-     */
-    private class BlocksOnEmptyQueue implements Runnable {
-
-        private CountDownLatch full;
-        volatile private boolean complete;
-        private int queueSize;
-        private CountDownLatch finished;
-        private BlockingQueue queue;
-
-        public BlocksOnEmptyQueue(CountDownLatch full, CountDownLatch finished, int queueSize, BlockingQueue queue) {
-            this.full = full;
-            this.finished = finished;
-            this.queueSize = queueSize;
-            this.queue = queue;
-            this.complete = false;
-        }
+    public boolean isComplete() {
+      return this.complete;
+    }
+  }
+
+
+  /**
+   * Helper runnable class for test {@link ThroughputQueueMultiThreadTest#testBlockOnEmptyQueue()}
+   */
+  private class BlocksOnEmptyQueue implements Runnable {
+
+    private CountDownLatch full;
+    volatile private boolean complete;
+    private int queueSize;
+    private CountDownLatch finished;
+    private BlockingQueue queue;
+
+    public BlocksOnEmptyQueue(CountDownLatch full, CountDownLatch finished, int queueSize, BlockingQueue queue) {
+      this.full = full;
+      this.finished = finished;
+      this.queueSize = queueSize;
+      this.queue = queue;
+      this.complete = false;
+    }
 
 
-        @Override
-        public void run() {
-            try {
-                for(int i=0; i < this.queueSize; ++i) {
-                    this.queue.take();
-                }
-                this.full.countDown();
-                this.queue.take();
-                this.complete = true;
-                this.finished.countDown();
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-            }
+    @Override
+    public void run() {
+      try {
+        for(int i=0; i < this.queueSize; ++i) {
+          this.queue.take();
         }
+        this.full.countDown();
+        this.queue.take();
+        this.complete = true;
+        this.finished.countDown();
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+    }
 
-        public boolean isComplete() {
-            return this.complete;
-        }
+    public boolean isComplete() {
+      return this.complete;
     }
+  }
 
 
-    private class PutData implements Runnable {
+  private class PutData implements Runnable {
 
-        private BlockingQueue queue;
-        private int dataCount;
-        private CountDownLatch finished;
+    private BlockingQueue queue;
+    private int dataCount;
+    private CountDownLatch finished;
 
-        public PutData(CountDownLatch finished, BlockingQueue queue, int dataCount) {
-            this.queue = queue;
-            this.dataCount = dataCount;
-            this.finished = finished;
-        }
+    public PutData(CountDownLatch finished, BlockingQueue queue, int dataCount) {
+      this.queue = queue;
+      this.dataCount = dataCount;
+      this.finished = finished;
+    }
 
 
-        @Override
-        public void run() {
-            try {
-                for(int i=0; i < this.dataCount; ++i) {
-                    this.queue.put(i);
-                }
-            } catch (InterruptedException ie) {
-                LOGGER.error("PUT DATA interupted !");
-                Thread.currentThread().interrupt();
-            }
-            this.finished.countDown();
+    @Override
+    public void run() {
+      try {
+        for(int i=0; i < this.dataCount; ++i) {
+          this.queue.put(i);
         }
+      } catch (InterruptedException ie) {
+        LOGGER.error("PUT DATA interupted !");
+        Thread.currentThread().interrupt();
+      }
+      this.finished.countDown();
     }
+  }
 
 
-    private class TakeData implements Runnable {
+  private class TakeData implements Runnable {
 
-        private BlockingQueue queue;
+    private BlockingQueue queue;
 
-        public TakeData(BlockingQueue queue) {
-            this.queue = queue;
-        }
+    public TakeData(BlockingQueue queue) {
+      this.queue = queue;
+    }
 
 
-        @Override
-        public void run() {
-            try {
-                while(true) {
-                    this.queue.take();
-                }
-            } catch (InterruptedException ie) {
-                LOGGER.error("PUT DATA interupted !");
-                Thread.currentThread().interrupt();
-            }
+    @Override
+    public void run() {
+      try {
+        while(true) {
+          this.queue.take();
         }
+      } catch (InterruptedException ie) {
+        LOGGER.error("PUT DATA interupted !");
+        Thread.currentThread().interrupt();
+      }
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
index 8c7f5c5..afe1911 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java
@@ -17,234 +17,235 @@
  */
 package org.apache.streams.local.queues;
 
+import org.apache.streams.util.ComponentUtils;
+
 import com.carrotsearch.randomizedtesting.RandomizedTest;
 import com.carrotsearch.randomizedtesting.annotations.Repeat;
-import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Test;
 
+import java.lang.management.ManagementFactory;
 import javax.management.MBeanServer;
 import javax.management.ObjectInstance;
 import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
 
 /**
  * Single thread unit tests for {@link org.apache.streams.local.queues.ThroughputQueue}
  */
 public class ThroughputQueueSingleThreadTest extends RandomizedTest {
-    private static final String MBEAN_ID = "test_id";
-    private static final String STREAM_ID = "test_stream";
-    private static long STREAM_START_TIME = (new DateTime()).getMillis();
-
-    @After
-    public void removeLocalMBeans() {
-        try {
-            ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
-        } catch (Exception e) {
-            //No op.  proceed to next test
-        }
+  private static final String MBEAN_ID = "test_id";
+  private static final String STREAM_ID = "test_stream";
+  private static long STREAM_START_TIME = (new DateTime()).getMillis();
+
+  @After
+  public void removeLocalMBeans() {
+    try {
+      ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+    } catch (Exception e) {
+      //No op.  proceed to next test
     }
-
-    /**
-     * Test that take and put queue and dequeue data as expected and all
-     * measurements form the queue are returning data.
-     * @throws Exception
-     */
-    @Test
-    @Repeat(iterations = 3)
-    public void testTakeAndPut() throws Exception {
-        ThroughputQueue<Integer> queue = new ThroughputQueue<>();
-        int putCount = randomIntBetween(1, 1000);
-        for(int i=0; i < putCount; ++i) {
-            queue.put(i);
-            assertEquals(i+1, queue.size());
-            assertEquals(queue.size(), queue.getCurrentSize());
-        }
-        safeSleep(100); //ensure measurable wait time
-        int takeCount = randomIntBetween(1, putCount);
-        for(int i=0; i < takeCount; ++i) {
-            Integer element = queue.take();
-            assertNotNull(element);
-            assertEquals(i, element.intValue());
-            assertEquals(putCount - (1+i), queue.size());
-            assertEquals(queue.size(), queue.getCurrentSize());
-        }
-        assertEquals(putCount-takeCount, queue.size());
-        assertEquals(queue.size(), queue.getCurrentSize());
-        assertTrue(0.0 < queue.getMaxWait());
-        assertTrue(0.0 < queue.getAvgWait());
-        assertTrue(0.0 < queue.getThroughput());
-        assertEquals(putCount, queue.getAdded());
-        assertEquals(takeCount, queue.getRemoved());
+  }
+
+  /**
+   * Test that take and put enqueue and dequeue data as expected and all
+   * measurements from the queue are returning data.
+   * @throws Exception
+   */
+  @Test
+  @Repeat(iterations = 3)
+  public void testTakeAndPut() throws Exception {
+    ThroughputQueue<Integer> queue = new ThroughputQueue<>();
+    int putCount = randomIntBetween(1, 1000);
+    for(int i=0; i < putCount; ++i) {
+      queue.put(i);
+      assertEquals(i+1, queue.size());
+      assertEquals(queue.size(), queue.getCurrentSize());
     }
-
-    /**
-     * Test that add and remove queue and dequeue data as expected
-     * and all measurements from the queue are returning data
-     */
-    @Test
-    @Repeat(iterations = 3)
-    public void testAddAndRemove() {
-        ThroughputQueue<Integer> queue = new ThroughputQueue<>();
-        int putCount = randomIntBetween(1, 1000);
-        for(int i=0; i < putCount; ++i) {
-            queue.add(i);
-            assertEquals(i+1, queue.size());
-            assertEquals(queue.size(), queue.getCurrentSize());
-        }
-        safeSleep(100); //ensure measurable wait time
-        int takeCount = randomIntBetween(1, putCount);
-        for(int i=0; i < takeCount; ++i) {
-            Integer element = queue.remove();
-            assertNotNull(element);
-            assertEquals(i, element.intValue());
-            assertEquals(putCount - (1+i), queue.size());
-            assertEquals(queue.size(), queue.getCurrentSize());
-        }
-        assertEquals(putCount-takeCount, queue.size());
-        assertEquals(queue.size(), queue.getCurrentSize());
-        assertTrue(0.0 < queue.getMaxWait());
-        assertTrue(0.0 < queue.getAvgWait());
-        assertTrue(0.0 < queue.getThroughput());
-        assertEquals(putCount, queue.getAdded());
-        assertEquals(takeCount, queue.getRemoved());
+    safeSleep(100); //ensure measurable wait time
+    int takeCount = randomIntBetween(1, putCount);
+    for(int i=0; i < takeCount; ++i) {
+      Integer element = queue.take();
+      assertNotNull(element);
+      assertEquals(i, element.intValue());
+      assertEquals(putCount - (1+i), queue.size());
+      assertEquals(queue.size(), queue.getCurrentSize());
     }
-
-    /**
-     * Test that offer and poll queue and dequeue data as expected
-     * and all measurements from the queue are returning data
-     */
-    @Test
-    @Repeat(iterations = 3)
-    public void testOfferAndPoll() {
-        ThroughputQueue<Integer> queue = new ThroughputQueue<>();
-        int putCount = randomIntBetween(1, 1000);
-        for(int i=0; i < putCount; ++i) {
-            queue.offer(i);
-            assertEquals(i+1, queue.size());
-            assertEquals(queue.size(), queue.getCurrentSize());
-        }
-        safeSleep(100); //ensure measurable wait time
-        int takeCount = randomIntBetween(1, putCount);
-        for(int i=0; i < takeCount; ++i) {
-            Integer element = queue.poll();
-            assertNotNull(element);
-            assertEquals(i, element.intValue());
-            assertEquals(putCount - (1+i), queue.size());
-            assertEquals(queue.size(), queue.getCurrentSize());
-        }
-        assertEquals(putCount-takeCount, queue.size());
-        assertEquals(queue.size(), queue.getCurrentSize());
-        assertTrue(0.0 < queue.getMaxWait());
-        assertTrue(0.0 < queue.getAvgWait());
-        assertTrue(0.0 < queue.getThroughput());
-        assertEquals(putCount, queue.getAdded());
-        assertEquals(takeCount, queue.getRemoved());
+    assertEquals(putCount-takeCount, queue.size());
+    assertEquals(queue.size(), queue.getCurrentSize());
+    assertTrue(0.0 < queue.getMaxWait());
+    assertTrue(0.0 < queue.getAvgWait());
+    assertTrue(0.0 < queue.getThroughput());
+    assertEquals(putCount, queue.getAdded());
+    assertEquals(takeCount, queue.getRemoved());
+  }
+
+  /**
+   * Test that add and remove enqueue and dequeue data as expected
+   * and all measurements from the queue are returning data
+   */
+  @Test
+  @Repeat(iterations = 3)
+  public void testAddAndRemove() {
+    ThroughputQueue<Integer> queue = new ThroughputQueue<>();
+    int putCount = randomIntBetween(1, 1000);
+    for(int i=0; i < putCount; ++i) {
+      queue.add(i);
+      assertEquals(i+1, queue.size());
+      assertEquals(queue.size(), queue.getCurrentSize());
     }
-
-
-
-    /**
-     * Test that max wait and avg wait return expected values
-     * @throws Exception
-     */
-    @Test
-    public void testWait() throws Exception {
-        ThroughputQueue queue = new ThroughputQueue();
-        int wait = 1000;
-
-        for(int i=0; i < 3; ++i) {
-            queue.put(1);
-            safeSleep(wait);
-            queue.take();
-            assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 2));//can't calculate exactly, making sure its close.
-            assertTrue(queue.getAvgWait() >= wait && queue.getAvgWait() <= (wait * 2));
-        }
-        queue.put(1);
-        queue.take();
-        assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 2));//can't calculate exactly, making sure its close.
-        assertTrue(queue.getAvgWait() <= 5000 );
-        assertTrue(queue.getAvgWait() >= 500);
+    safeSleep(100); //ensure measurable wait time
+    int takeCount = randomIntBetween(1, putCount);
+    for(int i=0; i < takeCount; ++i) {
+      Integer element = queue.remove();
+      assertNotNull(element);
+      assertEquals(i, element.intValue());
+      assertEquals(putCount - (1+i), queue.size());
+      assertEquals(queue.size(), queue.getCurrentSize());
     }
-
-    /**
-     * Test that throughput returns expected values.
-     * @throws Exception
-     */
-    @Test
-    public void testThroughput() throws Exception {
-        ThroughputQueue queue = new ThroughputQueue();
-        int wait = 100;
-        for(int i=0; i < 10; ++i) {
-            queue.put(1);
-            safeSleep(wait);
-            queue.take();
-        }
-        double throughput = queue.getThroughput();
-        assertTrue(throughput <= 15 ); //can't calculate exactly, making sure its close.
-        assertTrue(throughput >= 5);
-
-        queue = new ThroughputQueue();
-        wait = 1000;
-        for(int i=0; i < 10; ++i) {
-            queue.put(1);
-        }
-        for(int i=0; i < 10; ++i) {
-            queue.take();
-        }
-        safeSleep(wait);
-        throughput = queue.getThroughput();
-        assertTrue(throughput <= 15 ); //can't calculate exactly, making sure its close.
-        assertTrue(throughput >= 5);
+    assertEquals(putCount-takeCount, queue.size());
+    assertEquals(queue.size(), queue.getCurrentSize());
+    assertTrue(0.0 < queue.getMaxWait());
+    assertTrue(0.0 < queue.getAvgWait());
+    assertTrue(0.0 < queue.getThroughput());
+    assertEquals(putCount, queue.getAdded());
+    assertEquals(takeCount, queue.getRemoved());
+  }
+
+  /**
+   * Test that offer and poll enqueue and dequeue data as expected
+   * and all measurements from the queue are returning data
+   */
+  @Test
+  @Repeat(iterations = 3)
+  public void testOfferAndPoll() {
+    ThroughputQueue<Integer> queue = new ThroughputQueue<>();
+    int putCount = randomIntBetween(1, 1000);
+    for(int i=0; i < putCount; ++i) {
+      queue.offer(i);
+      assertEquals(i+1, queue.size());
+      assertEquals(queue.size(), queue.getCurrentSize());
     }
-
-
-    /**
-     * Test that the mbean registers
-     */
-    @Test
-    public void testMBeanRegistration() {
-        try {
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            Integer beanCount = mbs.getMBeanCount();
-            ThroughputQueue queue = new ThroughputQueue(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
-            assertEquals("Expected bean to be registered", new Integer(beanCount+1), mbs.getMBeanCount());
-            ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
-            assertNotNull(mBean);
-        } catch (Exception e) {
-            fail("Failed to register MXBean : "+e.getMessage());
-        }
+    safeSleep(100); //ensure measurable wait time
+    int takeCount = randomIntBetween(1, putCount);
+    for(int i=0; i < takeCount; ++i) {
+      Integer element = queue.poll();
+      assertNotNull(element);
+      assertEquals(i, element.intValue());
+      assertEquals(putCount - (1+i), queue.size());
+      assertEquals(queue.size(), queue.getCurrentSize());
     }
-
-    /**
-     * Test that mulitple mbeans of the same type with a different name can be registered
-     */
-    @Test
-    public void testMultipleMBeanRegistrations() {
-        try {
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            Integer beanCount = mbs.getMBeanCount();
-            int numReg = randomIntBetween(2, 100);
-            for(int i=0; i < numReg; ++i) {
-                ThroughputQueue queue = new ThroughputQueue(MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME);
-                assertEquals("Expected bean to be registered", new Integer(beanCount + (i+1)), mbs.getMBeanCount());
-                ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME)));
-                assertNotNull(mBean);
-            }
-        } catch (Exception e) {
-            fail("Failed to register MXBean : "+e.getMessage());
-        }
+    assertEquals(putCount-takeCount, queue.size());
+    assertEquals(queue.size(), queue.getCurrentSize());
+    assertTrue(0.0 < queue.getMaxWait());
+    assertTrue(0.0 < queue.getAvgWait());
+    assertTrue(0.0 < queue.getThroughput());
+    assertEquals(putCount, queue.getAdded());
+    assertEquals(takeCount, queue.getRemoved());
+  }
+
+
+
+  /**
+   * Test that max wait and avg wait return expected values
+   * @throws Exception
+   */
+  @Test
+  public void testWait() throws Exception {
+    ThroughputQueue queue = new ThroughputQueue();
+    int wait = 1000;
+
+    for(int i=0; i < 3; ++i) {
+      queue.put(1);
+      safeSleep(wait);
+      queue.take();
+      assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 2));//can't calculate exactly, making sure its close.
+      assertTrue(queue.getAvgWait() >= wait && queue.getAvgWait() <= (wait * 2));
+    }
+    queue.put(1);
+    queue.take();
+    assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 2));//can't calculate exactly, making sure its close.
+    assertTrue(queue.getAvgWait() <= 5000 );
+    assertTrue(queue.getAvgWait() >= 500);
+  }
+
+  /**
+   * Test that throughput returns expected values.
+   * @throws Exception
+   */
+  @Test
+  public void testThroughput() throws Exception {
+    ThroughputQueue queue = new ThroughputQueue();
+    int wait = 100;
+    for(int i=0; i < 10; ++i) {
+      queue.put(1);
+      safeSleep(wait);
+      queue.take();
+    }
+    double throughput = queue.getThroughput();
+    assertTrue(throughput <= 15 ); //can't calculate exactly, making sure its close.
+    assertTrue(throughput >= 5);
+
+    queue = new ThroughputQueue();
+    wait = 1000;
+    for(int i=0; i < 10; ++i) {
+      queue.put(1);
+    }
+    for(int i=0; i < 10; ++i) {
+      queue.take();
+    }
+    safeSleep(wait);
+    throughput = queue.getThroughput();
+    assertTrue(throughput <= 15 ); //can't calculate exactly, making sure its close.
+    assertTrue(throughput >= 5);
+  }
+
+
+  /**
+   * Test that the mbean registers
+   */
+  @Test
+  public void testMBeanRegistration() {
+    try {
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      Integer beanCount = mbs.getMBeanCount();
+      ThroughputQueue queue = new ThroughputQueue(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+      assertEquals("Expected bean to be registered", new Integer(beanCount+1), mbs.getMBeanCount());
+      ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
+      assertNotNull(mBean);
+    } catch (Exception e) {
+      fail("Failed to register MXBean : "+e.getMessage());
+    }
+  }
+
+  /**
+   * Test that multiple MBeans of the same type with different names can be registered.
+   */
+  @Test
+  public void testMultipleMBeanRegistrations() {
+    try {
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      Integer beanCount = mbs.getMBeanCount();
+      int numReg = randomIntBetween(2, 100);
+      for(int i=0; i < numReg; ++i) {
+        ThroughputQueue queue = new ThroughputQueue(MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME);
+        assertEquals("Expected bean to be registered", new Integer(beanCount + (i+1)), mbs.getMBeanCount());
+        ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME)));
+        assertNotNull(mBean);
+      }
+    } catch (Exception e) {
+      fail("Failed to register MXBean : "+e.getMessage());
     }
+  }
 
 
-    private void safeSleep(long sleep) {
-        try {
-            Thread.sleep(sleep);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-        }
+  private void safeSleep(long sleep) {
+    try {
+      Thread.sleep(sleep);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
     }
+  }
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
index 38e948e..2a67550 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
@@ -18,25 +18,28 @@
 
 package org.apache.streams.local.tasks;
 
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.local.counters.DatumStatusCounter;
 import org.apache.streams.local.counters.StreamsTaskCounter;
-import org.apache.streams.local.queues.ThroughputQueue;
 import org.apache.streams.local.test.processors.PassthroughDatumCounterProcessor;
 import org.apache.streams.local.test.providers.NumericMessageProvider;
 import org.apache.streams.local.test.writer.DatumCounterWriter;
 import org.apache.streams.util.ComponentUtils;
+
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.After;
 import org.junit.Test;
 
-import javax.management.InstanceNotFoundException;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  *
@@ -44,264 +47,264 @@ import static org.junit.Assert.*;
 public class BasicTasksTest {
 
 
-    private static final String MBEAN_ID = "test_bean";
-    @After
-    public void removeLocalMBeans() {
-        try {
-            ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
-        } catch (Exception e) {
-            //No op.  proceed to next test
-        }
+  private static final String MBEAN_ID = "test_bean";
+  @After
+  public void removeLocalMBeans() {
+    try {
+      ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+    } catch (Exception e) {
+      // No-op: ignore the cleanup failure and proceed to the next test.
     }
+  }
 
-    @Test
-    public void testProviderTask() {
-        int numMessages = 100;
-        NumericMessageProvider provider = new NumericMessageProvider(numMessages);
-        StreamsProviderTask task = new StreamsProviderTask(provider, false, null);
-        BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
-        task.addOutputQueue(outQueue);
-        //Test that adding input queues to providers is not valid
-        BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
-        Exception exp = null;
-        try {
-            task.addInputQueue(inQueue);
-        } catch (UnsupportedOperationException uoe) {
-            exp = uoe;
-        }
-        assertNotNull(exp);
+  @Test
+  public void testProviderTask() {
+    int numMessages = 100;
+    NumericMessageProvider provider = new NumericMessageProvider(numMessages);
+    StreamsProviderTask task = new StreamsProviderTask(provider, false, null);
+    BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
+    task.addOutputQueue(outQueue);
+    //Test that adding input queues to providers is not valid
+    BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
+    Exception exp = null;
+    try {
+      task.addInputQueue(inQueue);
+    } catch (UnsupportedOperationException uoe) {
+      exp = uoe;
+    }
+    assertNotNull(exp);
 
-        ExecutorService service = Executors.newFixedThreadPool(1);
-        service.submit(task);
-        int attempts = 0;
-        while(outQueue.size() != numMessages) {
-            Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
-            if(attempts == 10) {
-                fail("Provider task failed to output "+numMessages+" in a timely fashion.");
-            }
-        }
-        service.shutdown();
-        try {
-            if(!service.awaitTermination(10, TimeUnit.SECONDS)){
-                service.shutdownNow();
-                fail("Service did not terminate.");
-            }
-            assertTrue("Task should have completed running in allotted time.", service.isTerminated());
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
+    ExecutorService service = Executors.newFixedThreadPool(1);
+    service.submit(task);
+    int attempts = 0;
+    while(outQueue.size() != numMessages) {
+      Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+      if(attempts == 10) {
+        fail("Provider task failed to output "+numMessages+" in a timely fashion.");
+      }
+    }
+    service.shutdown();
+    try {
+      if(!service.awaitTermination(10, TimeUnit.SECONDS)){
+        service.shutdownNow();
+        fail("Service did not terminate.");
+      }
+      assertTrue("Task should have completed running in allotted time.", service.isTerminated());
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
     }
+  }
 
-    @Test
-    public void testProcessorTask() {
-        int numMessages = 100;
-        PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
-        StreamsProcessorTask task = new StreamsProcessorTask(processor);
-        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
-        task.setStreamsTaskCounter(counter);
-        BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
-        BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
-        task.addOutputQueue(outQueue);
-        task.addInputQueue(inQueue);
-        assertEquals(numMessages, task.getInputQueues().get(0).size());
-        ExecutorService service = Executors.newFixedThreadPool(1);
-        service.submit(task);
-        int attempts = 0;
-        while(inQueue.size() != 0 && outQueue.size() != numMessages) {
-            Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
-            ++attempts;
-            if(attempts == 10) {
-                fail("Processor task failed to output "+numMessages+" in a timely fashion.");
-            }
-        }
-        task.stopTask();;
-        service.shutdown();
-        try {
-            if(!service.awaitTermination(5, TimeUnit.SECONDS)){
-                service.shutdownNow();
-                fail("Service did not terminate.");
-            }
-            assertTrue("Task should have completed running in allotted time.", service.isTerminated());
-        } catch (InterruptedException e) {
-            fail("Test Interrupted.");
-        }
-        assertEquals(numMessages, processor.getMessageCount());
-        assertEquals(numMessages, counter.getNumReceived());
-        assertEquals(numMessages, counter.getNumEmitted());
-        assertEquals(0, counter.getNumUnhandledErrors());
-        assertEquals(0.0, counter.getErrorRate(), 0.0);
+  @Test
+  public void testProcessorTask() {
+    int numMessages = 100;
+    PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
+    StreamsProcessorTask task = new StreamsProcessorTask(processor);
+    StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
+    task.setStreamsTaskCounter(counter);
+    BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
+    BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
+    task.addOutputQueue(outQueue);
+    task.addInputQueue(inQueue);
+    assertEquals(numMessages, task.getInputQueues().get(0).size());
+    ExecutorService service = Executors.newFixedThreadPool(1);
+    service.submit(task);
+    int attempts = 0;
+    while(inQueue.size() != 0 && outQueue.size() != numMessages) {
+      Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+      ++attempts;
+      if(attempts == 10) {
+        fail("Processor task failed to output "+numMessages+" in a timely fashion.");
+      }
+    }
+    task.stopTask();;
+    service.shutdown();
+    try {
+      if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+        service.shutdownNow();
+        fail("Service did not terminate.");
+      }
+      assertTrue("Task should have completed running in allotted time.", service.isTerminated());
+    } catch (InterruptedException e) {
+      fail("Test Interrupted.");
     }
+    assertEquals(numMessages, processor.getMessageCount());
+    assertEquals(numMessages, counter.getNumReceived());
+    assertEquals(numMessages, counter.getNumEmitted());
+    assertEquals(0, counter.getNumUnhandledErrors());
+    assertEquals(0.0, counter.getErrorRate(), 0.0);
+  }
 
-    @Test
-    public void testWriterTask() {
-        int numMessages = 100;
-        DatumCounterWriter writer = new DatumCounterWriter("");
-        StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer);
-        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
-        task.setStreamsTaskCounter(counter);
-        BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
-        BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
+  @Test
+  public void testWriterTask() {
+    int numMessages = 100;
+    DatumCounterWriter writer = new DatumCounterWriter("");
+    StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer);
+    StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1);
+    task.setStreamsTaskCounter(counter);
+    BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
+    BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
 
-        Exception exp = null;
-        try {
-            task.addOutputQueue(outQueue);
-        } catch (UnsupportedOperationException uoe) {
-            exp = uoe;
-        }
-        assertNotNull(exp);
-        task.addInputQueue(inQueue);
-        assertEquals(numMessages, task.getInputQueues().get(0).size());
-        ExecutorService service = Executors.newFixedThreadPool(1);
-        service.submit(task);
-        int attempts = 0;
-        while(inQueue.size() != 0 ) {
-            Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
-            ++attempts;
-            if(attempts == 10) {
-                fail("Processor task failed to output "+numMessages+" in a timely fashion.");
-            }
-        }
-        task.stopTask();
-        service.shutdown();
-        try {
-            if(!service.awaitTermination(15, TimeUnit.SECONDS)){
-                service.shutdownNow();
-                fail("Service did not terminate.");
-            }
-            assertTrue("Task should have completed running in allotted time.", service.isTerminated());
-        } catch (InterruptedException e) {
-            fail("Test Interrupted.");
-        }
-        assertEquals(numMessages, writer.getDatumsCounted());
-        assertEquals(numMessages, counter.getNumReceived());
-        assertEquals(0, counter.getNumEmitted());
-        assertEquals(0, counter.getNumUnhandledErrors());
-        assertEquals(0.0, counter.getErrorRate(), 0.0);
+    Exception exp = null;
+    try {
+      task.addOutputQueue(outQueue);
+    } catch (UnsupportedOperationException uoe) {
+      exp = uoe;
     }
+    assertNotNull(exp);
+    task.addInputQueue(inQueue);
+    assertEquals(numMessages, task.getInputQueues().get(0).size());
+    ExecutorService service = Executors.newFixedThreadPool(1);
+    service.submit(task);
+    int attempts = 0;
+    while(inQueue.size() != 0 ) {
+      Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+      ++attempts;
+      if(attempts == 10) {
+        fail("Processor task failed to output "+numMessages+" in a timely fashion.");
+      }
+    }
+    task.stopTask();
+    service.shutdown();
+    try {
+      if(!service.awaitTermination(15, TimeUnit.SECONDS)){
+        service.shutdownNow();
+        fail("Service did not terminate.");
+      }
+      assertTrue("Task should have completed running in allotted time.", service.isTerminated());
+    } catch (InterruptedException e) {
+      fail("Test Interrupted.");
+    }
+    assertEquals(numMessages, writer.getDatumsCounted());
+    assertEquals(numMessages, counter.getNumReceived());
+    assertEquals(0, counter.getNumEmitted());
+    assertEquals(0, counter.getNumUnhandledErrors());
+    assertEquals(0.0, counter.getErrorRate(), 0.0);
+  }
 
-    @Test
-    public void testMergeTask() {
-        int numMessages = 100;
-        int incoming = 5;
-        StreamsMergeTask task = new StreamsMergeTask();
-        BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
-        task.addOutputQueue(outQueue);
-        for(int i=0; i < incoming; ++i) {
-            task.addInputQueue(createInputQueue(numMessages));
-        }
-        ExecutorService service = Executors.newFixedThreadPool(1);
-        service.submit(task);
-        int attempts = 0;
-        while(outQueue.size() != incoming * numMessages ) {
-            Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
-            ++attempts;
-            if(attempts == 10) {
-                assertEquals("Processor task failed to output " + (numMessages * incoming) + " in a timely fashion.", (numMessages * incoming), outQueue.size());
-            }
-        }
-        task.stopTask();
-        service.shutdown();
-        try {
-            if(!service.awaitTermination(5, TimeUnit.SECONDS)){
-                service.shutdownNow();
-                fail("Service did not terminate.");
-            }
-            assertTrue("Task should have completed running in allotted time.", service.isTerminated());
-        } catch (InterruptedException e) {
-            fail("Test Interrupted.");
-        }
+  @Test
+  public void testMergeTask() {
+    int numMessages = 100;
+    int incoming = 5;
+    StreamsMergeTask task = new StreamsMergeTask();
+    BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
+    task.addOutputQueue(outQueue);
+    for(int i=0; i < incoming; ++i) {
+      task.addInputQueue(createInputQueue(numMessages));
+    }
+    ExecutorService service = Executors.newFixedThreadPool(1);
+    service.submit(task);
+    int attempts = 0;
+    while(outQueue.size() != incoming * numMessages ) {
+      Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+      ++attempts;
+      if(attempts == 10) {
+        assertEquals("Processor task failed to output " + (numMessages * incoming) + " in a timely fashion.", (numMessages * incoming), outQueue.size());
+      }
     }
+    task.stopTask();
+    service.shutdown();
+    try {
+      if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+        service.shutdownNow();
+        fail("Service did not terminate.");
+      }
+      assertTrue("Task should have completed running in allotted time.", service.isTerminated());
+    } catch (InterruptedException e) {
+      fail("Test Interrupted.");
+    }
+  }
 
-    @Test
-    public void testBranching() {
-        int numMessages = 100;
-        PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
-        StreamsProcessorTask task = new StreamsProcessorTask(processor);
-        BlockingQueue<StreamsDatum> outQueue1 = new LinkedBlockingQueue<>();
-        BlockingQueue<StreamsDatum> outQueue2 = new LinkedBlockingQueue<>();
-        BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
-        task.addOutputQueue(outQueue1);
-        task.addOutputQueue(outQueue2);
-        task.addInputQueue(inQueue);
-        assertEquals(numMessages, task.getInputQueues().get(0).size());
-        ExecutorService service = Executors.newFixedThreadPool(1);
-        service.submit(task);
-        int attempts = 0;
-        while(inQueue.size() != 0 ) {
-            Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
-            ++attempts;
-            if(attempts == 10) {
-                assertEquals("Processor task failed to output "+(numMessages)+" in a timely fashion.", 0, inQueue.size());
-            }
-        }
-        task.stopTask();
+  @Test
+  public void testBranching() {
+    int numMessages = 100;
+    PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
+    StreamsProcessorTask task = new StreamsProcessorTask(processor);
+    BlockingQueue<StreamsDatum> outQueue1 = new LinkedBlockingQueue<>();
+    BlockingQueue<StreamsDatum> outQueue2 = new LinkedBlockingQueue<>();
+    BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
+    task.addOutputQueue(outQueue1);
+    task.addOutputQueue(outQueue2);
+    task.addInputQueue(inQueue);
+    assertEquals(numMessages, task.getInputQueues().get(0).size());
+    ExecutorService service = Executors.newFixedThreadPool(1);
+    service.submit(task);
+    int attempts = 0;
+    while(inQueue.size() != 0 ) {
+      Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+      ++attempts;
+      if(attempts == 10) {
+        assertEquals("Processor task failed to output "+(numMessages)+" in a timely fashion.", 0, inQueue.size());
+      }
+    }
+    task.stopTask();
 
-        service.shutdown();
-        try {
-            if(!service.awaitTermination(5, TimeUnit.SECONDS)){
-                service.shutdownNow();
-                fail("Service did not terminate.");
-            }
-            assertTrue("Task should have completed running in allotted time.", service.isTerminated());
-        } catch (InterruptedException e) {
-            fail("Test Interrupted.");
-        }
-        assertEquals(numMessages, processor.getMessageCount());
-        assertEquals(numMessages, outQueue1.size());
-        assertEquals(numMessages, outQueue2.size());
+    service.shutdown();
+    try {
+      if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+        service.shutdownNow();
+        fail("Service did not terminate.");
+      }
+      assertTrue("Task should have completed running in allotted time.", service.isTerminated());
+    } catch (InterruptedException e) {
+      fail("Test Interrupted.");
     }
+    assertEquals(numMessages, processor.getMessageCount());
+    assertEquals(numMessages, outQueue1.size());
+    assertEquals(numMessages, outQueue2.size());
+  }
 
-    @Test
-    public void testBranchingSerialization() {
-        int numMessages = 1;
-        PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
-        StreamsProcessorTask task = new StreamsProcessorTask(processor);
-        BlockingQueue<StreamsDatum> outQueue1 = new LinkedBlockingQueue<>();
-        BlockingQueue<StreamsDatum> outQueue2 = new LinkedBlockingQueue<>();
-        BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
-        task.addOutputQueue(outQueue1);
-        task.addOutputQueue(outQueue2);
-        task.addInputQueue(inQueue);
-        ExecutorService service = Executors.newFixedThreadPool(1);
-        service.submit(task);
-        int attempts = 0;
-        while(inQueue.size() != 0 ) {
-            Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
-            ++attempts;
-            if(attempts == 10) {
-                assertEquals("Processor task failed to output "+(numMessages)+" in a timely fashion.", 0, inQueue.size());
-            }
-        }
-        task.stopTask();
+  @Test
+  public void testBranchingSerialization() {
+    int numMessages = 1;
+    PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
+    StreamsProcessorTask task = new StreamsProcessorTask(processor);
+    BlockingQueue<StreamsDatum> outQueue1 = new LinkedBlockingQueue<>();
+    BlockingQueue<StreamsDatum> outQueue2 = new LinkedBlockingQueue<>();
+    BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
+    task.addOutputQueue(outQueue1);
+    task.addOutputQueue(outQueue2);
+    task.addInputQueue(inQueue);
+    ExecutorService service = Executors.newFixedThreadPool(1);
+    service.submit(task);
+    int attempts = 0;
+    while(inQueue.size() != 0 ) {
+      Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+      ++attempts;
+      if(attempts == 10) {
+        assertEquals("Processor task failed to output "+(numMessages)+" in a timely fashion.", 0, inQueue.size());
+      }
+    }
+    task.stopTask();
 
-        service.shutdown();
-        try {
-            if(!service.awaitTermination(5, TimeUnit.SECONDS)){
-                service.shutdownNow();
-                fail("Service did not terminate.");
-            }
-            assertTrue("Task should have completed running in allotted time.", service.isTerminated());
-        } catch (InterruptedException e) {
-            fail("Test Interrupted.");
-        }
-        assertEquals(numMessages, processor.getMessageCount());
-        assertEquals(numMessages, outQueue1.size());
-        assertEquals(numMessages, outQueue2.size());
-        StreamsDatum datum1 = outQueue1.poll();
-        StreamsDatum datum2 = outQueue2.poll();
-        assertNotNull(datum1);
-        assertEquals(datum1, datum2);
-        datum1.setDocument("a");
-        assertNotEquals(datum1, datum2);
+    service.shutdown();
+    try {
+      if(!service.awaitTermination(5, TimeUnit.SECONDS)){
+        service.shutdownNow();
+        fail("Service did not terminate.");
+      }
+      assertTrue("Task should have completed running in allotted time.", service.isTerminated());
+    } catch (InterruptedException e) {
+      fail("Test Interrupted.");
     }
+    assertEquals(numMessages, processor.getMessageCount());
+    assertEquals(numMessages, outQueue1.size());
+    assertEquals(numMessages, outQueue2.size());
+    StreamsDatum datum1 = outQueue1.poll();
+    StreamsDatum datum2 = outQueue2.poll();
+    assertNotNull(datum1);
+    assertEquals(datum1, datum2);
+    datum1.setDocument("a");
+    assertNotEquals(datum1, datum2);
+  }
 
-    private BlockingQueue<StreamsDatum> createInputQueue(int numDatums) {
-        BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
-        for(int i=0; i < numDatums; ++i) {
-            queue.add(new StreamsDatum(i));
-        }
-        return queue;
+  private BlockingQueue<StreamsDatum> createInputQueue(int numDatums) {
+    BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
+    for(int i=0; i < numDatums; ++i) {
+      queue.add(new StreamsDatum(i));
     }
+    return queue;
+  }
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
index 222566d..782e232 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
@@ -23,132 +23,142 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.util.ComponentUtils;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests the StreamsProviderTask.
  */
 public class StreamsProviderTaskTest {
 
-    protected StreamsProvider mockProvider;
-    protected ExecutorService pool;
-
-    @Before
-    public void setup() {
-        mockProvider = mock(StreamsProvider.class);
-        pool = Executors.newFixedThreadPool(1);
-    }
-
-    @After
-    public void removeLocalMBeans() {
-        try {
-            ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
-        } catch (Exception e) {
-            //No op.  proceed to next test
-        }
-    }
-
-    @Test
-    public void runPerpetual() {
-        StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
-        when(mockProvider.isRunning()).thenReturn(true);
-        when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
-        task.setTimeout(500);
-        task.setSleepTime(10);
-        task.run();
-        //Setting this to at least 2 means that it was correctly set to perpetual mode
-        verify(mockProvider, atLeast(2)).readCurrent();
-        verify(mockProvider, atMost(1)).prepare(null);
-    }
-
-    @Test
-    public void flushes() {
-        BlockingQueue<StreamsDatum> out = new LinkedBlockingQueue<>();
-        StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
-        when(mockProvider.isRunning()).thenReturn(true);
-        when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(getQueue(3)));
-        task.setTimeout(100);
-        task.setSleepTime(10);
-        task.getOutputQueues().add(out);
-        task.run();
-        assertThat(out.size(), is(equalTo(3)));
+  protected StreamsProvider mockProvider;
+  protected ExecutorService pool;
+
+  @Before
+  public void setup() {
+    mockProvider = mock(StreamsProvider.class);
+    pool = Executors.newFixedThreadPool(1);
+  }
+
+  @After
+  public void removeLocalMBeans() {
+    try {
+      ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+    } catch (Exception e) {
+      //No op.  proceed to next test
     }
-
-    protected Queue<StreamsDatum> getQueue(int numElems) {
-        Queue<StreamsDatum> results = new LinkedBlockingQueue<>();
-        for(int i=0; i<numElems; i++) {
-            results.add(new StreamsDatum(Math.random()));
-        }
-        return results;
-    }
-
-    @Test
-    public void runNonPerpetual() {
-        StreamsProviderTask task = new StreamsProviderTask(mockProvider, false, null);
-        when(mockProvider.isRunning()).thenReturn(true);
-        when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
-        task.setTimeout(500);
-        task.setSleepTime(10);
-        task.run();
-        //In read current mode, this should only be called 1 time
-        verify(mockProvider, atLeast(1)).readCurrent();
-        verify(mockProvider, atMost(1)).prepare(null);
-    }
-
-    @Test
-    public void stoppable() throws InterruptedException {
-        StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
-        when(mockProvider.isRunning()).thenReturn(true);
-        when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
-        task.setTimeout(-1);
-        task.setSleepTime(10);
-        Future<?> taskResult = pool.submit(task);
-
-        //After a few milliseconds, tell the task that it is to stop and wait until it says it isn't or a timeout happens
-        int count = 0;
-        do {
-            Thread.sleep(100);
-            if(count == 0) {
-                task.stopTask();
-            }
-        } while(++count < 10 && !taskResult.isDone());
-        verifyNotRunning(task, taskResult);
-
+  }
+
+  @Test
+  public void runPerpetual() {
+    StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
+    when(mockProvider.isRunning()).thenReturn(true);
+    when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
+    task.setTimeout(500);
+    task.setSleepTime(10);
+    task.run();
+    //Setting this to at least 2 means that it was correctly set to perpetual mode
+    verify(mockProvider, atLeast(2)).readCurrent();
+    verify(mockProvider, atMost(1)).prepare(null);
+  }
+
+  @Test
+  public void flushes() {
+    BlockingQueue<StreamsDatum> out = new LinkedBlockingQueue<>();
+    StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
+    when(mockProvider.isRunning()).thenReturn(true);
+    when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(getQueue(3)));
+    task.setTimeout(100);
+    task.setSleepTime(10);
+    task.getOutputQueues().add(out);
+    task.run();
+    assertThat(out.size(), is(equalTo(3)));
+  }
+
+  protected Queue<StreamsDatum> getQueue(int numElems) {
+    Queue<StreamsDatum> results = new LinkedBlockingQueue<>();
+    for(int i=0; i<numElems; i++) {
+      results.add(new StreamsDatum(Math.random()));
     }
-
-    @Test
-    public void earlyException() throws InterruptedException {
-        StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
-        when(mockProvider.isRunning()).thenReturn(true);
-        doThrow(new RuntimeException()).when(mockProvider).prepare(null);
-        task.setTimeout(-1);
-        task.setSleepTime(10);
-        Future<?> taskResult = pool.submit(task);
-        int count = 0;
-        while(++count < 10 && !taskResult.isDone()) {
-            Thread.sleep(100);
-        }
-        verifyNotRunning(task, taskResult);
+    return results;
+  }
+
+  @Test
+  public void runNonPerpetual() {
+    StreamsProviderTask task = new StreamsProviderTask(mockProvider, false, null);
+    when(mockProvider.isRunning()).thenReturn(true);
+    when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
+    task.setTimeout(500);
+    task.setSleepTime(10);
+    task.run();
+    //In read current mode, this should only be called 1 time
+    verify(mockProvider, atLeast(1)).readCurrent();
+    verify(mockProvider, atMost(1)).prepare(null);
+  }
+
+  @Test
+  public void stoppable() throws InterruptedException {
+    StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
+    when(mockProvider.isRunning()).thenReturn(true);
+    when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
+    task.setTimeout(-1);
+    task.setSleepTime(10);
+    Future<?> taskResult = pool.submit(task);
+
+    //After a few milliseconds, tell the task to stop, then wait until its future reports done or a timeout happens
+    int count = 0;
+    do {
+      Thread.sleep(100);
+      if(count == 0) {
+        task.stopTask();
+      }
+    } while(++count < 10 && !taskResult.isDone());
+    verifyNotRunning(task, taskResult);
+
+  }
+
+  @Test
+  public void earlyException() throws InterruptedException {
+    StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null);
+    when(mockProvider.isRunning()).thenReturn(true);
+    doThrow(new RuntimeException()).when(mockProvider).prepare(null);
+    task.setTimeout(-1);
+    task.setSleepTime(10);
+    Future<?> taskResult = pool.submit(task);
+    int count = 0;
+    while(++count < 10 && !taskResult.isDone()) {
+      Thread.sleep(100);
     }
-
-    protected void verifyNotRunning(StreamsProviderTask task, Future<?> taskResult) {
-        //Make sure the task is reporting that it is complete and that the run method returned
-        if(taskResult.isDone()) {
-            assertThat(task.isRunning(), is(false));
-        } else {
-            ComponentUtils.shutdownExecutor(pool, 0, 10);
-            fail();
-        }
+    verifyNotRunning(task, taskResult);
+  }
+
+  protected void verifyNotRunning(StreamsProviderTask task, Future<?> taskResult) {
+    //Make sure the task is reporting that it is complete and that the run method returned
+    if(taskResult.isDone()) {
+      assertThat(task.isRunning(), is(false));
+    } else {
+      ComponentUtils.shutdownExecutor(pool, 0, 10);
+      fail();
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java
index cad7873..31a83ec 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.test.processors;
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,34 +32,34 @@ import java.util.List;
  */
 public class DoNothingProcessor implements StreamsProcessor {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(DoNothingProcessor.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(DoNothingProcessor.class);
 
-    public final static String STREAMS_ID = "DoNothingProcessor";
+  public final static String STREAMS_ID = "DoNothingProcessor";
 
-    List<StreamsDatum> result;
+  List<StreamsDatum> result;
 
-    public DoNothingProcessor() {
-    }
+  public DoNothingProcessor() {
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        this.result = new LinkedList<StreamsDatum>();
-        result.add(entry);
-        return result;
-    }
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    this.result = new LinkedList<StreamsDatum>();
+    result.add(entry);
+    return result;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
+  @Override
+  public void prepare(Object configurationObject) {
 
-    }
+  }
 
-    @Override
-    public void cleanUp() {
-        LOGGER.debug("Processor clean up!");
-    }
+  @Override
+  public void cleanUp() {
+    LOGGER.debug("Processor clean up!");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java
index 970a8dc..43343e5 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java
@@ -20,10 +20,15 @@ package org.apache.streams.local.test.processors;
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -32,76 +37,76 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class PassthroughDatumCounterProcessor implements StreamsProcessor {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(PassthroughDatumCounterProcessor.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(PassthroughDatumCounterProcessor.class);
 
-    public final static String STREAMS_ID = "PassthroughDatumCounterProcessor";
+  public final static String STREAMS_ID = "PassthroughDatumCounterProcessor";
 
-    /**
-     * Set of all ids that have been claimed.  Ensures all instances are assigned unique ids
-     */
-    public static Set<Integer> CLAIMED_ID = new HashSet<Integer>();
-    /**
-     * Random instance to generate ids
-     */
-    public static final Random RAND = new Random();
-    /**
-     * Set of instance ids that received data. Usefully for testing parrallelization is actually working.
-     */
-    public final static Set<Integer> SEEN_DATA = new HashSet<Integer>();
-    /**
-     * The total count of data seen by a all instances of a processor.
-     */
-    public static final ConcurrentHashMap<String, AtomicLong> COUNTS = new ConcurrentHashMap<>();
+  /**
+   * Set of all ids that have been claimed.  Ensures all instances are assigned unique ids
+   */
+  public static Set<Integer> CLAIMED_ID = new HashSet<Integer>();
+  /**
+   * Random instance to generate ids
+   */
+  public static final Random RAND = new Random();
+  /**
+   * Set of instance ids that received data. Useful for testing that parallelization is actually working.
+   */
+  public final static Set<Integer> SEEN_DATA = new HashSet<Integer>();
+  /**
+   * The total count of data seen by all instances of a processor.
+   */
+  public static final ConcurrentHashMap<String, AtomicLong> COUNTS = new ConcurrentHashMap<>();
 
-    private int count = 0;
-    private int id;
-    private String procId;
+  private int count = 0;
+  private int id;
+  private String procId;
 
-    public PassthroughDatumCounterProcessor(String procId) {
-        this.procId = procId;
-    }
+  public PassthroughDatumCounterProcessor(String procId) {
+    this.procId = procId;
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        ++this.count;
-        List<StreamsDatum> result = new LinkedList<StreamsDatum>();
-        result.add(entry);
-        synchronized (SEEN_DATA) {
-            SEEN_DATA.add(this.id);
-        }
-        return result;
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    ++this.count;
+    List<StreamsDatum> result = new LinkedList<StreamsDatum>();
+    result.add(entry);
+    synchronized (SEEN_DATA) {
+      SEEN_DATA.add(this.id);
     }
+    return result;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        synchronized (CLAIMED_ID) {
-            this.id = RAND.nextInt();
-            while(!CLAIMED_ID.add(this.id)) {
-                this.id = RAND.nextInt();
-            }
-        }
+  @Override
+  public void prepare(Object configurationObject) {
+    synchronized (CLAIMED_ID) {
+      this.id = RAND.nextInt();
+      while(!CLAIMED_ID.add(this.id)) {
+        this.id = RAND.nextInt();
+      }
     }
+  }
 
-    @Override
-    public void cleanUp() {
-        LOGGER.debug("Clean up {}", this.procId);
-        synchronized (COUNTS) {
-            AtomicLong count = COUNTS.get(this.procId);
-            if(count == null) {
-                COUNTS.put(this.procId, new AtomicLong(this.count));
-            } else {
-                count.addAndGet(this.count);
-            }
-        }
-        LOGGER.debug("{}\t{}", this.procId, this.count);
+  @Override
+  public void cleanUp() {
+    LOGGER.debug("Clean up {}", this.procId);
+    synchronized (COUNTS) {
+      AtomicLong count = COUNTS.get(this.procId);
+      if(count == null) {
+        COUNTS.put(this.procId, new AtomicLong(this.count));
+      } else {
+        count.addAndGet(this.count);
+      }
     }
+    LOGGER.debug("{}\t{}", this.procId, this.count);
+  }
 
-    public int getMessageCount() {
-        return this.count;
-    }
+  public int getMessageCount() {
+    return this.count;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java
index 2b172cd..227e0f8 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java
@@ -19,40 +19,41 @@
 
 package org.apache.streams.local.test.processors;
 
-import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 
+import com.google.common.collect.Lists;
+
 import java.util.List;
 
 /**
  */
 public class SlowProcessor  implements StreamsProcessor {
 
-    public final static String STREAMS_ID = "DoNothingProcessor";
+  public final static String STREAMS_ID = "DoNothingProcessor";
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        try {
-            Thread.sleep(1000);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-        return Lists.newArrayList(entry);
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
     }
+    return Lists.newArrayList(entry);
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
+  @Override
+  public void prepare(Object configurationObject) {
 
-    }
+  }
 
-    @Override
-    public void cleanUp() {
+  @Override
+  public void cleanUp() {
 
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
index bdbc9ec..571c0fc 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
@@ -19,10 +19,11 @@
 
 package org.apache.streams.local.test.providers;
 
-import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+
+import com.google.common.collect.Queues;
 import org.joda.time.DateTime;
 
 import java.math.BigInteger;
@@ -32,43 +33,43 @@ import java.math.BigInteger;
  */
 public class EmptyResultSetProvider implements StreamsProvider {
 
-    @Override
-    public String getId() {
-        return "EmptyResultSetProvider";
-    }
+  @Override
+  public String getId() {
+    return "EmptyResultSetProvider";
+  }
 
-    @Override
-    public void startStream() {
-        //NOP
-    }
+  @Override
+  public void startStream() {
+    //NOP
+  }
 
-    @Override
-    public StreamsResultSet readCurrent() {
-        return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
-    }
+  @Override
+  public StreamsResultSet readCurrent() {
+    return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+  }
 
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
-    }
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+  }
 
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
-    }
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+  }
 
-    @Override
-    public boolean isRunning() {
-        return true;
-    }
+  @Override
+  public boolean isRunning() {
+    return true;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        //NOP
-    }
+  @Override
+  public void prepare(Object configurationObject) {
+    //NOP
+  }
 
-    @Override
-    public void cleanUp() {
-        //NOP
-    }
+  @Override
+  public void cleanUp() {
+    //NOP
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
index d7c1568..88494a8 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
@@ -18,93 +18,91 @@
 
 package org.apache.streams.local.test.providers;
 
-import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+
+import com.google.common.collect.Queues;
 import org.joda.time.DateTime;
 
 import java.math.BigInteger;
-import java.util.Iterator;
 import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Test StreamsProvider that sends out StreamsDatums numbered from 0 to numMessages.
  */
 public class NumericMessageProvider implements StreamsProvider {
 
-    @Override
-    public String getId() {
-        return "NumericMessageProvider";
-    }
-
-    private static final int DEFAULT_BATCH_SIZE = 100;
-
-    private int numMessages;
-    private BlockingQueue<StreamsDatum> data;
-    private volatile boolean complete = false;
-
-    public NumericMessageProvider(int numMessages) {
-        this.numMessages = numMessages;
-    }
-
-    @Override
-    public void startStream() {
-        this.data = constructQueue();
+  @Override
+  public String getId() {
+    return "NumericMessageProvider";
+  }
+
+  private static final int DEFAULT_BATCH_SIZE = 100;
+
+  private int numMessages;
+  private BlockingQueue<StreamsDatum> data;
+  private volatile boolean complete = false;
+
+  public NumericMessageProvider(int numMessages) {
+    this.numMessages = numMessages;
+  }
+
+  @Override
+  public void startStream() {
+    this.data = constructQueue();
+  }
+
+  @Override
+  public StreamsResultSet readCurrent() {
+    int batchSize = 0;
+    Queue<StreamsDatum> batch = Queues.newLinkedBlockingQueue();
+    try {
+      while (!this.data.isEmpty() && batchSize < DEFAULT_BATCH_SIZE) {
+        batch.add(this.data.take());
+        ++batchSize;
+      }
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
     }
-
-    @Override
-    public StreamsResultSet readCurrent() {
-        int batchSize = 0;
-        Queue<StreamsDatum> batch = Queues.newLinkedBlockingQueue();
-        try {
-            while (!this.data.isEmpty() && batchSize < DEFAULT_BATCH_SIZE) {
-                batch.add(this.data.take());
-                ++batchSize;
-            }
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-        }
 //        System.out.println("******************\n**\tBatchSize="+batch.size()+"\n******************");
-        this.complete = batch.isEmpty() && this.data.isEmpty();
-        return new StreamsResultSet(batch);
-    }
-
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return new StreamsResultSet(constructQueue());
-    }
-
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return new StreamsResultSet(constructQueue());
-    }
-
-    @Override
-    public boolean isRunning() {
-        return !this.complete;
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        this.data = constructQueue();
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-
-    private BlockingQueue<StreamsDatum> constructQueue() {
-        BlockingQueue<StreamsDatum> datums = Queues.newArrayBlockingQueue(numMessages);
-        for(int i=0;i<numMessages;i++) {
-            datums.add(new StreamsDatum(i));
-        }
-        return datums;
+    this.complete = batch.isEmpty() && this.data.isEmpty();
+    return new StreamsResultSet(batch);
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return new StreamsResultSet(constructQueue());
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return new StreamsResultSet(constructQueue());
+  }
+
+  @Override
+  public boolean isRunning() {
+    return !this.complete;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    this.data = constructQueue();
+  }
+
+  @Override
+  public void cleanUp() {
+
+  }
+
+  private BlockingQueue<StreamsDatum> constructQueue() {
+    BlockingQueue<StreamsDatum> datums = Queues.newArrayBlockingQueue(numMessages);
+    for(int i=0;i<numMessages;i++) {
+      datums.add(new StreamsDatum(i));
     }
+    return datums;
+  }
 }