You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:24:47 UTC

[06/42] incubator-streams git commit: STREAMS-440: custom checkstyle.xml, address compliance

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 3405882..26272ea 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -18,17 +18,22 @@
 
 package org.apache.streams.local.tasks;
 
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.core.util.DatumUtils;
 import org.apache.streams.local.counters.StreamsTaskCounter;
+
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
-import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
@@ -40,219 +45,219 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusCountable {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProviderTask.class);
+  private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProviderTask.class);
 
-    public DatumStatusCounter getDatumStatusCounter() {
-        return this.statusCounter;
-    }
+  public DatumStatusCounter getDatumStatusCounter() {
+    return this.statusCounter;
+  }
 
-    private static enum Type {
-        PERPETUAL,
-        READ_CURRENT,
-        READ_NEW,
-        READ_RANGE
-    }
+  private static enum Type {
+    PERPETUAL,
+    READ_CURRENT,
+    READ_NEW,
+    READ_RANGE
+  }
 
-    private static final int START = 0;
-    private static final int END = 1;
+  private static final int START = 0;
+  private static final int END = 1;
 
-    private StreamsProvider provider;
-    private final AtomicBoolean keepRunning = new AtomicBoolean(true);
-    private final AtomicBoolean flushing = new AtomicBoolean(false);
-    private final AtomicBoolean started = new AtomicBoolean(false);
-    private Type type;
-    private BigInteger sequence;
-    private DateTime[] dateRange;
-    private StreamsConfiguration config;
+  private StreamsProvider provider;
+  private final AtomicBoolean keepRunning = new AtomicBoolean(true);
+  private final AtomicBoolean flushing = new AtomicBoolean(false);
+  private final AtomicBoolean started = new AtomicBoolean(false);
+  private Type type;
+  private BigInteger sequence;
+  private DateTime[] dateRange;
+  private StreamsConfiguration config;
 
-    private int timeout;
-    private long sleepTime;
-    private int zeros = 0;
-    private DatumStatusCounter statusCounter = new DatumStatusCounter();
-    private StreamsTaskCounter counter;
+  private int timeout;
+  private long sleepTime;
+  private int zeros = 0;
+  private DatumStatusCounter statusCounter = new DatumStatusCounter();
+  private StreamsTaskCounter counter;
 
-    /**
-     * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()}
-     * @param provider
-     */
-    public StreamsProviderTask(StreamsProvider provider, boolean perpetual, StreamsConfiguration streamConfig) {
-        super(streamConfig);
-        streamConfig = super.streamConfig;
-        this.provider = provider;
-        if( perpetual )
-            this.type = Type.PERPETUAL;
-        else
-            this.type = Type.READ_CURRENT;
-        this.timeout = super.streamConfig.getProviderTimeoutMs().intValue();
-        this.sleepTime = streamConfig.getBatchFrequencyMs();
-    }
+  /**
+   * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()}
+   * @param provider
+   */
+  public StreamsProviderTask(StreamsProvider provider, boolean perpetual, StreamsConfiguration streamConfig) {
+    super(streamConfig);
+    streamConfig = super.streamConfig;
+    this.provider = provider;
+    if( perpetual )
+      this.type = Type.PERPETUAL;
+    else
+      this.type = Type.READ_CURRENT;
+    this.timeout = super.streamConfig.getProviderTimeoutMs().intValue();
+    this.sleepTime = streamConfig.getBatchFrequencyMs();
+  }
 
-    /**
-     * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readNew(BigInteger)}
-     * @param provider
-     * @param sequence
-     */
-    public StreamsProviderTask(StreamsProvider provider, BigInteger sequence, StreamsConfiguration streamConfig) {
-        super(streamConfig);
-        this.provider = provider;
-        this.type = Type.READ_NEW;
-        this.sequence = sequence;
-        this.timeout = streamConfig.getProviderTimeoutMs().intValue();
-        this.sleepTime = streamConfig.getBatchFrequencyMs();
-    }
+  /**
+   * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readNew(BigInteger)}
+   * @param provider
+   * @param sequence
+   */
+  public StreamsProviderTask(StreamsProvider provider, BigInteger sequence, StreamsConfiguration streamConfig) {
+    super(streamConfig);
+    this.provider = provider;
+    this.type = Type.READ_NEW;
+    this.sequence = sequence;
+    this.timeout = streamConfig.getProviderTimeoutMs().intValue();
+    this.sleepTime = streamConfig.getBatchFrequencyMs();
+  }
 
-    /**
-     * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readRange(DateTime,DateTime)}
-     * @param provider
-     * @param start
-     * @param end
-     */
-    public StreamsProviderTask(StreamsProvider provider, DateTime start, DateTime end, StreamsConfiguration streamConfig) {
-        super(streamConfig);
-        this.provider = provider;
-        this.type = Type.READ_RANGE;
-        this.dateRange = new DateTime[2];
-        this.dateRange[START] = start;
-        this.dateRange[END] = end;
-        this.timeout = streamConfig.getProviderTimeoutMs().intValue();
-        this.sleepTime = streamConfig.getBatchFrequencyMs();
-    }
+  /**
+   * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readRange(DateTime,DateTime)}
+   * @param provider
+   * @param start
+   * @param end
+   */
+  public StreamsProviderTask(StreamsProvider provider, DateTime start, DateTime end, StreamsConfiguration streamConfig) {
+    super(streamConfig);
+    this.provider = provider;
+    this.type = Type.READ_RANGE;
+    this.dateRange = new DateTime[2];
+    this.dateRange[START] = start;
+    this.dateRange[END] = end;
+    this.timeout = streamConfig.getProviderTimeoutMs().intValue();
+    this.sleepTime = streamConfig.getBatchFrequencyMs();
+  }
 
-    public void setTimeout(int timeout) {
-        this.timeout = timeout;
-    }
+  public void setTimeout(int timeout) {
+    this.timeout = timeout;
+  }
 
-    public void setSleepTime(long sleepTime) {
-        this.sleepTime = sleepTime;
-    }
+  public void setSleepTime(long sleepTime) {
+    this.sleepTime = sleepTime;
+  }
 
-    @Override
-    public boolean isWaiting() {
-        return false; //providers don't have inbound queues
-    }
+  @Override
+  public boolean isWaiting() {
+    return false; //providers don't have inbound queues
+  }
 
-    @Override
-    public void stopTask() {
-        LOGGER.debug("Stopping Provider Task for {}", this.provider.getClass().getSimpleName());
-        this.keepRunning.set(false);
-    }
+  @Override
+  public void stopTask() {
+    LOGGER.debug("Stopping Provider Task for {}", this.provider.getClass().getSimpleName());
+    this.keepRunning.set(false);
+  }
 
-    @Override
-    public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) {
-        throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - setInputQueue()");
-    }
+  @Override
+  public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) {
+    throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - setInputQueue()");
+  }
 
