You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/10/11 22:37:59 UTC
[09/17] git commit: STREAMS-179 | Added new thread pool executor to
shutdown streams
STREAMS-179 | Added new thread pool executor to shutdown streams
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e091c6cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e091c6cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e091c6cc
Branch: refs/heads/STREAMS-170
Commit: e091c6ccc4f6ca849a638402ce8de5a7d73a8df0
Parents: 507e679
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Fri Sep 19 15:22:32 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Fri Sep 19 15:22:32 2014 -0500
----------------------------------------------------------------------
.../local/builders/LocalStreamBuilder.java | 3 +-
...amOnUnhandleThrowableThreadPoolExecutor.java | 45 ++++++++
...nhandledThrowableThreadPoolExecutorTest.java | 103 +++++++++++++++++++
3 files changed, 150 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 25f9fe7..bec1ff9 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.builders;
import org.apache.log4j.spi.LoggerFactory;
import org.apache.streams.core.*;
+import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
import org.apache.streams.local.tasks.StatusCounterMonitorThread;
import org.apache.streams.local.tasks.StreamsProviderTask;
@@ -173,7 +174,7 @@ public class LocalStreamBuilder implements StreamBuilder {
public void start() {
attachShutdownHandler();
boolean isRunning = true;
- this.executor = Executors.newFixedThreadPool(this.totalTasks);
+ this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this);
this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1);
Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
tasks = new HashMap<String, List<StreamsTask>>();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
new file mode 100644
index 0000000..f8d6343
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
@@ -0,0 +1,45 @@
+package org.apache.streams.local.executors;
+
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.*;
+
+/**
+ * @see {@link java.util.concurrent.ThreadPoolExecutor}
+ */
+public class ShutdownStreamOnUnhandleThrowableThreadPoolExecutor extends ThreadPoolExecutor {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.class);
+
+ private LocalStreamBuilder streamBuilder;
+ private volatile boolean isStoped;
+
+ /**
+ * Creates a fixed size thread pool where corePoolSize & maximumPoolSize equal numThreads with an unbounded queue.
+ * @param numThreads number of threads in pool
+ * @param streamBuilder streambuilder to call {@link org.apache.streams.core.StreamBuilder#stop()} on upon receiving an unhandled throwable
+ */
+ public ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(int numThreads, LocalStreamBuilder streamBuilder) {
+ super(numThreads, numThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ this.streamBuilder = streamBuilder;
+ this.isStoped = false;
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ if(t != null) {
+ LOGGER.error("Runnable, {}, exited with an unhandled throwable! : {}", r.getClass(), t);
+ LOGGER.error("Attempting to shut down stream.");
+ synchronized (this) {
+ if (!this.isStoped) {
+ this.isStoped = true;
+ this.streamBuilder.stop();
+ }
+ }
+ } else {
+ LOGGER.trace("Runnable, {}, finished executing.", r.getClass());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
new file mode 100644
index 0000000..17e8dd9
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
@@ -0,0 +1,103 @@
+package org.apache.streams.local.executors;
+
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ *
+ */
+public class ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest {
+
+
+ @Test
+ public void testShutDownOnException() {
+ LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
+ final AtomicBoolean isShutdown = new AtomicBoolean(false);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ isShutdown.set(true);
+ return null;
+ }
+ }).when(sb).stop();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ latch.countDown();
+ throw new RuntimeException("Testing Throwable Handling!");
+ }
+ };
+
+ ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
+ executor.execute(runnable);
+ try {
+ latch.await();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ executor.shutdownNow();
+ try {
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ assertTrue("Expected StreamBuilder shutdown to be called", isShutdown.get());
+ }
+
+
+ @Test
+ public void testNormalExecution() {
+ LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
+ final AtomicBoolean isShutdown = new AtomicBoolean(false);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ isShutdown.set(true);
+ return null;
+ }
+ }).when(sb).stop();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ latch.countDown();
+ }
+ };
+
+ ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
+ executor.execute(runnable);
+ try {
+ latch.await();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ executor.shutdownNow();
+ try {
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ assertFalse("Expected StreamBuilder shutdown to be called", isShutdown.get());
+ }
+
+
+}