You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:24:47 UTC
[06/42] incubator-streams git commit: STREAMS-440: custom
checkstyle.xml, address compliance
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 3405882..26272ea 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -18,17 +18,22 @@
package org.apache.streams.local.tasks;
-import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.core.util.DatumUtils;
import org.apache.streams.local.counters.StreamsTaskCounter;
+
+import com.google.common.util.concurrent.Uninterruptibles;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigInteger;
-import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
@@ -40,219 +45,219 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusCountable {
- private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProviderTask.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProviderTask.class);
- public DatumStatusCounter getDatumStatusCounter() {
- return this.statusCounter;
- }
+ public DatumStatusCounter getDatumStatusCounter() {
+ return this.statusCounter;
+ }
- private static enum Type {
- PERPETUAL,
- READ_CURRENT,
- READ_NEW,
- READ_RANGE
- }
+ private static enum Type {
+ PERPETUAL,
+ READ_CURRENT,
+ READ_NEW,
+ READ_RANGE
+ }
- private static final int START = 0;
- private static final int END = 1;
+ private static final int START = 0;
+ private static final int END = 1;
- private StreamsProvider provider;
- private final AtomicBoolean keepRunning = new AtomicBoolean(true);
- private final AtomicBoolean flushing = new AtomicBoolean(false);
- private final AtomicBoolean started = new AtomicBoolean(false);
- private Type type;
- private BigInteger sequence;
- private DateTime[] dateRange;
- private StreamsConfiguration config;
+ private StreamsProvider provider;
+ private final AtomicBoolean keepRunning = new AtomicBoolean(true);
+ private final AtomicBoolean flushing = new AtomicBoolean(false);
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private Type type;
+ private BigInteger sequence;
+ private DateTime[] dateRange;
+ private StreamsConfiguration config;
- private int timeout;
- private long sleepTime;
- private int zeros = 0;
- private DatumStatusCounter statusCounter = new DatumStatusCounter();
- private StreamsTaskCounter counter;
+ private int timeout;
+ private long sleepTime;
+ private int zeros = 0;
+ private DatumStatusCounter statusCounter = new DatumStatusCounter();
+ private StreamsTaskCounter counter;
- /**
- * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()}
- * @param provider
- */
- public StreamsProviderTask(StreamsProvider provider, boolean perpetual, StreamsConfiguration streamConfig) {
- super(streamConfig);
- streamConfig = super.streamConfig;
- this.provider = provider;
- if( perpetual )
- this.type = Type.PERPETUAL;
- else
- this.type = Type.READ_CURRENT;
- this.timeout = super.streamConfig.getProviderTimeoutMs().intValue();
- this.sleepTime = streamConfig.getBatchFrequencyMs();
- }
+ /**
+ * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider#readCurrent()}
+ * @param provider
+ */
+ public StreamsProviderTask(StreamsProvider provider, boolean perpetual, StreamsConfiguration streamConfig) {
+ super(streamConfig);
+ streamConfig = super.streamConfig;
+ this.provider = provider;
+ if( perpetual )
+ this.type = Type.PERPETUAL;
+ else
+ this.type = Type.READ_CURRENT;
+ this.timeout = super.streamConfig.getProviderTimeoutMs().intValue();
+ this.sleepTime = streamConfig.getBatchFrequencyMs();
+ }
- /**
- * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readNew(BigInteger)}
- * @param provider
- * @param sequence
- */
- public StreamsProviderTask(StreamsProvider provider, BigInteger sequence, StreamsConfiguration streamConfig) {
- super(streamConfig);
- this.provider = provider;
- this.type = Type.READ_NEW;
- this.sequence = sequence;
- this.timeout = streamConfig.getProviderTimeoutMs().intValue();
- this.sleepTime = streamConfig.getBatchFrequencyMs();
- }
+ /**
+ * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider#readNew(BigInteger)}
+ * @param provider
+ * @param sequence
+ */
+ public StreamsProviderTask(StreamsProvider provider, BigInteger sequence, StreamsConfiguration streamConfig) {
+ super(streamConfig);
+ this.provider = provider;
+ this.type = Type.READ_NEW;
+ this.sequence = sequence;
+ this.timeout = streamConfig.getProviderTimeoutMs().intValue();
+ this.sleepTime = streamConfig.getBatchFrequencyMs();
+ }
- /**
- * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readRange(DateTime,DateTime)}
- * @param provider
- * @param start
- * @param end
- */
- public StreamsProviderTask(StreamsProvider provider, DateTime start, DateTime end, StreamsConfiguration streamConfig) {
- super(streamConfig);
- this.provider = provider;
- this.type = Type.READ_RANGE;
- this.dateRange = new DateTime[2];
- this.dateRange[START] = start;
- this.dateRange[END] = end;
- this.timeout = streamConfig.getProviderTimeoutMs().intValue();
- this.sleepTime = streamConfig.getBatchFrequencyMs();
- }
+ /**
+ * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider#readRange(DateTime,DateTime)}
+ * @param provider
+ * @param start
+ * @param end
+ */
+ public StreamsProviderTask(StreamsProvider provider, DateTime start, DateTime end, StreamsConfiguration streamConfig) {
+ super(streamConfig);
+ this.provider = provider;
+ this.type = Type.READ_RANGE;
+ this.dateRange = new DateTime[2];
+ this.dateRange[START] = start;
+ this.dateRange[END] = end;
+ this.timeout = streamConfig.getProviderTimeoutMs().intValue();
+ this.sleepTime = streamConfig.getBatchFrequencyMs();
+ }
- public void setTimeout(int timeout) {
- this.timeout = timeout;
- }
+ public void setTimeout(int timeout) {
+ this.timeout = timeout;
+ }
- public void setSleepTime(long sleepTime) {
- this.sleepTime = sleepTime;
- }
+ public void setSleepTime(long sleepTime) {
+ this.sleepTime = sleepTime;
+ }
- @Override
- public boolean isWaiting() {
- return false; //providers don't have inbound queues
- }
+ @Override
+ public boolean isWaiting() {
+ return false; //providers don't have inbound queues
+ }
- @Override
- public void stopTask() {
- LOGGER.debug("Stopping Provider Task for {}", this.provider.getClass().getSimpleName());
- this.keepRunning.set(false);
- }
+ @Override
+ public void stopTask() {
+ LOGGER.debug("Stopping Provider Task for {}", this.provider.getClass().getSimpleName());
+ this.keepRunning.set(false);
+ }
- @Override
- public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) {
- throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - setInputQueue()");
- }
+ @Override
+ public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) {
+ throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - setInputQueue()");
+ }
- @Override
- public void setStreamConfig(StreamsConfiguration config) {
- this.config = config;
- }
+ @Override
+ public void setStreamConfig(StreamsConfiguration config) {
+ this.config = config;
+ }
- @Override
- public void run() {
- try {
- this.provider.prepare(this.config); //TODO allow for configuration objects
- StreamsResultSet resultSet = null;
- //Negative values mean we want to run forever
- long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / sleepTime);
- if(this.counter == null) { //should never be null
- this.counter = new StreamsTaskCounter(this.provider.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
+ @Override
+ public void run() {
+ try {
+ this.provider.prepare(this.config); //TODO allow for configuration objects
+ StreamsResultSet resultSet = null;
+ //Negative values mean we want to run forever
+ long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / sleepTime);
+ if(this.counter == null) { //should never be null
+ this.counter = new StreamsTaskCounter(this.provider.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
+ }
+ switch(this.type) {
+ case PERPETUAL: {
+ provider.startStream();
+ this.started.set(true);
+ while(this.isRunning()) {
+ try {
+ long startTime = System.currentTimeMillis();
+ resultSet = provider.readCurrent();
+ this.counter.addTime(System.currentTimeMillis() - startTime);
+ if( resultSet.size() == 0 )
+ zeros++;
+ else {
+ zeros = 0;
+ }
+ flushResults(resultSet);
+ // the way this works needs to change...
+ if(zeros > maxZeros)
+ this.keepRunning.set(false);
+ if(zeros > 0)
+ Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ this.counter.incrementErrorCount();
+ LOGGER.warn("Thread exception");
+ this.keepRunning.set(false);
}
- switch(this.type) {
- case PERPETUAL: {
- provider.startStream();
- this.started.set(true);
- while(this.isRunning()) {
- try {
- long startTime = System.currentTimeMillis();
- resultSet = provider.readCurrent();
- this.counter.addTime(System.currentTimeMillis() - startTime);
- if( resultSet.size() == 0 )
- zeros++;
- else {
- zeros = 0;
- }
- flushResults(resultSet);
- // the way this works needs to change...
- if(zeros > maxZeros)
- this.keepRunning.set(false);
- if(zeros > 0)
- Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
- } catch (Exception e) {
- this.counter.incrementErrorCount();
- LOGGER.warn("Thread exception");
- this.keepRunning.set(false);
- }
- }
- Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
- }
- break;
- case READ_CURRENT:
- resultSet = this.provider.readCurrent();
- this.started.set(true);
- break;
- case READ_NEW:
- resultSet = this.provider.readNew(this.sequence);
- this.started.set(true);
- break;
- case READ_RANGE:
- resultSet = this.provider.readRange(this.dateRange[START], this.dateRange[END]);
- this.started.set(true);
- break;
- default: throw new RuntimeException("Type has not been added to StreamsProviderTask.");
- }
- if( resultSet != null )
- flushResults(resultSet);
-
- } catch(Throwable e) {
- LOGGER.error("Caught Throwable in Provider {}", this.provider.getClass().getSimpleName(), e);
- } finally {
- Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
- LOGGER.debug("Complete Provider Task execution for {}", this.provider.getClass().getSimpleName());
- this.provider.cleanUp();
- //Setting started to 'true' here will allow the isRunning() method to return false in the event of an exception
- //before started would normally be set to true n the run method.
- this.started.set(true);
- this.keepRunning.set(false);
+ }
+ Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
}
- }
+ break;
+ case READ_CURRENT:
+ resultSet = this.provider.readCurrent();
+ this.started.set(true);
+ break;
+ case READ_NEW:
+ resultSet = this.provider.readNew(this.sequence);
+ this.started.set(true);
+ break;
+ case READ_RANGE:
+ resultSet = this.provider.readRange(this.dateRange[START], this.dateRange[END]);
+ this.started.set(true);
+ break;
+ default: throw new RuntimeException("Type has not been added to StreamsProviderTask.");
+ }
+ if( resultSet != null )
+ flushResults(resultSet);
- @Override
- public boolean isRunning() {
- //We want to make sure that we never return false if it is flushing, regardless of the state of the provider
- //or whether we have been told to shut down. If someone really wants us to shut down, they will interrupt the
- //thread and force us to shutdown. We also want to make sure we have had the opportunity to run before the
- //runtime kills us.
- return !this.started.get() || this.flushing.get() || (this.provider.isRunning() && this.keepRunning.get());
+ } catch(Throwable e) {
+ LOGGER.error("Caught Throwable in Provider {}", this.provider.getClass().getSimpleName(), e);
+ } finally {
+ Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
+ LOGGER.debug("Complete Provider Task execution for {}", this.provider.getClass().getSimpleName());
+ this.provider.cleanUp();
+ //Setting started to 'true' here will allow the isRunning() method to return false in the event of an exception
+ //before started would normally be set to true in the run method.
+ this.started.set(true);
+ this.keepRunning.set(false);
}
+ }
- public void flushResults(StreamsResultSet resultSet) {
- Queue<StreamsDatum> queue = resultSet.getQueue();
- this.flushing.set(true);
- while(!queue.isEmpty()) {
- StreamsDatum datum = queue.poll();
- if(!this.keepRunning.get()) {
- break;
- }
- if(datum != null) {
- try {
- super.addToOutgoingQueue(datum);
- this.counter.incrementEmittedCount();
- statusCounter.incrementStatus(DatumStatus.SUCCESS);
- } catch( Exception e ) {
- this.counter.incrementErrorCount();
- statusCounter.incrementStatus(DatumStatus.FAIL);
- DatumUtils.addErrorToMetadata(datum, e, this.provider.getClass());
- }
- }
+ @Override
+ public boolean isRunning() {
+ //We want to make sure that we never return false if it is flushing, regardless of the state of the provider
+ //or whether we have been told to shut down. If someone really wants us to shut down, they will interrupt the
+ //thread and force us to shutdown. We also want to make sure we have had the opportunity to run before the
+ //runtime kills us.
+ return !this.started.get() || this.flushing.get() || (this.provider.isRunning() && this.keepRunning.get());
+ }
+
+ public void flushResults(StreamsResultSet resultSet) {
+ Queue<StreamsDatum> queue = resultSet.getQueue();
+ this.flushing.set(true);
+ while(!queue.isEmpty()) {
+ StreamsDatum datum = queue.poll();
+ if(!this.keepRunning.get()) {
+ break;
+ }
+ if(datum != null) {
+ try {
+ super.addToOutgoingQueue(datum);
+ this.counter.incrementEmittedCount();
+ statusCounter.incrementStatus(DatumStatus.SUCCESS);
+ } catch( Exception e ) {
+ this.counter.incrementErrorCount();
+ statusCounter.incrementStatus(DatumStatus.FAIL);
+ DatumUtils.addErrorToMetadata(datum, e, this.provider.getClass());
}
- this.flushing.set(false);
+ }
}
+ this.flushing.set(false);
+ }
- @Override
- public void setStreamsTaskCounter(StreamsTaskCounter counter) {
- this.counter = counter;
- }
+ @Override
+ public void setStreamsTaskCounter(StreamsTaskCounter counter) {
+ this.counter = counter;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
index 5c14c1f..1b91e5c 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
@@ -23,8 +23,6 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.local.counters.StreamsTaskCounter;
import java.util.List;
-import java.util.Map;
-import java.util.Queue;
import java.util.concurrent.BlockingQueue;
/**
@@ -33,53 +31,53 @@ import java.util.concurrent.BlockingQueue;
*/
public interface StreamsTask extends Runnable{
- /**
- * Informs the task to stop. Tasks may or may not try to empty its inbound queue before halting.
- */
- public void stopTask();
+ /**
+ * Informs the task to stop. A task may or may not try to empty its inbound queue before halting.
+ */
+ public void stopTask();
- /**
- * Returns true if the task is waiting on more data to process
- * @return true, if waiting on more data to process
- */
- public boolean isWaiting();
- /**
- * Add an input {@link java.util.Queue} for this task.
- * @param inputQueue
- */
- public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue);
+ /**
+ * Returns true if the task is waiting on more data to process
+ * @return true, if waiting on more data to process
+ */
+ public boolean isWaiting();
+ /**
+ * Add an input {@link java.util.Queue} for this task.
+ * @param inputQueue
+ */
+ public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue);
- /**
- * Add an output {@link java.util.Queue} for this task.
- * @param outputQueue
- */
- public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue);
+ /**
+ * Add an output {@link java.util.Queue} for this task.
+ * @param outputQueue
+ */
+ public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue);
- /**
- * Set the configuration object that will shared and passed to all instances of StreamsTask.
- * @param config optional configuration information
- */
- public void setStreamConfig(StreamsConfiguration config);
+ /**
+ * Set the configuration object that will be shared and passed to all instances of StreamsTask.
+ * @param config optional configuration information
+ */
+ public void setStreamConfig(StreamsConfiguration config);
- /**
- * Returns true when the task has not completed. Returns false otherwise
- * @return true when the task has not completed. Returns false otherwise
- */
- public boolean isRunning();
+ /**
+ * Returns true when the task has not completed. Returns false otherwise
+ * @return true when the task has not completed. Returns false otherwise
+ */
+ public boolean isRunning();
- /**
- * Returns the input queues that have been set for this task.
- * @return list of input queues
- */
- public List<BlockingQueue<StreamsDatum>> getInputQueues();
+ /**
+ * Returns the input queues that have been set for this task.
+ * @return list of input queues
+ */
+ public List<BlockingQueue<StreamsDatum>> getInputQueues();
- /**
- * Returns the output queues that have been set for this task
- * @return list of output queues
- */
- public List<BlockingQueue<StreamsDatum>> getOutputQueues();
+ /**
+ * Returns the output queues that have been set for this task
+ * @return list of output queues
+ */
+ public List<BlockingQueue<StreamsDatum>> getOutputQueues();
- public void setStreamsTaskCounter(StreamsTaskCounter counter);
+ public void setStreamsTaskCounter(StreamsTaskCounter counter);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
index 2bbfdcc..741c0b5 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
@@ -18,11 +18,6 @@
package org.apache.streams.local.builders;
-import com.carrotsearch.randomizedtesting.RandomizedTest;
-import com.carrotsearch.randomizedtesting.annotations.Repeat;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.streams.core.StreamBuilder;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
@@ -36,6 +31,12 @@ import org.apache.streams.local.test.providers.NumericMessageProvider;
import org.apache.streams.local.test.writer.DatumCounterWriter;
import org.apache.streams.local.test.writer.SystemOutWriter;
import org.apache.streams.util.ComponentUtils;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Before;
@@ -44,7 +45,6 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import javax.management.*;
import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.List;
@@ -55,10 +55,21 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
-import static org.hamcrest.Matchers.*;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Basic Tests for the LocalStreamBuilder.
@@ -70,334 +81,334 @@ import static org.mockito.Mockito.*;
*
*/
public class LocalStreamBuilderTest extends RandomizedTest {
- private static final String MBEAN_ID = "test_id";
- private static final String STREAM_ID = "test_stream";
- private static long STREAM_START_TIME = (new DateTime()).getMillis();
+ private static final String MBEAN_ID = "test_id";
+ private static final String STREAM_ID = "test_stream";
+ private static long STREAM_START_TIME = (new DateTime()).getMillis();
- @After
- public void removeLocalMBeans() {
- try {
- ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
- } catch (Exception e) {
- //No op. proceed to next test
- }
+ @After
+ public void removeLocalMBeans() {
+ try {
+ ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+ } catch (Exception e) {
+ //No op. proceed to next test
}
+ }
- public void removeRegisteredMBeans(String... ids) {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- for(String id : ids) {
- try {
- mbs.unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME)));
- } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
- //No-op
- }
- try {
- mbs.unregisterMBean(new ObjectName((String.format(StreamsTaskCounter.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME))));
- } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
- //No-op
- }
- }
+ public void removeRegisteredMBeans(String... ids) {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ for(String id : ids) {
+ try {
+ mbs.unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME)));
+ } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
+ //No-op
+ }
+ try {
+ mbs.unregisterMBean(new ObjectName((String.format(StreamsTaskCounter.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME))));
+ } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) {
+ //No-op
+ }
}
+ }
- @Test
- public void testStreamIdValidations() {
- StreamBuilder builder = new LocalStreamBuilder();
- builder.newReadCurrentStream("id", new NumericMessageProvider(1));
- Exception exp = null;
- try {
- builder.newReadCurrentStream("id", new NumericMessageProvider(1));
- } catch (RuntimeException e) {
- exp = e;
- }
- assertNotNull(exp);
- exp = null;
- builder.addStreamsProcessor("1", new PassthroughDatumCounterProcessor("1"), 1, "id");
- try {
- builder.addStreamsProcessor("2", new PassthroughDatumCounterProcessor("2"), 1, "id", "id2");
- } catch (RuntimeException e) {
- exp = e;
- }
- assertNotNull(exp);
- removeRegisteredMBeans("1", "2", "id");
+ @Test
+ public void testStreamIdValidations() {
+ StreamBuilder builder = new LocalStreamBuilder();
+ builder.newReadCurrentStream("id", new NumericMessageProvider(1));
+ Exception exp = null;
+ try {
+ builder.newReadCurrentStream("id", new NumericMessageProvider(1));
+ } catch (RuntimeException e) {
+ exp = e;
}
-
- @Test
- public void testBasicLinearStream1() {
- linearStreamNonParallel(1, 1);
+ assertNotNull(exp);
+ exp = null;
+ builder.addStreamsProcessor("1", new PassthroughDatumCounterProcessor("1"), 1, "id");
+ try {
+ builder.addStreamsProcessor("2", new PassthroughDatumCounterProcessor("2"), 1, "id", "id2");
+ } catch (RuntimeException e) {
+ exp = e;
}
+ assertNotNull(exp);
+ removeRegisteredMBeans("1", "2", "id");
+ }
- @Test
- public void testBasicLinearStream2() {
- linearStreamNonParallel(1004, 1);
- }
+ @Test
+ public void testBasicLinearStream1() {
+ linearStreamNonParallel(1, 1);
+ }
- @Test
- public void testBasicLinearStream3() {
- linearStreamNonParallel(1, 10);
- }
+ @Test
+ public void testBasicLinearStream2() {
+ linearStreamNonParallel(1004, 1);
+ }
- @Test
- @Repeat(iterations = 3)
- public void testBasicLinearStreamRandom() {
- int numDatums = randomIntBetween(1, 100000);
- int numProcessors = randomIntBetween(1, 10);
- linearStreamNonParallel(numDatums, numProcessors);
- }
+ @Test
+ public void testBasicLinearStream3() {
+ linearStreamNonParallel(1, 10);
+ }
+
+ @Test
+ @Repeat(iterations = 3)
+ public void testBasicLinearStreamRandom() {
+ int numDatums = randomIntBetween(1, 100000);
+ int numProcessors = randomIntBetween(1, 10);
+ linearStreamNonParallel(numDatums, numProcessors);
+ }
- /**
- * Tests that all datums pass through each processor and that all datums reach the writer
- * @param numDatums
- * @param numProcessors
- */
- private void linearStreamNonParallel(int numDatums, int numProcessors) {
- String processorId = "proc";
- try {
- StreamBuilder builder = new LocalStreamBuilder(10);
- builder.newPerpetualStream("numeric_provider", new NumericMessageProvider(numDatums));
- String connectTo = null;
- for(int i=0; i < numProcessors; ++i) {
- if(i == 0) {
- connectTo = "numeric_provider";
- } else {
- connectTo = processorId+(i-1);
- }
- builder.addStreamsProcessor(processorId+i, new PassthroughDatumCounterProcessor(processorId+i), 1, connectTo);
- }
- Set output = Collections.newSetFromMap(new ConcurrentHashMap());
- builder.addStreamsPersistWriter("writer", new DatumCounterWriter("writer"), 1, processorId+(numProcessors-1));
- builder.start();
- for(int i=0; i < numProcessors; ++i) {
- assertEquals("Processor "+i+" did not receive all of the datums", numDatums, PassthroughDatumCounterProcessor.COUNTS.get(processorId+i).get());
- }
- for(int i=0; i < numDatums; ++i) {
- assertTrue("Expected writer to have received : "+i, DatumCounterWriter.RECEIVED.get("writer").contains(i));
- }
- } finally {
- for(int i=0; i < numProcessors; ++i) {
- removeRegisteredMBeans(processorId+i, processorId+i+"-"+PassthroughDatumCounterProcessor.class.getCanonicalName());
- }
- removeRegisteredMBeans("writer", "numeric_provider");
+ /**
+ * Tests that all datums pass through each processor and that all datums reach the writer
+ * @param numDatums
+ * @param numProcessors
+ */
+ private void linearStreamNonParallel(int numDatums, int numProcessors) {
+ String processorId = "proc";
+ try {
+ StreamBuilder builder = new LocalStreamBuilder(10);
+ builder.newPerpetualStream("numeric_provider", new NumericMessageProvider(numDatums));
+ String connectTo = null;
+ for(int i=0; i < numProcessors; ++i) {
+ if(i == 0) {
+ connectTo = "numeric_provider";
+ } else {
+ connectTo = processorId+(i-1);
}
+ builder.addStreamsProcessor(processorId+i, new PassthroughDatumCounterProcessor(processorId+i), 1, connectTo);
+ }
+ Set output = Collections.newSetFromMap(new ConcurrentHashMap());
+ builder.addStreamsPersistWriter("writer", new DatumCounterWriter("writer"), 1, processorId+(numProcessors-1));
+ builder.start();
+ for(int i=0; i < numProcessors; ++i) {
+ assertEquals("Processor "+i+" did not receive all of the datums", numDatums, PassthroughDatumCounterProcessor.COUNTS.get(processorId+i).get());
+ }
+ for(int i=0; i < numDatums; ++i) {
+ assertTrue("Expected writer to have received : "+i, DatumCounterWriter.RECEIVED.get("writer").contains(i));
+ }
+ } finally {
+ for(int i=0; i < numProcessors; ++i) {
+ removeRegisteredMBeans(processorId+i, processorId+i+"-"+PassthroughDatumCounterProcessor.class.getCanonicalName());
+ }
+ removeRegisteredMBeans("writer", "numeric_provider");
}
+ }
- @Test
- public void testParallelLinearStream1() {
- String processorId = "proc";
- int numProcessors = randomIntBetween(1, 10);
- int numDatums = randomIntBetween(1, 300000);
- try {
- StreamBuilder builder = new LocalStreamBuilder(50);
- builder.newPerpetualStream("numeric_provider", new NumericMessageProvider(numDatums));
- String connectTo = null;
- for(int i=0; i < numProcessors; ++i) {
- if(i == 0) {
- connectTo = "numeric_provider";
- } else {
- connectTo = processorId+(i-1);
- }
- int parallelHint = randomIntBetween(1,5);
- builder.addStreamsProcessor(processorId+i, new PassthroughDatumCounterProcessor(processorId+i), parallelHint, connectTo);
- }
- builder.addStreamsPersistWriter("writer", new DatumCounterWriter("writer"), 1, processorId+(numProcessors-1));
- builder.start();
- Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
- builder.stop();
- Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
- assertEquals(numDatums, DatumCounterWriter.RECEIVED.get("writer").size());
- for(int i=0; i < numDatums; ++i) {
- assertTrue("Expected Writer to receive datum : " + i, DatumCounterWriter.RECEIVED.get("writer").contains(i));
- }
- for(int i=0; i < numProcessors; ++i) {
- assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get(processorId+i).get());
- }
-
- } finally {
- for(int i=0; i < numProcessors; ++i) {
- removeRegisteredMBeans(processorId+i);
- }
- removeRegisteredMBeans("writer", "numeric_provider");
+ @Test
+ public void testParallelLinearStream1() {
+ String processorId = "proc";
+ int numProcessors = randomIntBetween(1, 10);
+ int numDatums = randomIntBetween(1, 300000);
+ try {
+ StreamBuilder builder = new LocalStreamBuilder(50);
+ builder.newPerpetualStream("numeric_provider", new NumericMessageProvider(numDatums));
+ String connectTo = null;
+ for(int i=0; i < numProcessors; ++i) {
+ if(i == 0) {
+ connectTo = "numeric_provider";
+ } else {
+ connectTo = processorId+(i-1);
}
+ int parallelHint = randomIntBetween(1,5);
+ builder.addStreamsProcessor(processorId+i, new PassthroughDatumCounterProcessor(processorId+i), parallelHint, connectTo);
+ }
+ builder.addStreamsPersistWriter("writer", new DatumCounterWriter("writer"), 1, processorId+(numProcessors-1));
+ builder.start();
+ Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
+ builder.stop();
+ Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
+ assertEquals(numDatums, DatumCounterWriter.RECEIVED.get("writer").size());
+ for(int i=0; i < numDatums; ++i) {
+ assertTrue("Expected Writer to receive datum : " + i, DatumCounterWriter.RECEIVED.get("writer").contains(i));
+ }
+ for(int i=0; i < numProcessors; ++i) {
+ assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get(processorId+i).get());
+ }
+
+ } finally {
+ for(int i=0; i < numProcessors; ++i) {
+ removeRegisteredMBeans(processorId+i);
+ }
+ removeRegisteredMBeans("writer", "numeric_provider");
}
+ }
- @Test
- public void testBasicMergeStream() {
- try {
- int numDatums1 = randomIntBetween(1, 300000);
- int numDatums2 = randomIntBetween(1, 300000);
- StreamsProcessor processor1 = new PassthroughDatumCounterProcessor("proc1");
- StreamsProcessor processor2 = new PassthroughDatumCounterProcessor("proc2");
- StreamBuilder builder = new LocalStreamBuilder();
- builder.newPerpetualStream("sp1", new NumericMessageProvider(numDatums1))
- .newPerpetualStream("sp2", new NumericMessageProvider(numDatums2))
- .addStreamsProcessor("proc1", processor1, 1, "sp1")
- .addStreamsProcessor("proc2", processor2, 1, "sp2")
- .addStreamsPersistWriter("writer1", new DatumCounterWriter("writer"), 1, "proc1", "proc2");
- builder.start();
- assertEquals(numDatums1, PassthroughDatumCounterProcessor.COUNTS.get("proc1").get());
- assertEquals(numDatums2, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
- assertEquals(numDatums1+numDatums2, DatumCounterWriter.COUNTS.get("writer").get());
- } finally {
- String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
- String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
- removeRegisteredMBeans("proc1", "proc2", "writer1", "sp1", "sp2");
- }
+ @Test
+ public void testBasicMergeStream() {
+ try {
+ int numDatums1 = randomIntBetween(1, 300000);
+ int numDatums2 = randomIntBetween(1, 300000);
+ StreamsProcessor processor1 = new PassthroughDatumCounterProcessor("proc1");
+ StreamsProcessor processor2 = new PassthroughDatumCounterProcessor("proc2");
+ StreamBuilder builder = new LocalStreamBuilder();
+ builder.newPerpetualStream("sp1", new NumericMessageProvider(numDatums1))
+ .newPerpetualStream("sp2", new NumericMessageProvider(numDatums2))
+ .addStreamsProcessor("proc1", processor1, 1, "sp1")
+ .addStreamsProcessor("proc2", processor2, 1, "sp2")
+ .addStreamsPersistWriter("writer1", new DatumCounterWriter("writer"), 1, "proc1", "proc2");
+ builder.start();
+ assertEquals(numDatums1, PassthroughDatumCounterProcessor.COUNTS.get("proc1").get());
+ assertEquals(numDatums2, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
+ assertEquals(numDatums1+numDatums2, DatumCounterWriter.COUNTS.get("writer").get());
+ } finally {
+ String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+ String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
+ removeRegisteredMBeans("proc1", "proc2", "writer1", "sp1", "sp2");
}
+ }
- @Test
- public void testBasicBranch() {
- try {
- int numDatums = randomIntBetween(1, 300000);
- StreamBuilder builder = new LocalStreamBuilder(50);
- builder.newPerpetualStream("prov1", new NumericMessageProvider(numDatums))
- .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor("proc1"), 1, "prov1")
- .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor("proc2"), 1, "prov1")
- .addStreamsPersistWriter("w1", new DatumCounterWriter("writer"), 1, "proc1", "proc2");
- builder.start();
- assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc1").get());
- assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
- assertEquals(numDatums*2, DatumCounterWriter.COUNTS.get("writer").get());
- } finally {
- String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
- String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
- String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
- removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
- }
+ @Test
+ public void testBasicBranch() {
+ try {
+ int numDatums = randomIntBetween(1, 300000);
+ StreamBuilder builder = new LocalStreamBuilder(50);
+ builder.newPerpetualStream("prov1", new NumericMessageProvider(numDatums))
+ .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor("proc1"), 1, "prov1")
+ .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor("proc2"), 1, "prov1")
+ .addStreamsPersistWriter("w1", new DatumCounterWriter("writer"), 1, "proc1", "proc2");
+ builder.start();
+ assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc1").get());
+ assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
+ assertEquals(numDatums*2, DatumCounterWriter.COUNTS.get("writer").get());
+ } finally {
+ String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+ String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+ String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
+ removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
}
+ }
- @Test
- public void testSlowProcessorBranch() {
- try {
- int numDatums = 30;
- int timeout = 2000;
- Map<String, Object> config = Maps.newHashMap();
- config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout);
- StreamBuilder builder = new LocalStreamBuilder(config);
- builder.newPerpetualStream("prov1", new NumericMessageProvider(numDatums))
- .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1")
- .addStreamsPersistWriter("w1", new DatumCounterWriter("writer"), 1, "proc1");
- builder.start();
- assertEquals(numDatums, DatumCounterWriter.COUNTS.get("writer").get());
- } finally {
- String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
- String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
- String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
- removeRegisteredMBeans("prov1", "proc1", "w1");
- }
+ @Test
+ public void testSlowProcessorBranch() {
+ try {
+ int numDatums = 30;
+ int timeout = 2000;
+ Map<String, Object> config = Maps.newHashMap();
+ config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout);
+ StreamBuilder builder = new LocalStreamBuilder(config);
+ builder.newPerpetualStream("prov1", new NumericMessageProvider(numDatums))
+ .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1")
+ .addStreamsPersistWriter("w1", new DatumCounterWriter("writer"), 1, "proc1");
+ builder.start();
+ assertEquals(numDatums, DatumCounterWriter.COUNTS.get("writer").get());
+ } finally {
+ String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+ String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+ String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
+ removeRegisteredMBeans("prov1", "proc1", "w1");
}
+ }
- @Test
- public void testConfiguredProviderTimeout() {
- try {
- Map<String, Object> config = Maps.newHashMap();
- int timeout = 10000;
- config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout);
- long start = System.currentTimeMillis();
- StreamBuilder builder = new LocalStreamBuilder(-1, config);
- builder.newPerpetualStream("prov1", new EmptyResultSetProvider())
- .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor("proc1"), 1, "prov1")
- .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor("proc2"), 1, "proc1")
- .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
- builder.start();
- long end = System.currentTimeMillis();
- //We care mostly that it doesn't terminate too early. With thread shutdowns, etc, the actual time is indeterminate. Just make sure there is an upper bound
- assertThat((int) (end - start), is(allOf(greaterThanOrEqualTo(timeout), lessThanOrEqualTo(4 * timeout))));
- } finally {
- String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
- String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
- String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
- removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
- }
+ @Test
+ public void testConfiguredProviderTimeout() {
+ try {
+ Map<String, Object> config = Maps.newHashMap();
+ int timeout = 10000;
+ config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout);
+ long start = System.currentTimeMillis();
+ StreamBuilder builder = new LocalStreamBuilder(-1, config);
+ builder.newPerpetualStream("prov1", new EmptyResultSetProvider())
+ .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor("proc1"), 1, "prov1")
+ .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor("proc2"), 1, "proc1")
+ .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
+ builder.start();
+ long end = System.currentTimeMillis();
+ //We care mostly that it doesn't terminate too early. With thread shutdowns, etc, the actual time is indeterminate. Just make sure there is an upper bound
+ assertThat((int) (end - start), is(allOf(greaterThanOrEqualTo(timeout), lessThanOrEqualTo(4 * timeout))));
+ } finally {
+ String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+ String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+ String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
+ removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
}
+ }
- @Ignore
- @Test
- public void ensureShutdownWithBlockedQueue() throws InterruptedException {
- try {
- ExecutorService service = Executors.newSingleThreadExecutor();
- int before = Thread.activeCount();
- final StreamBuilder builder = new LocalStreamBuilder();
- builder.newPerpetualStream("prov1", new NumericMessageProvider(30))
- .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1")
- .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
- service.submit(new Runnable() {
- @Override
- public void run() {
- builder.start();
- }
- });
- //Let streams spin up threads and start to process
- Thread.sleep(500);
- builder.stop();
- service.shutdownNow();
- service.awaitTermination(30000, TimeUnit.MILLISECONDS);
- assertThat(Thread.activeCount(), is(equalTo(before)));
- } finally {
- String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
- String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
- String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
- removeRegisteredMBeans("prov1", "proc1", "w1");
+ @Ignore
+ @Test
+ public void ensureShutdownWithBlockedQueue() throws InterruptedException {
+ try {
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ int before = Thread.activeCount();
+ final StreamBuilder builder = new LocalStreamBuilder();
+ builder.newPerpetualStream("prov1", new NumericMessageProvider(30))
+ .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1")
+ .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
+ service.submit(new Runnable() {
+ @Override
+ public void run() {
+ builder.start();
}
+ });
+ //Let streams spin up threads and start to process
+ Thread.sleep(500);
+ builder.stop();
+ service.shutdownNow();
+ service.awaitTermination(30000, TimeUnit.MILLISECONDS);
+ assertThat(Thread.activeCount(), is(equalTo(before)));
+ } finally {
+ String provClass = "-"+NumericMessageProvider.class.getCanonicalName();
+ String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName();
+ String writerClass = "-"+DatumCounterWriter.class.getCanonicalName();
+ removeRegisteredMBeans("prov1", "proc1", "w1");
}
+ }
- @Before
- private void clearCounters() {
- PassthroughDatumCounterProcessor.COUNTS.clear();
- PassthroughDatumCounterProcessor.CLAIMED_ID.clear();
- PassthroughDatumCounterProcessor.SEEN_DATA.clear();
- DatumCounterWriter.COUNTS.clear();
- DatumCounterWriter.CLAIMED_ID.clear();
- DatumCounterWriter.SEEN_DATA.clear();
- DatumCounterWriter.RECEIVED.clear();
- }
+ @Before
+ private void clearCounters() {
+ PassthroughDatumCounterProcessor.COUNTS.clear();
+ PassthroughDatumCounterProcessor.CLAIMED_ID.clear();
+ PassthroughDatumCounterProcessor.SEEN_DATA.clear();
+ DatumCounterWriter.COUNTS.clear();
+ DatumCounterWriter.CLAIMED_ID.clear();
+ DatumCounterWriter.SEEN_DATA.clear();
+ DatumCounterWriter.RECEIVED.clear();
+ }
- /**
- * Creates {@link org.apache.streams.core.StreamsProcessor} that passes any StreamsDatum it gets as an
- * input and counts the number of items it processes.
- * @param counter
- * @return
- */
- private StreamsProcessor createPassThroughProcessor(final AtomicInteger counter) {
- StreamsProcessor processor = mock(StreamsProcessor.class);
- when(processor.process(any(StreamsDatum.class))).thenAnswer(new Answer<List<StreamsDatum>>() {
- @Override
- public List<StreamsDatum> answer(InvocationOnMock invocationOnMock) throws Throwable {
- List<StreamsDatum> datum = Lists.newLinkedList();
- if(counter != null) {
- counter.incrementAndGet();
- }
- datum.add((StreamsDatum) invocationOnMock.getArguments()[0] );
- return datum;
- }
- });
- return processor;
- }
+ /**
+ * Creates a mock {@link org.apache.streams.core.StreamsProcessor} that passes through any StreamsDatum it
+ * receives as input and, when a counter is supplied, counts the number of items it processes.
+ * @param counter
+ * @return
+ */
+ private StreamsProcessor createPassThroughProcessor(final AtomicInteger counter) {
+ StreamsProcessor processor = mock(StreamsProcessor.class);
+ when(processor.process(any(StreamsDatum.class))).thenAnswer(new Answer<List<StreamsDatum>>() {
+ @Override
+ public List<StreamsDatum> answer(InvocationOnMock invocationOnMock) throws Throwable {
+ List<StreamsDatum> datum = Lists.newLinkedList();
+ if(counter != null) {
+ counter.incrementAndGet();
+ }
+ datum.add((StreamsDatum) invocationOnMock.getArguments()[0] );
+ return datum;
+ }
+ });
+ return processor;
+ }
- private StreamsPersistWriter createSetCollectingWriter(final Set collector) {
- return createSetCollectingWriter(collector, null);
- }
+ private StreamsPersistWriter createSetCollectingWriter(final Set collector) {
+ return createSetCollectingWriter(collector, null);
+ }
- /**
- * Creates a StreamsPersistWriter that adds every datums document to a set
- * @param collector
- * @return
- */
- private StreamsPersistWriter createSetCollectingWriter(final Set collector, final AtomicInteger counter) {
- StreamsPersistWriter writer = mock(StreamsPersistWriter.class);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- if(counter != null) {
- counter.incrementAndGet();
- }
- collector.add(((StreamsDatum)invocationOnMock.getArguments()[0]).getDocument());
- return null;
- }
- }).when(writer).write(any(StreamsDatum.class));
- return writer;
- }
+ /**
+ * Creates a StreamsPersistWriter that adds every datum's document to a set.
+ * @param collector
+ * @return
+ */
+ private StreamsPersistWriter createSetCollectingWriter(final Set collector, final AtomicInteger counter) {
+ StreamsPersistWriter writer = mock(StreamsPersistWriter.class);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ if(counter != null) {
+ counter.incrementAndGet();
+ }
+ collector.add(((StreamsDatum)invocationOnMock.getArguments()[0]).getDocument());
+ return null;
+ }
+ }).when(writer).write(any(StreamsDatum.class));
+ return writer;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java
index a77dfec..2c61093 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java
@@ -30,16 +30,16 @@ import org.apache.streams.local.test.writer.DoNothingWriter;
*/
public class ToyLocalBuilderExample {
- /**
- * A simple example of how to run a stream in local mode.
- * @param args
- */
- public static void main(String[] args) {
- StreamBuilder builder = new LocalStreamBuilder();
- builder.newReadCurrentStream("prov", new NumericMessageProvider(1000000))
- .addStreamsProcessor("proc", new DoNothingProcessor(), 100, "prov")
- .addStreamsPersistWriter("writer", new DoNothingWriter(), 3, "proc");
- builder.start();
- }
+ /**
+ * A simple example of how to run a stream in local mode.
+ * @param args
+ */
+ public static void main(String[] args) {
+ StreamBuilder builder = new LocalStreamBuilder();
+ builder.newReadCurrentStream("prov", new NumericMessageProvider(1000000))
+ .addStreamsProcessor("proc", new DoNothingProcessor(), 100, "prov")
+ .addStreamsPersistWriter("writer", new DoNothingWriter(), 3, "proc");
+ builder.start();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
index 9775c6f..9d92bec 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
@@ -23,112 +23,112 @@ import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Test;
+import java.lang.management.ManagementFactory;
import javax.management.InstanceNotFoundException;
import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
/**
*
*/
public class DatumStatusCounterTest extends RandomizedTest {
- private static final String MBEAN_ID = "test_id";
- private static final String STREAM_ID = "test_stream";
- private static long STREAM_START_TIME = (new DateTime()).getMillis();
+ private static final String MBEAN_ID = "test_id";
+ private static final String STREAM_ID = "test_stream";
+ private static long STREAM_START_TIME = (new DateTime()).getMillis();
- /**
- * Remove registered mbeans from previous tests
- * @throws Exception
- */
- @After
- public void unregisterMXBean() throws Exception {
- try {
- ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(DatumStatusCounter.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
- } catch (InstanceNotFoundException ife) {
- //No-op
- }
+ /**
+ * Remove registered mbeans from previous tests
+ * @throws Exception
+ */
+ @After
+ public void unregisterMXBean() throws Exception {
+ try {
+ ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(DatumStatusCounter.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
+ } catch (InstanceNotFoundException ife) {
+ //No-op
}
+ }
- /**
- * Test Constructor can register the counter as an mxbean with throwing an exception.
- */
- @Test
- public void testConstructor() {
- try {
- new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
- } catch (Throwable t) {
- fail("Constructor Threw Exception : "+t.getMessage());
- }
+ /**
+ * Test Constructor can register the counter as an mxbean without throwing an exception.
+ */
+ @Test
+ public void testConstructor() {
+ try {
+ new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+ } catch (Throwable t) {
+ fail("Constructor Threw Exception : "+t.getMessage());
}
+ }
- /**
- * Test that you can increment passes and it returns the correct count
- * @throws Exception
- */
- @Test
- @Repeat(iterations = 3)
- public void testPassed() throws Exception {
- DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
- int numIncrements = randomIntBetween(1, 100000);
- for(int i=0; i < numIncrements; ++i) {
- counter.incrementPassedCount();
- }
- assertEquals(numIncrements, counter.getNumPassed());
+ /**
+ * Test that you can increment passes and it returns the correct count
+ * @throws Exception
+ */
+ @Test
+ @Repeat(iterations = 3)
+ public void testPassed() throws Exception {
+ DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+ int numIncrements = randomIntBetween(1, 100000);
+ for(int i=0; i < numIncrements; ++i) {
+ counter.incrementPassedCount();
+ }
+ assertEquals(numIncrements, counter.getNumPassed());
- unregisterMXBean();
+ unregisterMXBean();
- counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
- numIncrements = randomIntBetween(1, 100000);
- long total = 0;
- for(int i=0; i < numIncrements; ++i) {
- long delta = randomIntBetween(1, 100);
- total += delta;
- counter.incrementPassedCount(delta);
- }
- assertEquals(total, counter.getNumPassed());
+ counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+ numIncrements = randomIntBetween(1, 100000);
+ long total = 0;
+ for(int i=0; i < numIncrements; ++i) {
+ long delta = randomIntBetween(1, 100);
+ total += delta;
+ counter.incrementPassedCount(delta);
}
+ assertEquals(total, counter.getNumPassed());
+ }
- /**
- * Test that you can increment failed and it returns the correct count
- * @throws Exception
- */
- @Test
- @Repeat(iterations = 3)
- public void testFailed() throws Exception {
- DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
- int numIncrements = randomIntBetween(1, 100000);
- for(int i=0; i < numIncrements; ++i) {
- counter.incrementFailedCount();
- }
- assertEquals(numIncrements, counter.getNumFailed());
+ /**
+ * Test that you can increment failed and it returns the correct count
+ * @throws Exception
+ */
+ @Test
+ @Repeat(iterations = 3)
+ public void testFailed() throws Exception {
+ DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+ int numIncrements = randomIntBetween(1, 100000);
+ for(int i=0; i < numIncrements; ++i) {
+ counter.incrementFailedCount();
+ }
+ assertEquals(numIncrements, counter.getNumFailed());
- unregisterMXBean();
+ unregisterMXBean();
- counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
- numIncrements = randomIntBetween(1, 100000);
- long total = 0;
- for(int i=0; i < numIncrements; ++i) {
- long delta = randomIntBetween(1, 100);
- total += delta;
- counter.incrementFailedCount(delta);
- }
- assertEquals(total, counter.getNumFailed());
+ counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+ numIncrements = randomIntBetween(1, 100000);
+ long total = 0;
+ for(int i=0; i < numIncrements; ++i) {
+ long delta = randomIntBetween(1, 100);
+ total += delta;
+ counter.incrementFailedCount(delta);
}
+ assertEquals(total, counter.getNumFailed());
+ }
- /**
- * Test failure rate returns expected values
- */
- @Test
- @Repeat(iterations = 3)
- public void testFailureRate() {
- DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
- assertEquals(0.0, counter.getFailRate(), 0);
- int failures = randomIntBetween(0, 100000);
- int passes = randomIntBetween(0, 100000);
- counter.incrementPassedCount(passes);
- counter.incrementFailedCount(failures);
- assertEquals((double)failures / (double)(passes + failures), counter.getFailRate(), 0);
- }
+ /**
+ * Test failure rate returns expected values
+ */
+ @Test
+ @Repeat(iterations = 3)
+ public void testFailureRate() {
+ DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+ assertEquals(0.0, counter.getFailRate(), 0);
+ int failures = randomIntBetween(0, 100000);
+ int passes = randomIntBetween(0, 100000);
+ counter.incrementPassedCount(passes);
+ counter.incrementFailedCount(failures);
+ assertEquals((double)failures / (double)(passes + failures), counter.getFailRate(), 0);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
index 95fd610..544b065 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
@@ -23,139 +23,139 @@ import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Test;
+import java.lang.management.ManagementFactory;
import javax.management.InstanceNotFoundException;
import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
/**
* Unit tests for {@link org.apache.streams.local.counters.StreamsTaskCounter}
*/
public class StreamsTaskCounterTest extends RandomizedTest {
- private static final String MBEAN_ID = "test_id";
- private static final String STREAM_ID = "test_stream";
- private static long STREAM_START_TIME = (new DateTime()).getMillis();
-
- /**
- * Remove registered mbeans from previous tests
- * @throws Exception
- */
- @After
- public void unregisterMXBean() throws Exception {
- try {
- ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(StreamsTaskCounter.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
- } catch (InstanceNotFoundException ife) {
- //No-op
- }
+ private static final String MBEAN_ID = "test_id";
+ private static final String STREAM_ID = "test_stream";
+ private static long STREAM_START_TIME = (new DateTime()).getMillis();
+
+ /**
+ * Remove registered mbeans from previous tests
+ * @throws Exception
+ */
+ @After
+ public void unregisterMXBean() throws Exception {
+ try {
+ ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(StreamsTaskCounter.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME)));
+ } catch (InstanceNotFoundException ife) {
+ //No-op
}
-
- /**
- * Test constructor does not throw errors
- */
- @Test
- public void testConstructor() {
- try {
- new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
- } catch (Throwable t) {
- fail("Constructor threw error : "+t.getMessage());
- }
+ }
+
+ /**
+ * Test constructor does not throw errors
+ */
+ @Test
+ public void testConstructor() {
+ try {
+ new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+ } catch (Throwable t) {
+ fail("Constructor threw error : "+t.getMessage());
}
-
- /**
- * Test emitted increments correctly and returns expected value
- * @throws Exception
- */
- @Test
- @Repeat(iterations = 3)
- public void testEmitted() throws Exception {
- StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
- int numIncrements = randomIntBetween(1, 100000);
- for(int i=0; i < numIncrements; ++i) {
- counter.incrementEmittedCount();
- }
- assertEquals(numIncrements, counter.getNumEmitted());
-
- unregisterMXBean();
-
- counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
- numIncrements = randomIntBetween(1, 100000);
- long total = 0;
- for(int i=0; i < numIncrements; ++i) {
- long delta = randomIntBetween(1, 100);
- total += delta;
- counter.incrementEmittedCount(delta);
- }
- assertEquals(total, counter.getNumEmitted());
+ }
+
+ /**
+ * Test emitted increments correctly and returns expected value
+ * @throws Exception
+ */
+ @Test
+ @Repeat(iterations = 3)
+ public void testEmitted() throws Exception {
+ StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+ int numIncrements = randomIntBetween(1, 100000);
+ for(int i=0; i < numIncrements; ++i) {
+ counter.incrementEmittedCount();
}
+ assertEquals(numIncrements, counter.getNumEmitted());
+
+ unregisterMXBean();
- /**
- * Test received increments correctly and returns expected value
- * @throws Exception
- */
- @Test
- @Repeat(iterations = 3)
- public void testReceived() throws Exception {
- StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
- int numIncrements = randomIntBetween(1, 100000);
- for(int i=0; i < numIncrements; ++i) {
- counter.incrementReceivedCount();
- }
- assertEquals(numIncrements, counter.getNumReceived());
-
- unregisterMXBean();
-
- counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
- numIncrements = randomIntBetween(1, 100000);
- long total = 0;
- for(int i=0; i < numIncrements; ++i) {
- long delta = randomIntBetween(1, 100);
- total += delta;
- counter.incrementReceivedCount(delta);
- }
- assertEquals(total, counter.getNumReceived());
+ counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+ numIncrements = randomIntBetween(1, 100000);
+ long total = 0;
+ for(int i=0; i < numIncrements; ++i) {
+ long delta = randomIntBetween(1, 100);
+ total += delta;
+ counter.incrementEmittedCount(delta);
}
+ assertEquals(total, counter.getNumEmitted());
+ }
+
+ /**
+ * Test received increments correctly and returns expected value
+ * @throws Exception
+ */
+ @Test
+ @Repeat(iterations = 3)
+ public void testReceived() throws Exception {
+ StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+ int numIncrements = randomIntBetween(1, 100000);
+ for(int i=0; i < numIncrements; ++i) {
+ counter.incrementReceivedCount();
+ }
+ assertEquals(numIncrements, counter.getNumReceived());
+
+ unregisterMXBean();
- /**
- * Test errors increments correctly and returns expected value
- * @throws Exception
- */
- @Test
- @Repeat(iterations = 3)
- public void testError() throws Exception {
- StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
- int numIncrements = randomIntBetween(1, 100000);
- for(int i=0; i < numIncrements; ++i) {
- counter.incrementErrorCount();
- }
- assertEquals(numIncrements, counter.getNumUnhandledErrors());
-
- unregisterMXBean();
-
- counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
- numIncrements = randomIntBetween(1, 100000);
- long total = 0;
- for(int i=0; i < numIncrements; ++i) {
- long delta = randomIntBetween(1, 100);
- total += delta;
- counter.incrementErrorCount(delta);
- }
- assertEquals(total, counter.getNumUnhandledErrors());
+ counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+ numIncrements = randomIntBetween(1, 100000);
+ long total = 0;
+ for(int i=0; i < numIncrements; ++i) {
+ long delta = randomIntBetween(1, 100);
+ total += delta;
+ counter.incrementReceivedCount(delta);
}
+ assertEquals(total, counter.getNumReceived());
+ }
+
+ /**
+ * Test errors increments correctly and returns expected value
+ * @throws Exception
+ */
+ @Test
+ @Repeat(iterations = 3)
+ public void testError() throws Exception {
+ StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+ int numIncrements = randomIntBetween(1, 100000);
+ for(int i=0; i < numIncrements; ++i) {
+ counter.incrementErrorCount();
+ }
+ assertEquals(numIncrements, counter.getNumUnhandledErrors());
+
+ unregisterMXBean();
- /**
- * Test error rate returns expected value
- * @throws Exception
- */
- @Test
- @Repeat(iterations = 3)
- public void testErrorRate() throws Exception {
- StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
- assertEquals(0.0, counter.getErrorRate(), 0);
- int failures = randomIntBetween(0, 100000);
- int received = randomIntBetween(0, 100000);
- counter.incrementReceivedCount(received);
- counter.incrementErrorCount(failures);
- assertEquals((double)failures / (double)(received), counter.getErrorRate(), 0);
+ counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+ numIncrements = randomIntBetween(1, 100000);
+ long total = 0;
+ for(int i=0; i < numIncrements; ++i) {
+ long delta = randomIntBetween(1, 100);
+ total += delta;
+ counter.incrementErrorCount(delta);
}
+ assertEquals(total, counter.getNumUnhandledErrors());
+ }
+
+ /**
+ * Test error rate returns expected value
+ * @throws Exception
+ */
+ @Test
+ @Repeat(iterations = 3)
+ public void testErrorRate() throws Exception {
+ StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME);
+ assertEquals(0.0, counter.getErrorRate(), 0);
+ int failures = randomIntBetween(0, 100000);
+ int received = randomIntBetween(0, 100000);
+ counter.incrementReceivedCount(received);
+ counter.incrementErrorCount(failures);
+ assertEquals((double)failures / (double)(received), counter.getErrorRate(), 0);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
index 7e33ab9..e3b608d 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.executors;
import org.apache.streams.local.builders.LocalStreamBuilder;
import org.apache.streams.util.ComponentUtils;
+
import org.junit.After;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -27,7 +28,6 @@ import org.mockito.stubbing.Answer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,7 +35,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
/**
*
@@ -43,90 +42,90 @@ import static org.mockito.Mockito.when;
public class ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest {
- @After
- public void removeLocalMBeans() {
- try {
- ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
- } catch (Exception e) {
- //No op. proceed to next test
- }
+ @After
+ public void removeLocalMBeans() {
+ try {
+ ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+ } catch (Exception e) {
+ //No op. proceed to next test
}
-
- @Test
- public void testShutDownOnException() {
- LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
- final AtomicBoolean isShutdown = new AtomicBoolean(false);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- isShutdown.set(true);
- return null;
- }
- }).when(sb).stop();
-
- final CountDownLatch latch = new CountDownLatch(1);
-
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- latch.countDown();
- throw new RuntimeException("Testing Throwable Handling!");
- }
- };
-
- ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
- executor.execute(runnable);
- try {
- latch.await();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- executor.shutdownNow();
- try {
- executor.awaitTermination(1, TimeUnit.SECONDS);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- assertTrue("Expected StreamBuilder shutdown to be called", isShutdown.get());
+ }
+
+ @Test
+ public void testShutDownOnException() {
+ LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
+ final AtomicBoolean isShutdown = new AtomicBoolean(false);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ isShutdown.set(true);
+ return null;
+ }
+ }).when(sb).stop();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ latch.countDown();
+ throw new RuntimeException("Testing Throwable Handling!");
+ }
+ };
+
+ ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
+ executor.execute(runnable);
+ try {
+ latch.await();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
}
-
-
- @Test
- public void testNormalExecution() {
- LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
- final AtomicBoolean isShutdown = new AtomicBoolean(false);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- isShutdown.set(true);
- return null;
- }
- }).when(sb).stop();
-
- final CountDownLatch latch = new CountDownLatch(1);
-
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- latch.countDown();
- }
- };
-
- ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
- executor.execute(runnable);
- try {
- latch.await();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- executor.shutdownNow();
- try {
- executor.awaitTermination(1, TimeUnit.SECONDS);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- assertFalse("Expected StreamBuilder shutdown to be called", isShutdown.get());
+ executor.shutdownNow();
+ try {
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ assertTrue("Expected StreamBuilder shutdown to be called", isShutdown.get());
+ }
+
+ /** A task that completes without throwing must not cause the stream builder to be stopped. */
+ @Test
+ public void testNormalExecution() {
+ LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
+ final AtomicBoolean isShutdown = new AtomicBoolean(false);
+ doAnswer(new Answer<Object>() { // record any call to stop() on the mocked builder
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ isShutdown.set(true);
+ return null;
+ }
+ }).when(sb).stop();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ latch.countDown(); // signal that the task has started; it then returns normally
+ }
+ };
+
+ ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
+ executor.execute(runnable);
+ try {
+ latch.await(); // block until the task has begun running
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ executor.shutdownNow();
+ try {
+ executor.awaitTermination(1, TimeUnit.SECONDS); // give the worker up to one second to finish
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
}
+ assertFalse("Expected StreamBuilder shutdown not to be called", isShutdown.get());
+ }
}