-    @Override
-    public void setStreamConfig(StreamsConfiguration config) {
-        this.config = config;
-    }
+  @Override
+  public void setStreamConfig(StreamsConfiguration config) {
+    this.config = config;
+  }
 
 
-    @Override
-    public void run() {
-        try {
-            this.provider.prepare(this.config); //TODO allow for configuration objects
-            StreamsResultSet resultSet = null;
-            //Negative values mean we want to run forever
-            long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / sleepTime);
-            if(this.counter == null) { //should never be null
-                this.counter = new StreamsTaskCounter(this.provider.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
+  @Override
+  public void run() {
+    try {
+      this.provider.prepare(this.config); //TODO allow for configuration objects
+      StreamsResultSet resultSet = null;
+      //Negative values mean we want to run forever
+      long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / sleepTime);
+      if(this.counter == null) { //should never be null
+        this.counter = new StreamsTaskCounter(this.provider.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
+      }
+      switch(this.type) {
+        case PERPETUAL: {
+          provider.startStream();
+          this.started.set(true);
+          while(this.isRunning()) {
+            try {
+              long startTime = System.currentTimeMillis();
+              resultSet = provider.readCurrent();
+              this.counter.addTime(System.currentTimeMillis() - startTime);
+              if( resultSet.size() == 0 )
+                zeros++;
+              else {
+                zeros = 0;
+              }
+              flushResults(resultSet);
+              // the way this works needs to change...
+              if(zeros > maxZeros)
+                this.keepRunning.set(false);
+              if(zeros > 0)
+                Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
+            } catch (Exception e) {
+              this.counter.incrementErrorCount();
+              LOGGER.warn("Thread exception");
+              this.keepRunning.set(false);
             }
-            switch(this.type) {
-                case PERPETUAL: {
-                    provider.startStream();
-                    this.started.set(true);
-                    while(this.isRunning()) {
-                        try {
-                            long startTime = System.currentTimeMillis();
-                            resultSet = provider.readCurrent();
-                            this.counter.addTime(System.currentTimeMillis() - startTime);
-                            if( resultSet.size() == 0 )
-                                zeros++;
-                            else {
-                                zeros = 0;
-                            }
-                            flushResults(resultSet);
-                            // the way this works needs to change...
-                            if(zeros > maxZeros)
-                                this.keepRunning.set(false);
-                            if(zeros > 0)
-                                Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
-                        } catch (Exception e) {
-                            this.counter.incrementErrorCount();
-                            LOGGER.warn("Thread exception");
-                            this.keepRunning.set(false);
-                        }
-                    }
-                    Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
-                }
-                    break;
-                case READ_CURRENT:
-                    resultSet = this.provider.readCurrent();
-                    this.started.set(true);
-                    break;
-                case READ_NEW:
-                    resultSet = this.provider.readNew(this.sequence);
-                    this.started.set(true);
-                    break;
-                case READ_RANGE:
-                    resultSet = this.provider.readRange(this.dateRange[START], this.dateRange[END]);
-                    this.started.set(true);
-                    break;
-                default: throw new RuntimeException("Type has not been added to StreamsProviderTask.");
-            }
-            if( resultSet != null )
-                flushResults(resultSet);
-
-        } catch(Throwable e) {
-            LOGGER.error("Caught Throwable in Provider {}", this.provider.getClass().getSimpleName(), e);
-        }  finally {
-            Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
-            LOGGER.debug("Complete Provider Task execution for {}", this.provider.getClass().getSimpleName());
-            this.provider.cleanUp();
-            //Setting started to 'true' here will allow the isRunning() method to return false in the event of an exception
-            //before started would normally be set to true n the run method.
-            this.started.set(true);
-            this.keepRunning.set(false);
+          }
+          Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
         }
-    }
+        break;
+        case READ_CURRENT:
+          resultSet = this.provider.readCurrent();
+          this.started.set(true);
+          break;
+        case READ_NEW:
+          resultSet = this.provider.readNew(this.sequence);
+          this.started.set(true);
+          break;
+        case READ_RANGE:
+          resultSet = this.provider.readRange(this.dateRange[START], this.dateRange[END]);
+          this.started.set(true);
+          break;
+        default: throw new RuntimeException("Type has not been added to StreamsProviderTask.");
+      }
+      if( resultSet != null )
+        flushResults(resultSet);
 
-    @Override
-    public boolean isRunning() {
-        //We want to make sure that we never return false if it is flushing, regardless of the state of the provider
-        //or whether we have been told to shut down.  If someone really wants us to shut down, they will interrupt the
-        //thread and force us to shutdown.  We also want to make sure we have had the opportunity to run before the
-        //runtime kills us.
-        return !this.started.get() || this.flushing.get() || (this.provider.isRunning() && this.keepRunning.get());
+    } catch(Throwable e) {
+      LOGGER.error("Caught Throwable in Provider {}", this.provider.getClass().getSimpleName(), e);
+    }  finally {
+      Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
+      LOGGER.debug("Complete Provider Task execution for {}", this.provider.getClass().getSimpleName());
+      this.provider.cleanUp();
+      //Setting started to 'true' here will allow the isRunning() method to return false in the event of an exception
+      //before started would normally be set to true n the run method.
+      this.started.set(true);
+      this.keepRunning.set(false);
     }
+  }
 
-    public void flushResults(StreamsResultSet resultSet) {
-        Queue<StreamsDatum> queue = resultSet.getQueue();
-        this.flushing.set(true);
-        while(!queue.isEmpty()) {
-            StreamsDatum datum = queue.poll();
-            if(!this.keepRunning.get()) {
-                break;
-            }
-            if(datum != null) {
-                try {
-                    super.addToOutgoingQueue(datum);
-                    this.counter.incrementEmittedCount();
-                    statusCounter.incrementStatus(DatumStatus.SUCCESS);
-                } catch( Exception e ) {
-                    this.counter.incrementErrorCount();
-                    statusCounter.incrementStatus(DatumStatus.FAIL);
-                    DatumUtils.addErrorToMetadata(datum, e, this.provider.getClass());
-                }
-            }
+  @Override
+  public boolean isRunning() {
+    //We want to make sure that we never return false if it is flushing, regardless of the state of the provider
+    //or whether we have been told to shut down.  If someone really wants us to shut down, they will interrupt the
+    //thread and force us to shutdown.  We also want to make sure we have had the opportunity to run before the
+    //runtime kills us.
+    return !this.started.get() || this.flushing.get() || (this.provider.isRunning() && this.keepRunning.get());
+  }
+
+  public void flushResults(StreamsResultSet resultSet) {
+    Queue<StreamsDatum> queue = resultSet.getQueue();
+    this.flushing.set(true);
+    while(!queue.isEmpty()) {
+      StreamsDatum datum = queue.poll();
+      if(!this.keepRunning.get()) {
+        break;
+      }
+      if(datum != null) {
+        try {
+          super.addToOutgoingQueue(datum);
+          this.counter.incrementEmittedCount();
+          statusCounter.incrementStatus(DatumStatus.SUCCESS);
+        } catch( Exception e ) {
+          this.counter.incrementErrorCount();
+          statusCounter.incrementStatus(DatumStatus.FAIL);
+          DatumUtils.addErrorToMetadata(datum, e, this.provider.getClass());
         }
-        this.flushing.set(false);
+      }
     }
+    this.flushing.set(false);
+  }
 
-    @Override
-    public void setStreamsTaskCounter(StreamsTaskCounter counter) {
-        this.counter = counter;
-    }
+  @Override
+  public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+    this.counter = counter;
+  }
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
index 5c14c1f..1b91e5c 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
@@ -23,8 +23,6 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.local.counters.StreamsTaskCounter;
 
 import java.util.List;
-import java.util.Map;
-import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 
 /**
@@ -33,53 +31,53 @@ import java.util.concurrent.BlockingQueue;
  */
 public interface StreamsTask extends Runnable{
 
-    /**
-     * Informs the task to stop. Tasks may or may not try to empty its inbound queue before halting.
-     */
-    public void stopTask();
+  /**
+   * Informs the task to stop. Tasks may or may not try to empty its inbound queue before halting.
+   */
+  public void stopTask();
 
-    /**
-     * Returns true if the task is waiting on more data to process
-     * @return true, if waiting on more data to process
-     */
-    public boolean isWaiting();
-    /**
-     * Add an input {@link java.util.Queue} for this task.
-     * @param inputQueue
-     */
-    public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue);
+  /**
+   * Returns true if the task is waiting on more data to process
+   * @return true, if waiting on more data to process
+   */
+  public boolean isWaiting();
+  /**
+   * Add an input {@link java.util.Queue} for this task.
+   * @param inputQueue
+   */
+  public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue);
 
-    /**
-     * Add an output {@link java.util.Queue} for this task.
-     * @param outputQueue
-     */
-    public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue);
+  /**
+   * Add an output {@link java.util.Queue} for this task.
+   * @param outputQueue
+   */
+  public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue);
 
-    /**
-     * Set the configuration object that will shared and passed to all instances of StreamsTask.
-     * @param config optional configuration information
-     */
-    public void setStreamConfig(StreamsConfiguration config);
+  /**
+   * Set the configuration object that will shared and passed to all instances of StreamsTask.
+   * @param config optional configuration information
+   */
+  public void setStreamConfig(StreamsConfiguration config);
 
-    /**
-     * Returns true when the task has not completed. Returns false otherwise
-     * @return true when the task has not completed. Returns false otherwise
-     */
-    public boolean isRunning();
+  /**
+   * Returns true when the task has not completed. Returns false otherwise
+   * @return true when the task has not completed. Returns false otherwise
+   */
+  public boolean isRunning();
 
-    /**
-     * Returns the input queues that have been set for this task.
-     * @return list of input queues
-     */
-    public List<BlockingQueue<StreamsDatum>> getInputQueues();
+  /**
+   * Returns the input queues that have been set for this task.
+   * @return list of input queues
+   */
+  public List<BlockingQueue<StreamsDatum>> getInputQueues();
 
-    /**
-     * Returns the output queues that have been set for this task
-     * @return list of output queues
-     */
-    public List<BlockingQueue<StreamsDatum>> getOutputQueues();
+  /**
+   * Returns the output queues that have been set for this task
+   * @return list of output queues
+   */
+  public List<BlockingQueue<StreamsDatum>> getOutputQueues();
 
 
-    public void setStreamsTaskCounter(StreamsTaskCounter counter);
+  public void setStreamsTaskCounter(StreamsTaskCounter counter);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
index 2bbfdcc..741c0b5 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
@@ -18,11 +18,6 @@
 
 package org.apache.streams.local.builders;
 
-import com.carrotsearch.randomizedtesting.RandomizedTest;
-import com.carrotsearch.randomizedtesting.annotations.Repeat;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
@@ -36,6 +31,12 @@ import org.apache.streams.local.test.providers.NumericMessageProvider;
 import org.apache.streams.local.test.writer.DatumCounterWriter;
 import org.apache.streams.local.test.writer.SystemOutWriter;
 import org.apache.streams.util.ComponentUtils;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Before;
@@ -44,7 +45,6 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import javax.management.*;
 import java.lang.management.ManagementFactory;
 import java.util.Collections;
 import java.util.List;
@@ -55,10 +55,21 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 
-import static org.hamcrest.Matchers.*;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Basic Tests for the LocalStreamBuilder.
@@ -70,334 +81,334 @@ import static org.mockito.Mockito.*;
  *
  */
 public class LocalStreamBuilderTest extends RandomizedTest {
-    private static final String MBEAN_ID = "test_id";
-    private static final String STREAM_ID = "test_stream";
-    private static long STREAM_START_TIME = (new DateTime()).getMillis();
+  private static final String MBEAN_ID = "test_id";
+  private static final String STREAM_ID = "test_stream";
+  private static long STREAM_START_TIME = (new DateTime()).getMillis();
 
-    @After
-    public void removeLocalMBeans() {
-        try {
-            ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
-        } catch (Exception e) {
-            //No op.  proceed to next test
-        }
+  @After
+  public void removeLocalMBeans() {
+    try {
+      ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+    } catch (Exception e) {
+      //No op.  proceed to next test
     }
+  }
 
 
-    public void removeRegisteredMBeans(String... ids) {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        for(String id : ids) {
-            try {
-                mbs.unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME)));
-            } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
-                //No-op
-            }
-            try {
-                mbs.unregisterMBean(new ObjectName((String.format(StreamsTaskCounter.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME))));
-            } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
-                //No-op
-            }
-        }
+  public void removeRegisteredMBeans(String... ids) {
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    for(String id : ids) {
+      try {
+        mbs.unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME)));
+      } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
+        //No-op
+      }
+      try {
+        mbs.unregisterMBean(new ObjectName((String.format(StreamsTaskCounter.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME))));
+      } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
+        //No-op
+      }
     }
