You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/10/11 22:37:59 UTC

[09/17] git commit: STREAMS-179 | Added new thread pool executor to shutdown streams

STREAMS-179 | Added new thread pool executor to shutdown streams


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e091c6cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e091c6cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e091c6cc

Branch: refs/heads/STREAMS-170
Commit: e091c6ccc4f6ca849a638402ce8de5a7d73a8df0
Parents: 507e679
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Fri Sep 19 15:22:32 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Fri Sep 19 15:22:32 2014 -0500

----------------------------------------------------------------------
 .../local/builders/LocalStreamBuilder.java      |   3 +-
 ...amOnUnhandleThrowableThreadPoolExecutor.java |  45 ++++++++
 ...nhandledThrowableThreadPoolExecutorTest.java | 103 +++++++++++++++++++
 3 files changed, 150 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 25f9fe7..bec1ff9 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.builders;
 
 import org.apache.log4j.spi.LoggerFactory;
 import org.apache.streams.core.*;
+import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
 import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
 import org.apache.streams.local.tasks.StatusCounterMonitorThread;
 import org.apache.streams.local.tasks.StreamsProviderTask;
@@ -173,7 +174,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     public void start() {
         attachShutdownHandler();
         boolean isRunning = true;
-        this.executor = Executors.newFixedThreadPool(this.totalTasks);
+        this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this);
         this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1);
         Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
         tasks = new HashMap<String, List<StreamsTask>>();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
new file mode 100644
index 0000000..f8d6343
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
@@ -0,0 +1,45 @@
+package org.apache.streams.local.executors;
+
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.*;
+
+/**
+ * A fixed-size {@link java.util.concurrent.ThreadPoolExecutor} that asks its
+ * {@link LocalStreamBuilder} to stop the first time a task terminates with an unhandled
+ * {@link Throwable}.
+ *
+ * <p>Failures are detected in {@link #afterExecute(Runnable, Throwable)}. NOTE(review): per the
+ * ThreadPoolExecutor documentation, tasks handed over through {@code submit(...)} are wrapped in a
+ * future that captures their failure, so only tasks started with {@code execute(...)} report a
+ * non-null throwable to that hook. Confirm that callers use {@code execute(...)}.</p>
+ *
+ * @see java.util.concurrent.ThreadPoolExecutor
+ */
+public class ShutdownStreamOnUnhandleThrowableThreadPoolExecutor extends ThreadPoolExecutor {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.class);
+
+    /** Builder whose {@code stop()} is invoked on the first unhandled throwable. */
+    private final LocalStreamBuilder streamBuilder;
+
+    /** True once the stop request has been issued; only accessed while holding this object's monitor. */
+    private boolean stopRequested;
+
+    /**
+     * Creates a fixed size thread pool where corePoolSize and maximumPoolSize equal numThreads with an unbounded queue.
+     * @param numThreads number of threads in pool
+     * @param streamBuilder streambuilder to call {@link org.apache.streams.core.StreamBuilder#stop()} on upon receiving an unhandled throwable
+     */
+    public ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(int numThreads, LocalStreamBuilder streamBuilder) {
+        super(numThreads, numThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+        this.streamBuilder = streamBuilder;
+    }
+
+    /**
+     * Logs the outcome of the finished task and, when it ended with a throwable, stops the stream.
+     * The stream builder is stopped at most once, no matter how many tasks fail.
+     *
+     * @param r the runnable that has completed
+     * @param t the throwable that terminated the runnable, or {@code null} if it completed normally
+     */
+    @Override
+    protected void afterExecute(Runnable r, Throwable t) {
+        // The JDK documentation recommends invoking the superclass hook first when overriding.
+        super.afterExecute(r, t);
+        if (t == null) {
+            LOGGER.trace("Runnable, {}, finished executing.", r.getClass());
+            return;
+        }
+        LOGGER.error("Runnable, {}, exited with an unhandled throwable! : {}", r.getClass(), t);
+        LOGGER.error("Attempting to shut down stream.");
+        // Call stop() outside the monitor so other failing workers are not blocked behind
+        // external code while it runs.
+        if (markStopRequested()) {
+            this.streamBuilder.stop();
+        }
+    }
+
+    /**
+     * Atomically records that a stop has been requested.
+     *
+     * @return {@code true} only for the first caller, which is then responsible for stopping the stream
+     */
+    private synchronized boolean markStopRequested() {
+        if (this.stopRequested) {
+            return false;
+        }
+        this.stopRequested = true;
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
new file mode 100644
index 0000000..17e8dd9
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
@@ -0,0 +1,103 @@
+package org.apache.streams.local.executors;
+
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that {@link ShutdownStreamOnUnhandleThrowableThreadPoolExecutor} calls
+ * {@code stop()} on its stream builder when a task throws, and does not call it when a task
+ * completes normally.
+ */
+public class ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest {
+
+
+    /** A task that throws must cause {@code stop()} to be called on the stream builder. */
+    @Test
+    public void testShutDownOnException() {
+        final AtomicBoolean isShutdown = new AtomicBoolean(false);
+        LocalStreamBuilder sb = mockBuilderRecordingStop(isShutdown);
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                latch.countDown();
+                throw new RuntimeException("Testing Throwable Handling!");
+            }
+        };
+
+        ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
+        runAndTerminate(executor, runnable, latch);
+        assertTrue("Expected StreamBuilder shutdown to be called", isShutdown.get());
+    }
+
+
+    /** A task that returns normally must leave the stream builder running. */
+    @Test
+    public void testNormalExecution() {
+        final AtomicBoolean isShutdown = new AtomicBoolean(false);
+        LocalStreamBuilder sb = mockBuilderRecordingStop(isShutdown);
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                latch.countDown();
+            }
+        };
+
+        ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
+        runAndTerminate(executor, runnable, latch);
+        assertFalse("Expected StreamBuilder shutdown NOT to be called", isShutdown.get());
+    }
+
+
+    /**
+     * Creates a mocked stream builder whose {@code stop()} flips the supplied flag to {@code true}.
+     *
+     * @param stopCalled flag set when {@code stop()} is invoked on the returned mock
+     * @return the mocked builder
+     */
+    private static LocalStreamBuilder mockBuilderRecordingStop(final AtomicBoolean stopCalled) {
+        LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                stopCalled.set(true);
+                return null;
+            }
+        }).when(sb).stop();
+        return sb;
+    }
+
+    /**
+     * Executes the task, waits until it has started, then shuts the executor down and waits up to
+     * one second for it to terminate. An interrupt while waiting is restored on the current thread.
+     *
+     * @param executor executor under test
+     * @param task task to run on the executor
+     * @param started latch the task counts down once it is running
+     */
+    private static void runAndTerminate(ExecutorService executor, Runnable task, CountDownLatch started) {
+        executor.execute(task);
+        try {
+            started.await();
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+        executor.shutdownNow();
+        try {
+            executor.awaitTermination(1, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+
+}