You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/02/20 10:15:06 UTC

[3/3] camel git commit: CAMEL-9153: ThreadPoolRejectedPolicy does not implement Abort as expected. Thanks to Michael Riedel for the patch.

CAMEL-9153: ThreadPoolRejectedPolicy does not implement Abort as expected. Thanks to Michael Riedel for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/80b0d0b4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/80b0d0b4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/80b0d0b4

Branch: refs/heads/master
Commit: 80b0d0b44515b50088ba486fcd83e353eba9d5fa
Parents: 04f3d11
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Feb 20 09:57:57 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Feb 20 09:57:57 2016 +0100

----------------------------------------------------------------------
 .../apache/camel/ThreadPoolRejectedPolicy.java  |   5 +-
 .../RejectableScheduledThreadPoolExecutor.java  |  14 +-
 .../RejectableThreadPoolExecutor.java           |  14 +-
 .../camel/ThreadPoolRejectedPolicyTest.java     | 275 +++++++++++++++++++
 4 files changed, 303 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/80b0d0b4/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java b/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
index 623a3ed..11f3b59 100644
--- a/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel;
 
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
 import javax.xml.bind.annotation.XmlEnum;
@@ -42,7 +43,9 @@ public enum ThreadPoolRejectedPolicy {
                 @Override
                 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                     if (r instanceof Rejectable) {
-                        ((Rejectable) r).reject();
+                        ((Rejectable)r).reject();
+                    } else {
+                        throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + executor.toString());
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/80b0d0b4/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java b/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
index 3f61869..7a205e6 100644
--- a/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
+++ b/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
@@ -22,6 +22,8 @@ import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 
+import org.apache.camel.Rejectable;
+
 /**
  * Scheduled thread pool executor that creates {@link RejectableFutureTask} instead of
  * {@link java.util.concurrent.FutureTask} when registering new tasks for execution.
@@ -70,12 +72,20 @@ public class RejectableScheduledThreadPoolExecutor extends ScheduledThreadPoolEx
 
     @Override
     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
-        return new RejectableFutureTask<T>(runnable, value);
+        if (runnable instanceof Rejectable) {
+            return new RejectableFutureTask<T>(runnable, value);
+        } else {
+            return super.newTaskFor(runnable, value);
+        }
     }
 
     @Override
     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
-        return new RejectableFutureTask<T>(callable);
+        if (callable instanceof Rejectable) {
+            return new RejectableFutureTask<T>(callable);
+        } else {
+            return super.newTaskFor(callable);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/80b0d0b4/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java b/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
index 6d67c6b..8962184 100644
--- a/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
+++ b/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.Rejectable;
+
 /**
  * Thread pool executor that creates {@link RejectableFutureTask} instead of
  * {@link java.util.concurrent.FutureTask} when registering new tasks for execution.
@@ -76,12 +78,20 @@ public class RejectableThreadPoolExecutor extends ThreadPoolExecutor {
 
     @Override
     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
-        return new RejectableFutureTask<T>(runnable, value);
+        if (runnable instanceof Rejectable) {
+            return new RejectableFutureTask<T>(runnable, value);
+        } else {
+            return super.newTaskFor(runnable, value);
+        }
     }
 
     @Override
     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
-        return new RejectableFutureTask<T>(callable);
+        if (callable instanceof Rejectable) {
+            return new RejectableFutureTask<T>(callable);
+        } else {
+            return super.newTaskFor(callable);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/80b0d0b4/camel-core/src/test/java/org/apache/camel/ThreadPoolRejectedPolicyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/ThreadPoolRejectedPolicyTest.java b/camel-core/src/test/java/org/apache/camel/ThreadPoolRejectedPolicyTest.java
new file mode 100644
index 0000000..fb64d02
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/ThreadPoolRejectedPolicyTest.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
+
+public class ThreadPoolRejectedPolicyTest extends TestSupport {
+
+    public void testAbortAsRejectedExecutionHandler() throws InterruptedException {
+
+        final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.Abort.asRejectedExecutionHandler());
+
+        final MockCallable<String> task1 = new MockCallable<String>();
+        final Future<?> result1 = executorService.submit(task1);
+        final MockRunnable task2 = new MockRunnable();
+        final Future<?> result2 = executorService.submit(task2);
+        final MockCallable<String> task3 = new MockCallable<String>();
+        try {
+            executorService.submit(task3);
+            fail("Third task should have been rejected by a threadpool is full with 1 task and queue is full with 1 task.");
+        } catch (RejectedExecutionException e) {
+        }
+
+        shutdownAndAwait(executorService);
+
+        assertInvoked(task1, result1);
+        assertInvoked(task2, result2);
+        assertRejected(task3, null);
+    }
+
+    public void testAbortAsRejectedExecutionHandlerWithRejectableTasks() throws InterruptedException {
+
+        final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.Abort.asRejectedExecutionHandler());
+
+        final MockRejectableRunnable task1 = new MockRejectableRunnable();
+        final Future<?> result1 = executorService.submit(task1);
+        final MockRejectableCallable<String> task2 = new MockRejectableCallable<String>();
+        final Future<?> result2 = executorService.submit(task2);
+        final MockRejectableRunnable task3 = new MockRejectableRunnable();
+        final Future<?> result3 = executorService.submit(task3);
+
+        final MockRejectableCallable<String> task4 = new MockRejectableCallable<String>();
+        final Future<?> result4 = executorService.submit(task4);
+
+        shutdownAndAwait(executorService);
+
+        assertInvoked(task1, result1);
+        assertInvoked(task2, result2);
+        assertRejected(task3, result3);
+        assertRejected(task4, result4);
+    }
+
+    public void testCallerRunsAsRejectedExecutionHandler() throws InterruptedException {
+
+        final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.CallerRuns.asRejectedExecutionHandler());
+
+        final MockRunnable task1 = new MockRunnable();
+        final Future<?> result1 = executorService.submit(task1);
+        final MockRunnable task2 = new MockRunnable();
+        final Future<?> result2 = executorService.submit(task2);
+        final MockRunnable task3 = new MockRunnable();
+        final Future<?> result3 = executorService.submit(task3);
+
+        shutdownAndAwait(executorService);
+
+        assertInvoked(task1, result1);
+        assertInvoked(task2, result2);
+        assertInvoked(task3, result3);
+    }
+
+    public void testCallerRunsAsRejectedExecutionHandlerWithRejectableTasks() throws InterruptedException {
+
+        final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.CallerRuns.asRejectedExecutionHandler());
+
+        final MockRejectableRunnable task1 = new MockRejectableRunnable();
+        final Future<?> result1 = executorService.submit(task1);
+        final MockRejectableRunnable task2 = new MockRejectableRunnable();
+        final Future<?> result2 = executorService.submit(task2);
+        final MockRejectableRunnable task3 = new MockRejectableRunnable();
+        final Future<?> result3 = executorService.submit(task3);
+
+        shutdownAndAwait(executorService);
+
+        assertInvoked(task1, result1);
+        assertInvoked(task2, result2);
+        assertInvoked(task3, result3);
+    }
+
+    public void testDiscardAsRejectedExecutionHandler() throws InterruptedException {
+
+        final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.Discard.asRejectedExecutionHandler());
+
+        final MockRunnable task1 = new MockRunnable();
+        final Future<?> result1 = executorService.submit(task1);
+        final MockRunnable task2 = new MockRunnable();
+        final Future<?> result2 = executorService.submit(task2);
+        final MockRunnable task3 = new MockRunnable();
+        final Future<?> result3 = executorService.submit(task3);
+
+        shutdownAndAwait(executorService);
+
+        assertInvoked(task1, result1);
+        assertInvoked(task2, result2);
+        assertRejected(task3, result3);
+    }
+
+    public void testDiscardAsRejectedExecutionHandlerWithRejectableTasks() throws InterruptedException {
+
+        final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.Discard.asRejectedExecutionHandler());
+
+        final MockRejectableRunnable task1 = new MockRejectableRunnable();
+        final Future<?> result1 = executorService.submit(task1);
+        final MockRejectableRunnable task2 = new MockRejectableRunnable();
+        final Future<?> result2 = executorService.submit(task2);
+        final MockRejectableRunnable task3 = new MockRejectableRunnable();
+        final Future<?> result3 = executorService.submit(task3);
+
+        shutdownAndAwait(executorService);
+
+        assertInvoked(task1, result1);
+        assertInvoked(task2, result2);
+        assertRejected(task3, result3);
+    }
+
+    public void testDiscardOldestAsRejectedExecutionHandler() throws InterruptedException {
+
+        final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.DiscardOldest.asRejectedExecutionHandler());
+
+        final MockRunnable task1 = new MockRunnable();
+        final Future<?> result1 = executorService.submit(task1);
+        final MockRunnable task2 = new MockRunnable();
+        final Future<?> result2 = executorService.submit(task2);
+        final MockRunnable task3 = new MockRunnable();
+        final Future<?> result3 = executorService.submit(task3);
+
+        shutdownAndAwait(executorService);
+
+        assertInvoked(task1, result1);
+        assertRejected(task2, result2);
+        assertInvoked(task3, result3);
+    }
+
+    public void testDiscardOldestAsRejectedExecutionHandlerWithRejectableTasks() throws InterruptedException {
+
+        final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.DiscardOldest.asRejectedExecutionHandler());
+
+        final MockRejectableRunnable task1 = new MockRejectableRunnable();
+        final Future<?> result1 = executorService.submit(task1);
+        final MockRejectableRunnable task2 = new MockRejectableRunnable();
+        final Future<?> result2 = executorService.submit(task2);
+        final MockRejectableRunnable task3 = new MockRejectableRunnable();
+        final Future<?> result3 = executorService.submit(task3);
+
+        shutdownAndAwait(executorService);
+
+        assertInvoked(task1, result1);
+        assertRejected(task2, result2);
+        assertInvoked(task3, result3);
+    }
+
+    private ExecutorService createTestExecutorService(final RejectedExecutionHandler rejectedExecutionHandler) {
+        return new RejectableThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1), rejectedExecutionHandler);
+    }
+
+    private void shutdownAndAwait(final ExecutorService executorService) {
+        executorService.shutdown();
+        try {
+            assertTrue("Test ExecutorService shutdown is not expected to take longer than 10 seconds.", executorService.awaitTermination(10, TimeUnit.SECONDS));
+        } catch (InterruptedException e) {
+            fail("Test ExecutorService shutdown is not expected to be interrupted.");
+        }
+    }
+
+    private void assertInvoked(MockTask task, Future<?> result) {
+        assertTrue(result.isDone());
+        assertEquals(1, task.getInvocationCount());
+        if (task instanceof Rejectable) {
+            assertEquals(0, task.getRejectionCount());
+        }
+    }
+
+    private void assertRejected(MockTask task, Future<?> result) {
+        if (result != null) {
+            assertFalse(result.isDone());
+        }
+        assertEquals(0, task.getInvocationCount());
+        if (task instanceof Rejectable) {
+            assertEquals(1, task.getRejectionCount());
+        }
+    }
+
+    private abstract static class MockTask {
+        private final AtomicInteger invocationCount = new AtomicInteger();
+
+        private final AtomicInteger rejectionCount = new AtomicInteger();
+
+        public int getInvocationCount() {
+            return invocationCount.get();
+        }
+
+        protected void countInvocation() {
+            invocationCount.incrementAndGet();
+        }
+
+        public int getRejectionCount() {
+            return rejectionCount.get();
+        }
+
+        protected void countRejection() {
+            rejectionCount.incrementAndGet();
+        }
+    }
+
+    private static class MockRunnable extends MockTask implements Runnable {
+        @Override
+        public void run() {
+            countInvocation();
+            try {
+                TimeUnit.MILLISECONDS.sleep(100);
+            } catch (InterruptedException e) {
+                fail("MockRunnable task is not expected to be interrupted.");
+            }
+        }
+    }
+
+    private static class MockRejectableRunnable extends MockRunnable implements Rejectable {
+        @Override
+        public void reject() {
+            countRejection();
+        }
+    }
+
+    private static class MockCallable<T> extends MockTask implements Callable<T> {
+        @Override
+        public T call() throws Exception {
+            countInvocation();
+            try {
+                TimeUnit.MILLISECONDS.sleep(100);
+            } catch (InterruptedException e) {
+                fail("MockCallable task is not expected to be interrupted.");
+            }
+            return null;
+        }
+    }
+
+    private static class MockRejectableCallable<T> extends MockCallable<T> implements Rejectable {
+        @Override
+        public void reject() {
+            countRejection();
+        }
+    }
+}