+  }
 
 
 
 
-    @Test
-    public void testStreamIdValidations() {
-        StreamBuilder builder = new LocalStreamBuilder();
-        builder.newReadCurrentStream("id", new NumericMessageProvider(1));
-        Exception exp = null;
-        try {
-            builder.newReadCurrentStream("id", new NumericMessageProvider(1));
-        } catch (RuntimeException e) {
-            exp = e;
-        }
-        assertNotNull(exp);
-        exp = null;
-        builder.addStreamsProcessor("1", new PassthroughDatumCounterProcessor("1"), 1, "id");
-        try {
-            builder.addStreamsProcessor("2", new PassthroughDatumCounterProcessor("2"), 1, "id", "id2");
-        } catch (RuntimeException e) {
-            exp = e;
-        }
-        assertNotNull(exp);
-        removeRegisteredMBeans("1", "2", "id");
+  @Test
+  public void testStreamIdValidations() {
+    StreamBuilder builder = new LocalStreamBuilder();
+    builder.newReadCurrentStream("id", new NumericMessageProvider(1));
+    Exception exp = null;
+    try {
+      builder.newReadCurrentStream("id", new NumericMessageProvider(1));
+    } catch (RuntimeException e) {
+      exp = e;
     }
-
-    @Test
-    public void testBasicLinearStream1()  {
-        linearStreamNonParallel(1, 1);
+    assertNotNull(exp);
+    exp = null;
+    builder.addStreamsProcessor("1", new PassthroughDatumCounterProcessor("1"), 1, "id");
+    try {
+      builder.addStreamsProcessor("2", new PassthroughDatumCounterProcessor("2"), 1, "id", "id2");
+    } catch (RuntimeException e) {
+      exp = e;
     }
+    assertNotNull(exp);
+    removeRegisteredMBeans("1", "2", "id");
+  }
 
-    @Test
-    public void testBasicLinearStream2()  {
-        linearStreamNonParallel(1004, 1);
-    }
+  @Test
+  public void testBasicLinearStream1()  {
+    linearStreamNonParallel(1, 1);
+  }
 
-    @Test
-    public void testBasicLinearStream3()  {
-        linearStreamNonParallel(1, 10);
-    }
+  @Test
+  public void testBasicLinearStream2()  {
+    linearStreamNonParallel(1004, 1);
+  }
 
-    @Test
-    @Repeat(iterations = 3)
-    public void testBasicLinearStreamRandom()  {
-        int numDatums = randomIntBetween(1, 100000);
-        int numProcessors = randomIntBetween(1, 10);
-        linearStreamNonParallel(numDatums, numProcessors);
-    }
+  @Test
+  public void testBasicLinearStream3()  {
+    linearStreamNonParallel(1, 10);
+  }
+
+  @Test
+  @Repeat(iterations = 3)
+  public void testBasicLinearStreamRandom()  {
+    int numDatums = randomIntBetween(1, 100000);
+    int numProcessors = randomIntBetween(1, 10);
+    linearStreamNonParallel(numDatums, numProcessors);
+  }
 
-    /**
-     * Tests that all datums pass through each processor and that all datums reach the writer
-     * @param numDatums
-     * @param numProcessors
-     */
-    private void linearStreamNonParallel(int numDatums, int numProcessors) {
-        String processorId = "proc";
-        try {
-            StreamBuilder builder = new LocalStreamBuilder(10);
-            builder.newPerpetualStream("numeric_provider", new NumericMessageProvider(numDatums));
-            String connectTo = null;
-            for(int i=0; i < numProcessors; ++i) {
-                if(i == 0) {
-                    connectTo = "numeric_provider";
-                } else {
-                    connectTo = processorId+(i-1);
-                }
-                builder.addStreamsProcessor(processorId+i, new PassthroughDatumCounterProcessor(processorId+i), 1, connectTo);
-            }
-            Set output = Collections.newSetFromMap(new ConcurrentHashMap());
-            builder.addStreamsPersistWriter("writer", new DatumCounterWriter("writer"), 1, processorId+(numProcessors-1));
-            builder.start();
-            for(int i=0; i < numProcessors; ++i) {
-                assertEquals("Processor "+i+" did not receive all of the datums", numDatums, PassthroughDatumCounterProcessor.COUNTS.get(processorId+i).get());
-            }
-            for(int i=0; i < numDatums; ++i) {
-                assertTrue("Expected writer to have received : "+i, DatumCounterWriter.RECEIVED.get("writer").contains(i));
-            }
-        } finally {
-            for(int i=0; i < numProcessors; ++i) {
-                removeRegisteredMBeans(processorId+i, processorId+i+"-"+PassthroughDatumCounterProcessor.class.getCanonicalName());
-            }
-            removeRegisteredMBeans("writer", "numeric_provider");
+  /**
+   * Tests that all datums pass through each processor and that all datums reach the writer
+   * @param numDatums
+   * @param numProcessors
+   */
+  private void linearStreamNonParallel(int numDatums, int numProcessors) {
+    String processorId = "proc";
+    try {
+      StreamBuilder builder = new LocalStreamBuilder(10);
+      builder.newPerpetualStream("numeric_provider", new NumericMessageProvider(numDatums));
+      String connectTo = null;
+      for(int i=0; i < numProcessors; ++i) {
+        if(i == 0) {
+          connectTo = "numeric_provider";
+        } else {
+          connectTo = processorId+(i-1);
         }
+        builder.addStreamsProcessor(processorId+i, new PassthroughDatumCounterProcessor(processorId+i), 1, connectTo);
+      }
+      Set output = Collections.newSetFromMap(new ConcurrentHashMap());
+      builder.addStreamsPersistWriter("writer", new DatumCounterWriter("writer"), 1, processorId+(numProcessors-1));
+      builder.start();
+      for(int i=0; i < numProcessors; ++i) {
+        assertEquals("Processor "+i+" did not receive all of the datums", numDatums, PassthroughDatumCounterProcessor.COUNTS.get(processorId+i).get());
+      }
+      for(int i=0; i < numDatums; ++i) {
+        assertTrue("Expected writer to have received : "+i, DatumCounterWriter.RECEIVED.get("writer").contains(i));
+      }
+    } finally {
+      for(int i=0; i < numProcessors; ++i) {
+        removeRegisteredMBeans(processorId+i, processorId+i+"-"+PassthroughDatumCounterProcessor.class.getCanonicalName());
+      }
+      removeRegisteredMBeans("writer", "numeric_provider");
     }
+  }
 
-    @Test
-    public void testParallelLinearStream1() {
-        String processorId = "proc";
-        int numProcessors = randomIntBetween(1, 10);
-        int numDatums = randomIntBetween(1, 300000);
-        try {
-            StreamBuilder builder = new LocalStreamBuilder(50);
-            builder.newPerpetualStream("numeric_provider", new NumericMessageProvider(numDatums));
-            String connectTo = null;
-            for(int i=0; i < numProcessors; ++i) {
-                if(i == 0) {
-                    connectTo = "numeric_provider";
-                } else {
-                    connectTo = processorId+(i-1);
-                }
-                int parallelHint = randomIntBetween(1,5);
-                builder.addStreamsProcessor(processorId+i, new PassthroughDatumCounterProcessor(processorId+i), parallelHint, connectTo);
-            }
-            builder.addStreamsPersistWriter("writer", new DatumCounterWriter("writer"), 1, processorId+(numProcessors-1));
-            builder.start();
-            Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
-            builder.stop();
-            Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
-            assertEquals(numDatums, DatumCounterWriter.RECEIVED.get("writer").size());
-            for(int i=0; i < numDatums; ++i) {
-                assertTrue("Expected Writer to receive datum : " + i, DatumCounterWriter.RECEIVED.get("writer").contains(i));
-            }
-            for(int i=0; i < numProcessors; ++i) {
-                assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get(processorId+i).get());
-            }
-
-        } finally {
-            for(int i=0; i < numProcessors; ++i) {
-                removeRegisteredMBeans(processorId+i);
-            }
-            removeRegisteredMBeans("writer", "numeric_provider");
+  @Test
+  public void testParallelLinearStream1() {
+    String processorId = "proc";
+    int numProcessors = randomIntBetween(1, 10);
+    int numDatums = randomIntBetween(1, 300000);
+    try {
+      StreamBuilder builder = new LocalStreamBuilder(50);
+      builder.newPerpetualStream("numeric_provider", new NumericMessageProvider(numDatums));
+      String connectTo = null;
+      for(int i=0; i < numProcessors; ++i) {
+        if(i == 0) {
+          connectTo = "numeric_provider";
+        } else {
+          connectTo = processorId+(i-1);
         }
+        int parallelHint = randomIntBetween(1,5);
+        builder.addStreamsProcessor(processorId+i, new PassthroughDatumCounterProcessor(processorId+i), parallelHint, connectTo);
+      }
+      builder.addStreamsPersistWriter("writer", new DatumCounterWriter("writer"), 1, processorId+(numProcessors-1));
+      builder.start();
+      Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
+      builder.stop();
+      Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
+      assertEquals(numDatums, DatumCounterWriter.RECEIVED.get("writer").size());
+      for(int i=0; i < numDatums; ++i) {
+        assertTrue("Expected Writer to receive datum : " + i, DatumCounterWriter.RECEIVED.get("writer").contains(i));
+      }
+      for(int i=0; i < numProcessors; ++i) {
+        assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get(processorId+i).get());
+      }
+
+    } finally {
+      for(int i=0; i < numProcessors; ++i) {
+        removeRegisteredMBeans(processorId+i);
+      }
+      removeRegisteredMBeans("writer", "numeric_provider");
     }
+  }
 
-    @Test
-    public void testBasicMergeStream() {
-        try {
-            int numDatums1 = randomIntBetween(1, 300000);
-            int numDatums2 = randomIntBetween(1, 300000);
-            StreamsProcessor processor1 = new PassthroughDatumCounterProcessor("proc1");
-            StreamsProcessor processor2 = new PassthroughDatumCounterProcessor("proc2");
-            StreamBuilder builder = new LocalStreamBuilder();
-            builder.newPerpetualStream("sp1", new NumericMessageProvider(numDatums1))
-                    .newPerpetualStream("sp2", new NumericMessageProvider(numDatums2))
-                    .addStreamsProcessor("proc1", processor1, 1, "sp1")
-                    .addStreamsProcessor("proc2", processor2, 1, "sp2")
-                    .addStreamsPersistWriter("writer1", new DatumCounterWriter("writer"), 1, "proc1", "proc2");
-            builder.start();
-            assertEquals(numDatums1, PassthroughDatumCounterProcessor.COUNTS.get("proc1").get());
-            assertEquals(numDatums2, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
-            assertEquals(numDatums1+numDatums2, DatumCounterWriter.COUNTS.get("writer").get());
-        } finally {
-            String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
-            String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
-            removeRegisteredMBeans("proc1", "proc2", "writer1", "sp1", "sp2");
-        }
+  @Test
+  public void testBasicMergeStream() {
+    try {
+      int numDatums1 = randomIntBetween(1, 300000);
+      int numDatums2 = randomIntBetween(1, 300000);
+      StreamsProcessor processor1 = new PassthroughDatumCounterProcessor("proc1");
+      StreamsProcessor processor2 = new PassthroughDatumCounterProcessor("proc2");
+      StreamBuilder builder = new LocalStreamBuilder();
+      builder.newPerpetualStream("sp1", new NumericMessageProvider(numDatums1))
+          .newPerpetualStream("sp2", new NumericMessageProvider(numDatums2))
+          .addStreamsProcessor("proc1", processor1, 1, "sp1")
+          .addStreamsProcessor("proc2", processor2, 1, "sp2")
+          .addStreamsPersistWriter("writer1", new DatumCounterWriter("writer"), 1, "proc1", "proc2");
+      builder.start();
+      assertEquals(numDatums1, PassthroughDatumCounterProcessor.COUNTS.get("proc1").get());
+      assertEquals(numDatums2, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
+      assertEquals(numDatums1+numDatums2, DatumCounterWriter.COUNTS.get("writer").get());
+    } finally {
+      String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+      String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
+      removeRegisteredMBeans("proc1", "proc2", "writer1", "sp1", "sp2");
     }
+  }
 
-    @Test
-    public void testBasicBranch() {
-        try {
-            int numDatums = randomIntBetween(1, 300000);
-            StreamBuilder builder = new LocalStreamBuilder(50);
-            builder.newPerpetualStream("prov1", new NumericMessageProvider(numDatums))
-                    .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor("proc1"), 1, "prov1")
-                    .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor("proc2"), 1, "prov1")
-                    .addStreamsPersistWriter("w1", new DatumCounterWriter("writer"), 1, "proc1", "proc2");
-            builder.start();
-            assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc1").get());
-            assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
-            assertEquals(numDatums*2, DatumCounterWriter.COUNTS.get("writer").get());
-        } finally {
-            String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
-            String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
-            String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
-            removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
-        }
+  @Test
+  public void testBasicBranch() {
+    try {
+      int numDatums = randomIntBetween(1, 300000);
+      StreamBuilder builder = new LocalStreamBuilder(50);
+      builder.newPerpetualStream("prov1", new NumericMessageProvider(numDatums))
+          .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor("proc1"), 1, "prov1")
+          .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor("proc2"), 1, "prov1")
+          .addStreamsPersistWriter("w1", new DatumCounterWriter("writer"), 1, "proc1", "proc2");
+      builder.start();
+      assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc1").get());
+      assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
+      assertEquals(numDatums*2, DatumCounterWriter.COUNTS.get("writer").get());
+    } finally {
+      String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+      String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+      String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
+      removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
     }
+  }
 
-    @Test
-    public void testSlowProcessorBranch() {
-        try {
-            int numDatums = 30;
-            int timeout = 2000;
-            Map<String, Object> config = Maps.newHashMap();
-            config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout);
-            StreamBuilder builder = new LocalStreamBuilder(config);
-            builder.newPerpetualStream("prov1", new NumericMessageProvider(numDatums))
-                    .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1")
-                    .addStreamsPersistWriter("w1", new DatumCounterWriter("writer"), 1, "proc1");
-            builder.start();
-            assertEquals(numDatums, DatumCounterWriter.COUNTS.get("writer").get());
-        } finally {
-            String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
-            String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
-            String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
-            removeRegisteredMBeans("prov1", "proc1", "w1");
-        }
+  @Test
+  public void testSlowProcessorBranch() {
+    try {
+      int numDatums = 30;
+      int timeout = 2000;
+      Map<String, Object> config = Maps.newHashMap();
+      config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout);
+      StreamBuilder builder = new LocalStreamBuilder(config);
+      builder.newPerpetualStream("prov1", new NumericMessageProvider(numDatums))
+          .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1")
+          .addStreamsPersistWriter("w1", new DatumCounterWriter("writer"), 1, "proc1");
+      builder.start();
+      assertEquals(numDatums, DatumCounterWriter.COUNTS.get("writer").get());
+    } finally {
+      String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+      String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+      String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
+      removeRegisteredMBeans("prov1", "proc1", "w1");
     }
+  }
 
-    @Test
-    public void testConfiguredProviderTimeout() {
-        try {
-            Map<String, Object> config = Maps.newHashMap();
-            int timeout = 10000;
-            config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout);
-            long start = System.currentTimeMillis();
-            StreamBuilder builder = new LocalStreamBuilder(-1, config);
-            builder.newPerpetualStream("prov1", new EmptyResultSetProvider())
-                    .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor("proc1"), 1, "prov1")
-                    .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor("proc2"), 1, "proc1")
-                    .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
-            builder.start();
-            long end = System.currentTimeMillis();
-            //We care mostly that it doesn't terminate too early.  With thread shutdowns, etc, the actual time is indeterminate.  Just make sure there is an upper bound
-            assertThat((int) (end - start), is(allOf(greaterThanOrEqualTo(timeout), lessThanOrEqualTo(4 * timeout))));
-        } finally {
-            String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
-            String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
-            String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
-            removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
-        }
+  @Test
+  public void testConfiguredProviderTimeout() {
+    try {
+      Map<String, Object> config = Maps.newHashMap();
+      int timeout = 10000;
+      config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout);
+      long start = System.currentTimeMillis();
+      StreamBuilder builder = new LocalStreamBuilder(-1, config);
+      builder.newPerpetualStream("prov1", new EmptyResultSetProvider())
+          .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor("proc1"), 1, "prov1")
+          .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor("proc2"), 1, "proc1")
+          .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
+      builder.start();
+      long end = System.currentTimeMillis();
+      //We care mostly that it doesn't terminate too early.  With thread shutdowns, etc, the actual time is indeterminate.  Just make sure there is an upper bound
+      assertThat((int) (end - start), is(allOf(greaterThanOrEqualTo(timeout), lessThanOrEqualTo(4 * timeout))));
+    } finally {
+      String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+      String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+      String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
+      removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
     }
+  }
 
-    @Ignore
-    @Test
-    public void ensureShutdownWithBlockedQueue() throws InterruptedException {
-        try {
-            ExecutorService service = Executors.newSingleThreadExecutor();
-            int before = Thread.activeCount();
-            final StreamBuilder builder = new LocalStreamBuilder();
-            builder.newPerpetualStream("prov1", new NumericMessageProvider(30))
-                    .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1")
-                    .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
-            service.submit(new Runnable() {
-                @Override
-                public void run() {
-                    builder.start();
-                }
-            });
-            //Let streams spin up threads and start to process
-            Thread.sleep(500);
-            builder.stop();
-            service.shutdownNow();
-            service.awaitTermination(30000, TimeUnit.MILLISECONDS);
-            assertThat(Thread.activeCount(), is(equalTo(before)));
-        } finally {
-            String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
-            String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
-            String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
-            removeRegisteredMBeans("prov1", "proc1", "w1");
+  @Ignore
+  @Test
+  public void ensureShutdownWithBlockedQueue() throws InterruptedException {
+    try {
+      ExecutorService service = Executors.newSingleThreadExecutor();
+      int before = Thread.activeCount();
+      final StreamBuilder builder = new LocalStreamBuilder();
+      builder.newPerpetualStream("prov1", new NumericMessageProvider(30))
+          .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1")
+          .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
+      service.submit(new Runnable() {
+        @Override
+        public void run() {
+          builder.start();
         }
+      });
+      //Let streams spin up threads and start to process
+      Thread.sleep(500);
+      builder.stop();
+      service.shutdownNow();
+      service.awaitTermination(30000, TimeUnit.MILLISECONDS);
+      assertThat(Thread.activeCount(), is(equalTo(before)));
+    } finally {
+      String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+      String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+      String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
+      removeRegisteredMBeans("prov1", "proc1", "w1");
     }
+  }
 
-    @Before
-    private void clearCounters() {
-        PassthroughDatumCounterProcessor.COUNTS.clear();
-        PassthroughDatumCounterProcessor.CLAIMED_ID.clear();
-        PassthroughDatumCounterProcessor.SEEN_DATA.clear();
-        DatumCounterWriter.COUNTS.clear();
-        DatumCounterWriter.CLAIMED_ID.clear();
-        DatumCounterWriter.SEEN_DATA.clear();
-        DatumCounterWriter.RECEIVED.clear();
-    }
+  @Before
+  private void clearCounters() {
+    PassthroughDatumCounterProcessor.COUNTS.clear();
+    PassthroughDatumCounterProcessor.CLAIMED_ID.clear();
+    PassthroughDatumCounterProcessor.SEEN_DATA.clear();
+    DatumCounterWriter.COUNTS.clear();
+    DatumCounterWriter.CLAIMED_ID.clear();
+    DatumCounterWriter.SEEN_DATA.clear();
+    DatumCounterWriter.RECEIVED.clear();
+  }
 
 
-    /**
-     * Creates {@link org.apache.streams.core.StreamsProcessor} that passes any StreamsDatum it gets as an
-     * input and counts the number of items it processes.
-     * @param counter
-     * @return
-     */
-    private StreamsProcessor createPassThroughProcessor(final AtomicInteger counter) {
-        StreamsProcessor processor = mock(StreamsProcessor.class);
-        when(processor.process(any(StreamsDatum.class))).thenAnswer(new Answer<List<StreamsDatum>>() {
-            @Override
-            public List<StreamsDatum> answer(InvocationOnMock invocationOnMock) throws Throwable {
-                List<StreamsDatum> datum = Lists.newLinkedList();
-                if(counter != null) {
-                    counter.incrementAndGet();
-                }
-                datum.add((StreamsDatum) invocationOnMock.getArguments()[0] );
-                return datum;
-            }
-        });
-        return processor;
-    }
+  /**
+   * Creates {@link org.apache.streams.core.StreamsProcessor} that passes any StreamsDatum it gets as an
+   * input and counts the number of items it processes.
+   * @param counter
+   * @return
+   */
+  private StreamsProcessor createPassThroughProcessor(final AtomicInteger counter) {
+    StreamsProcessor processor = mock(StreamsProcessor.class);
+    when(processor.process(any(StreamsDatum.class))).thenAnswer(new Answer<List<StreamsDatum>>() {
+      @Override
+      public List<StreamsDatum> answer(InvocationOnMock invocationOnMock) throws Throwable {
+        List<StreamsDatum> datum = Lists.newLinkedList();
+        if(counter != null) {
+          counter.incrementAndGet();
+        }
+        datum.add((StreamsDatum) invocationOnMock.getArguments()[0] );
+        return datum;
+      }
+    });
+    return processor;
+  }
 
-    private StreamsPersistWriter createSetCollectingWriter(final Set collector) {
-        return createSetCollectingWriter(collector, null);
-    }
+  private StreamsPersistWriter createSetCollectingWriter(final Set collector) {
+    return createSetCollectingWriter(collector, null);
+  }
 
-    /**
-     * Creates a StreamsPersistWriter that adds every datums document to a set
-     * @param collector
-     * @return
-     */
-    private StreamsPersistWriter createSetCollectingWriter(final Set collector, final AtomicInteger counter) {
-        StreamsPersistWriter writer = mock(StreamsPersistWriter.class);
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                if(counter != null) {
-                    counter.incrementAndGet();
-                }
-                collector.add(((StreamsDatum)invocationOnMock.getArguments()[0]).getDocument());
-                return null;
-            }
-        }).when(writer).write(any(StreamsDatum.class));
-        return writer;
-    }
+  /**
+   * Creates a StreamsPersistWriter that adds every datums document to a set
+   * @param collector
+   * @return
+   */
+  private StreamsPersistWriter createSetCollectingWriter(final Set collector, final AtomicInteger counter) {
+    StreamsPersistWriter writer = mock(StreamsPersistWriter.class);
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+        if(counter != null) {
+          counter.incrementAndGet();
+        }
+        collector.add(((StreamsDatum)invocationOnMock.getArguments()[0]).getDocument());
+        return null;
+      }
+    }).when(writer).write(any(StreamsDatum.class));
+    return writer;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java
index a77dfec..2c61093 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java
@@ -30,16 +30,16 @@ import org.apache.streams.local.test.writer.DoNothingWriter;
  */
 public class ToyLocalBuilderExample {
 
-    /**
-     * A simple example of how to run a stream in local mode.
-     * @param args
-     */
-    public static void main(String[] args) {
-        StreamBuilder builder = new LocalStreamBuilder();
-        builder.newReadCurrentStream("prov", new NumericMessageProvider(1000000))
-                .addStreamsProcessor("proc", new DoNothingProcessor(), 100, "prov")
-                .addStreamsPersistWriter("writer", new DoNothingWriter(), 3, "proc");
-        builder.start();
-    }
+  /**
+   * A simple example of how to run a stream in local mode.
+   * @param args
+   */
+  public static void main(String[] args) {
+    StreamBuilder builder = new LocalStreamBuilder();
+    builder.newReadCurrentStream("prov", new NumericMessageProvider(1000000))
+        .addStreamsProcessor("proc", new DoNothingProcessor(), 100, "prov")
+        .addStreamsPersistWriter("writer", new DoNothingWriter(), 3, "proc");
+    builder.start();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
index 9775c6f..9d92bec 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
@@ -23,112 +23,112 @@ import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Test;
 
+import java.lang.management.ManagementFactory;
 import javax.management.InstanceNotFoundException;
 import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
 
 /**
  *
  */
 public class DatumStatusCounterTest extends RandomizedTest {
 
-    private static final String MBEAN_ID = "test_id";
-    private static final String STREAM_ID = "test_stream";
-    private static long STREAM_START_TIME = (new DateTime()).getMillis();
+  private static final String MBEAN_ID = "test_id";
+  private static final String STREAM_ID = "test_stream";
+  private static long STREAM_START_TIME = (new DateTime()).getMillis();
 
 
-    /**
-     * Remove registered mbeans from previous tests
-     * @throws Exception
-     */
-    @After
-    public void unregisterMXBean() throws Exception {
-        try {
-            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(DatumStatusCounter.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
-        } catch (InstanceNotFoundException ife) {
-            //No-op
-        }
+  /**
+   * Remove registered mbeans from previous tests
+   * @throws Exception
+   */
+  @After
+  public void unregisterMXBean() throws Exception {
+    try {
+      ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(DatumStatusCounter.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
+    } catch (InstanceNotFoundException ife) {
+      //No-op
     }
+  }
 
-    /**
-     * Test Constructor can register the counter as an mxbean with throwing an exception.
-     */
-    @Test
-    public void testConstructor() {
-        try {
-            new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
-        } catch (Throwable t) {
-            fail("Constructor Threw Exception : "+t.getMessage());
-        }
+  /**
+   * Test Constructor can register the counter as an mxbean with throwing an exception.
+   */
+  @Test
+  public void testConstructor() {
+    try {
+      new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+    } catch (Throwable t) {
+      fail("Constructor Threw Exception : "+t.getMessage());
     }
+  }
 
-    /**
-     * Test that you can increment passes and it returns the correct count
-     * @throws Exception
-     */
-    @Test
-    @Repeat(iterations = 3)
-    public void testPassed() throws Exception {
-        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
-        int numIncrements = randomIntBetween(1, 100000);
-        for(int i=0; i < numIncrements; ++i) {
-            counter.incrementPassedCount();
-        }
-        assertEquals(numIncrements, counter.getNumPassed());
+  /**
+   * Test that you can increment passes and it returns the correct count
+   * @throws Exception
+   */
+  @Test
+  @Repeat(iterations = 3)
+  public void testPassed() throws Exception {
+    DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+    int numIncrements = randomIntBetween(1, 100000);
+    for(int i=0; i < numIncrements; ++i) {
+      counter.incrementPassedCount();
+    }
+    assertEquals(numIncrements, counter.getNumPassed());
 
-        unregisterMXBean();
+    unregisterMXBean();
 
-        counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
-        numIncrements = randomIntBetween(1, 100000);
-        long total = 0;
-        for(int i=0; i < numIncrements; ++i) {
-            long delta = randomIntBetween(1, 100);
-            total += delta;
-            counter.incrementPassedCount(delta);
-        }
-        assertEquals(total, counter.getNumPassed());
+    counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+    numIncrements = randomIntBetween(1, 100000);
+    long total = 0;
+    for(int i=0; i < numIncrements; ++i) {
+      long delta = randomIntBetween(1, 100);
+      total += delta;
+      counter.incrementPassedCount(delta);
     }
+    assertEquals(total, counter.getNumPassed());
+  }
 
-    /**
-     * Test that you can increment failed and it returns the correct count
-     * @throws Exception
-     */
-    @Test
-    @Repeat(iterations = 3)
-    public void testFailed() throws Exception {
-        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
-        int numIncrements = randomIntBetween(1, 100000);
-        for(int i=0; i < numIncrements; ++i) {
-            counter.incrementFailedCount();
-        }
-        assertEquals(numIncrements, counter.getNumFailed());
+  /**
+   * Test that you can increment failed and it returns the correct count
+   * @throws Exception
+   */
+  @Test
+  @Repeat(iterations = 3)
+  public void testFailed() throws Exception {
+    DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+    int numIncrements = randomIntBetween(1, 100000);
+    for(int i=0; i < numIncrements; ++i) {
+      counter.incrementFailedCount();
+    }
+    assertEquals(numIncrements, counter.getNumFailed());
 
-        unregisterMXBean();
+    unregisterMXBean();
 
-        counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
-        numIncrements = randomIntBetween(1, 100000);
-        long total = 0;
-        for(int i=0; i < numIncrements; ++i) {
-            long delta = randomIntBetween(1, 100);
-            total += delta;
-            counter.incrementFailedCount(delta);
-        }
-        assertEquals(total, counter.getNumFailed());
+    counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+    numIncrements = randomIntBetween(1, 100000);
+    long total = 0;
+    for(int i=0; i < numIncrements; ++i) {
+      long delta = randomIntBetween(1, 100);
+      total += delta;
+      counter.incrementFailedCount(delta);
     }
+    assertEquals(total, counter.getNumFailed());
+  }
 
 
-    /**
-     * Test failure rate returns expected values
-     */
-    @Test
-    @Repeat(iterations = 3)
-    public void testFailureRate() {
-        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
-        assertEquals(0.0, counter.getFailRate(), 0);
-        int failures = randomIntBetween(0, 100000);
-        int passes = randomIntBetween(0, 100000);
-        counter.incrementPassedCount(passes);
-        counter.incrementFailedCount(failures);
-        assertEquals((double)failures / (double)(passes + failures), counter.getFailRate(), 0);
-    }
+  /**
+   * Test failure rate returns expected values
+   */
+  @Test
+  @Repeat(iterations = 3)
+  public void testFailureRate() {
+    DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+    assertEquals(0.0, counter.getFailRate(), 0);
+    int failures = randomIntBetween(0, 100000);
+    int passes = randomIntBetween(0, 100000);
+    counter.incrementPassedCount(passes);
+    counter.incrementFailedCount(failures);
+    assertEquals((double)failures / (double)(passes + failures), counter.getFailRate(), 0);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
index 95fd610..544b065 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
@@ -23,139 +23,139 @@ import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Test;
 
+import java.lang.management.ManagementFactory;
 import javax.management.InstanceNotFoundException;
 import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
 
 /**
  * Unit tests for {@link org.apache.streams.local.counters.StreamsTaskCounter}
  */
 public class StreamsTaskCounterTest extends RandomizedTest {
 
-    private static final String MBEAN_ID = "test_id";
-    private static final String STREAM_ID = "test_stream";
-    private static long STREAM_START_TIME = (new DateTime()).getMillis();
-
-    /**
-     * Remove registered mbeans from previous tests
-     * @throws Exception
-     */
-    @After
-    public void unregisterMXBean() throws Exception {
-        try {
-            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(StreamsTaskCounter.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
-        } catch (InstanceNotFoundException ife) {
-            //No-op
-        }
+  private static final String MBEAN_ID = "test_id";
+  private static final String STREAM_ID = "test_stream";
+  private static long STREAM_START_TIME = (new DateTime()).getMillis();
+
+  /**
+   * Remove registered mbeans from previous tests
+   * @throws Exception
+   */
+  @After
+  public void unregisterMXBean() throws Exception {
+    try {
+      ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(StreamsTaskCounter.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
+    } catch (InstanceNotFoundException ife) {
+      //No-op
     }
-
-    /**
-     * Test constructor does not throw errors
-     */
-    @Test
-    public void testConstructor() {
-        try {
-            new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
-        } catch (Throwable t) {
-            fail("Constructor threw error : "+t.getMessage());
-        }
+  }
+
+  /**
+   * Test constructor does not throw errors
+   */
+  @Test
+  public void testConstructor() {
+    try {
+      new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+    } catch (Throwable t) {
+      fail("Constructor threw error : "+t.getMessage());
     }
-
-    /**
-     * Test emitted increments correctly and returns expected value
-     * @throws Exception
-     */
-    @Test
-    @Repeat(iterations = 3)
-    public void testEmitted() throws Exception {
-        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
-        int numIncrements = randomIntBetween(1, 100000);
-        for(int i=0; i < numIncrements; ++i) {
-            counter.incrementEmittedCount();
-        }
-        assertEquals(numIncrements, counter.getNumEmitted());
-
-        unregisterMXBean();
-
-        counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
-        numIncrements = randomIntBetween(1, 100000);
-        long total = 0;
-        for(int i=0; i < numIncrements; ++i) {
-            long delta = randomIntBetween(1, 100);
-            total += delta;
-            counter.incrementEmittedCount(delta);
-        }
-        assertEquals(total, counter.getNumEmitted());
+  }
+
+  /**
+   * Test emitted increments correctly and returns expected value
+   * @throws Exception
+   */
+  @Test
+  @Repeat(iterations = 3)
+  public void testEmitted() throws Exception {
+    StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+    int numIncrements = randomIntBetween(1, 100000);
+    for(int i=0; i < numIncrements; ++i) {
+      counter.incrementEmittedCount();
     }
+    assertEquals(numIncrements, counter.getNumEmitted());
+
+    unregisterMXBean();
 
-    /**
-     * Test received increments correctly and returns expected value
-     * @throws Exception
-     */
-    @Test
-    @Repeat(iterations = 3)
-    public void testReceived() throws Exception {
-        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
-        int numIncrements = randomIntBetween(1, 100000);
-        for(int i=0; i < numIncrements; ++i) {
-            counter.incrementReceivedCount();
-        }
-        assertEquals(numIncrements, counter.getNumReceived());
-
-        unregisterMXBean();
-
-        counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
-        numIncrements = randomIntBetween(1, 100000);
-        long total = 0;
-        for(int i=0; i < numIncrements; ++i) {
-            long delta = randomIntBetween(1, 100);
-            total += delta;
-            counter.incrementReceivedCount(delta);
-        }
-        assertEquals(total, counter.getNumReceived());
+    counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+    numIncrements = randomIntBetween(1, 100000);
+    long total = 0;
+    for(int i=0; i < numIncrements; ++i) {
+      long delta = randomIntBetween(1, 100);
+      total += delta;
+      counter.incrementEmittedCount(delta);
     }
+    assertEquals(total, counter.getNumEmitted());
+  }
+
+  /**
+   * Test received increments correctly and returns expected value
+   * @throws Exception
+   */
+  @Test
+  @Repeat(iterations = 3)
+  public void testReceived() throws Exception {
+    StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+    int numIncrements = randomIntBetween(1, 100000);
+    for(int i=0; i < numIncrements; ++i) {
+      counter.incrementReceivedCount();
+    }
+    assertEquals(numIncrements, counter.getNumReceived());
+
+    unregisterMXBean();
 
-    /**
-     * Test errors increments correctly and returns expected value
-     * @throws Exception
-     */
-    @Test
-    @Repeat(iterations = 3)
-    public void testError() throws Exception {
-        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
-        int numIncrements = randomIntBetween(1, 100000);
-        for(int i=0; i < numIncrements; ++i) {
-            counter.incrementErrorCount();
-        }
-        assertEquals(numIncrements, counter.getNumUnhandledErrors());
-
-        unregisterMXBean();
-
-        counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
-        numIncrements = randomIntBetween(1, 100000);
-        long total = 0;
-        for(int i=0; i < numIncrements; ++i) {
-            long delta = randomIntBetween(1, 100);
-            total += delta;
-            counter.incrementErrorCount(delta);
-        }
-        assertEquals(total, counter.getNumUnhandledErrors());
+    counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+    numIncrements = randomIntBetween(1, 100000);
+    long total = 0;
+    for(int i=0; i < numIncrements; ++i) {
+      long delta = randomIntBetween(1, 100);
+      total += delta;
+      counter.incrementReceivedCount(delta);
     }
+    assertEquals(total, counter.getNumReceived());
+  }
+
+  /**
+   * Test errors increments correctly and returns expected value
+   * @throws Exception
+   */
+  @Test
+  @Repeat(iterations = 3)
+  public void testError() throws Exception {
+    StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+    int numIncrements = randomIntBetween(1, 100000);
+    for(int i=0; i < numIncrements; ++i) {
+      counter.incrementErrorCount();
+    }
+    assertEquals(numIncrements, counter.getNumUnhandledErrors());
+
+    unregisterMXBean();
 
-    /**
-     * Test error rate returns expected value
-     * @throws Exception
-     */
-    @Test
-    @Repeat(iterations = 3)
-    public void testErrorRate() throws Exception {
-        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
-        assertEquals(0.0, counter.getErrorRate(), 0);
-        int failures = randomIntBetween(0, 100000);
-        int received = randomIntBetween(0, 100000);
-        counter.incrementReceivedCount(received);
-        counter.incrementErrorCount(failures);
-        assertEquals((double)failures / (double)(received), counter.getErrorRate(), 0);
+    counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+    numIncrements = randomIntBetween(1, 100000);
+    long total = 0;
+    for(int i=0; i < numIncrements; ++i) {
+      long delta = randomIntBetween(1, 100);
+      total += delta;
+      counter.incrementErrorCount(delta);
     }
+    assertEquals(total, counter.getNumUnhandledErrors());
+  }
+
+  /**
+   * Test error rate returns expected value
+   * @throws Exception
+   */
+  @Test
+  @Repeat(iterations = 3)
+  public void testErrorRate() throws Exception {
+    StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+    assertEquals(0.0, counter.getErrorRate(), 0);
+    int failures = randomIntBetween(0, 100000);
+    int received = randomIntBetween(0, 100000);
+    counter.incrementReceivedCount(received);
+    counter.incrementErrorCount(failures);
+    assertEquals((double)failures / (double)(received), counter.getErrorRate(), 0);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
index 7e33ab9..e3b608d 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.executors;
 
 import org.apache.streams.local.builders.LocalStreamBuilder;
 import org.apache.streams.util.ComponentUtils;
+
 import org.junit.After;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -27,7 +28,6 @@ import org.mockito.stubbing.Answer;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -35,7 +35,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  *
@@ -43,90 +42,90 @@ import static org.mockito.Mockito.when;
 public class ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest {
 
 
-    @After
-    public void removeLocalMBeans() {
-        try {
-            ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
-        } catch (Exception e) {
-            //No op.  proceed to next test
-        }
+  @After
+  public void removeLocalMBeans() {
+    try {
+      ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+    } catch (Exception e) {
+      //No op.  proceed to next test
     }
-
-    @Test
-    public void testShutDownOnException() {
-        LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
-        final AtomicBoolean isShutdown = new AtomicBoolean(false);
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                isShutdown.set(true);
-                return null;
-            }
-        }).when(sb).stop();
-
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        Runnable runnable = new Runnable() {
-            @Override
-            public void run() {
-                latch.countDown();
-                throw new RuntimeException("Testing Throwable Handling!");
-            }
-        };
-
-        ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
-        executor.execute(runnable);
-        try {
-            latch.await();
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-        }
-        executor.shutdownNow();
-        try {
-            executor.awaitTermination(1, TimeUnit.SECONDS);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-        }
-        assertTrue("Expected StreamBuilder shutdown to be called", isShutdown.get());
+  }
+
+  @Test
+  public void testShutDownOnException() {
+    LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
+    final AtomicBoolean isShutdown = new AtomicBoolean(false);
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+        isShutdown.set(true);
+        return null;
+      }
+    }).when(sb).stop();
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    Runnable runnable = new Runnable() {
+      @Override
+      public void run() {
+        latch.countDown();
+        throw new RuntimeException("Testing Throwable Handling!");
+      }
+    };
+
+    ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
+    executor.execute(runnable);
+    try {
+      latch.await();
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
     }
-
-
-    @Test
-    public void testNormalExecution() {
-        LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
-        final AtomicBoolean isShutdown = new AtomicBoolean(false);
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                isShutdown.set(true);
-                return null;
-            }
-        }).when(sb).stop();
-
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        Runnable runnable = new Runnable() {
-            @Override
-            public void run() {
-                latch.countDown();
-            }
-        };
-
-        ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
-        executor.execute(runnable);
-        try {
-            latch.await();
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-        }
-        executor.shutdownNow();
-        try {
-            executor.awaitTermination(1, TimeUnit.SECONDS);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-        }
-        assertFalse("Expected StreamBuilder shutdown to be called", isShutdown.get());
+    executor.shutdownNow();
+    try {
+      executor.awaitTermination(1, TimeUnit.SECONDS);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    }
+    assertTrue("Expected StreamBuilder shutdown to be called", isShutdown.get());
+  }
+
+
+  @Test
+  public void testNormalExecution() {
+    LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
+    final AtomicBoolean isShutdown = new AtomicBoolean(false);
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+        isShutdown.set(true);
+        return null;
+      }
+    }).when(sb).stop();
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    Runnable runnable = new Runnable() {
+      @Override
+      public void run() {
+        latch.countDown();
+      }
+    };
+
+    ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
+    executor.execute(runnable);
+    try {
+      latch.await();
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    }
+    executor.shutdownNow();
+    try {
+      executor.awaitTermination(1, TimeUnit.SECONDS);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
     }
+    assertFalse("Expected StreamBuilder shutdown to be called", isShutdown.get());
+  }
 
 
 }