You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/02/20 10:15:06 UTC
[3/3] camel git commit: CAMEL-9153: ThreadPoolRejectedPolicy does not implement Abort as expected. Thanks to Michael Riedel for the patch.
CAMEL-9153: ThreadPoolRejectedPolicy does not implement Abort as expected. Thanks to Michael Riedel for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/80b0d0b4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/80b0d0b4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/80b0d0b4
Branch: refs/heads/master
Commit: 80b0d0b44515b50088ba486fcd83e353eba9d5fa
Parents: 04f3d11
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Feb 20 09:57:57 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Feb 20 09:57:57 2016 +0100
----------------------------------------------------------------------
.../apache/camel/ThreadPoolRejectedPolicy.java | 5 +-
.../RejectableScheduledThreadPoolExecutor.java | 14 +-
.../RejectableThreadPoolExecutor.java | 14 +-
.../camel/ThreadPoolRejectedPolicyTest.java | 275 +++++++++++++++++++
4 files changed, 303 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/80b0d0b4/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java b/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
index 623a3ed..11f3b59 100644
--- a/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import javax.xml.bind.annotation.XmlEnum;
@@ -42,7 +43,9 @@ public enum ThreadPoolRejectedPolicy {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r instanceof Rejectable) {
- ((Rejectable) r).reject();
+ ((Rejectable)r).reject();
+ } else {
+ throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + executor.toString());
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/80b0d0b4/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java b/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
index 3f61869..7a205e6 100644
--- a/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
+++ b/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java
@@ -22,6 +22,8 @@ import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
+import org.apache.camel.Rejectable;
+
/**
* Scheduled thread pool executor that creates {@link RejectableFutureTask} instead of
* {@link java.util.concurrent.FutureTask} when registering new tasks for execution.
@@ -70,12 +72,20 @@ public class RejectableScheduledThreadPoolExecutor extends ScheduledThreadPoolEx
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
- return new RejectableFutureTask<T>(runnable, value);
+ if (runnable instanceof Rejectable) {
+ return new RejectableFutureTask<T>(runnable, value);
+ } else {
+ return super.newTaskFor(runnable, value);
+ }
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
- return new RejectableFutureTask<T>(callable);
+ if (callable instanceof Rejectable) {
+ return new RejectableFutureTask<T>(callable);
+ } else {
+ return super.newTaskFor(callable);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/80b0d0b4/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java b/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
index 6d67c6b..8962184 100644
--- a/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
+++ b/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.Rejectable;
+
/**
* Thread pool executor that creates {@link RejectableFutureTask} instead of
* {@link java.util.concurrent.FutureTask} when registering new tasks for execution.
@@ -76,12 +78,20 @@ public class RejectableThreadPoolExecutor extends ThreadPoolExecutor {
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
- return new RejectableFutureTask<T>(runnable, value);
+ if (runnable instanceof Rejectable) {
+ return new RejectableFutureTask<T>(runnable, value);
+ } else {
+ return super.newTaskFor(runnable, value);
+ }
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
- return new RejectableFutureTask<T>(callable);
+ if (callable instanceof Rejectable) {
+ return new RejectableFutureTask<T>(callable);
+ } else {
+ return super.newTaskFor(callable);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/80b0d0b4/camel-core/src/test/java/org/apache/camel/ThreadPoolRejectedPolicyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/ThreadPoolRejectedPolicyTest.java b/camel-core/src/test/java/org/apache/camel/ThreadPoolRejectedPolicyTest.java
new file mode 100644
index 0000000..fb64d02
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/ThreadPoolRejectedPolicyTest.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
+
+public class ThreadPoolRejectedPolicyTest extends TestSupport {
+
+ public void testAbortAsRejectedExecutionHandler() throws InterruptedException {
+
+ final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.Abort.asRejectedExecutionHandler());
+
+ final MockCallable<String> task1 = new MockCallable<String>();
+ final Future<?> result1 = executorService.submit(task1);
+ final MockRunnable task2 = new MockRunnable();
+ final Future<?> result2 = executorService.submit(task2);
+ final MockCallable<String> task3 = new MockCallable<String>();
+ try {
+ executorService.submit(task3);
+ fail("Third task should have been rejected by a threadpool is full with 1 task and queue is full with 1 task.");
+ } catch (RejectedExecutionException e) {
+ }
+
+ shutdownAndAwait(executorService);
+
+ assertInvoked(task1, result1);
+ assertInvoked(task2, result2);
+ assertRejected(task3, null);
+ }
+
+ public void testAbortAsRejectedExecutionHandlerWithRejectableTasks() throws InterruptedException {
+
+ final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.Abort.asRejectedExecutionHandler());
+
+ final MockRejectableRunnable task1 = new MockRejectableRunnable();
+ final Future<?> result1 = executorService.submit(task1);
+ final MockRejectableCallable<String> task2 = new MockRejectableCallable<String>();
+ final Future<?> result2 = executorService.submit(task2);
+ final MockRejectableRunnable task3 = new MockRejectableRunnable();
+ final Future<?> result3 = executorService.submit(task3);
+
+ final MockRejectableCallable<String> task4 = new MockRejectableCallable<String>();
+ final Future<?> result4 = executorService.submit(task4);
+
+ shutdownAndAwait(executorService);
+
+ assertInvoked(task1, result1);
+ assertInvoked(task2, result2);
+ assertRejected(task3, result3);
+ assertRejected(task4, result4);
+ }
+
+ public void testCallerRunsAsRejectedExecutionHandler() throws InterruptedException {
+
+ final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.CallerRuns.asRejectedExecutionHandler());
+
+ final MockRunnable task1 = new MockRunnable();
+ final Future<?> result1 = executorService.submit(task1);
+ final MockRunnable task2 = new MockRunnable();
+ final Future<?> result2 = executorService.submit(task2);
+ final MockRunnable task3 = new MockRunnable();
+ final Future<?> result3 = executorService.submit(task3);
+
+ shutdownAndAwait(executorService);
+
+ assertInvoked(task1, result1);
+ assertInvoked(task2, result2);
+ assertInvoked(task3, result3);
+ }
+
+ public void testCallerRunsAsRejectedExecutionHandlerWithRejectableTasks() throws InterruptedException {
+
+ final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.CallerRuns.asRejectedExecutionHandler());
+
+ final MockRejectableRunnable task1 = new MockRejectableRunnable();
+ final Future<?> result1 = executorService.submit(task1);
+ final MockRejectableRunnable task2 = new MockRejectableRunnable();
+ final Future<?> result2 = executorService.submit(task2);
+ final MockRejectableRunnable task3 = new MockRejectableRunnable();
+ final Future<?> result3 = executorService.submit(task3);
+
+ shutdownAndAwait(executorService);
+
+ assertInvoked(task1, result1);
+ assertInvoked(task2, result2);
+ assertInvoked(task3, result3);
+ }
+
+ public void testDiscardAsRejectedExecutionHandler() throws InterruptedException {
+
+ final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.Discard.asRejectedExecutionHandler());
+
+ final MockRunnable task1 = new MockRunnable();
+ final Future<?> result1 = executorService.submit(task1);
+ final MockRunnable task2 = new MockRunnable();
+ final Future<?> result2 = executorService.submit(task2);
+ final MockRunnable task3 = new MockRunnable();
+ final Future<?> result3 = executorService.submit(task3);
+
+ shutdownAndAwait(executorService);
+
+ assertInvoked(task1, result1);
+ assertInvoked(task2, result2);
+ assertRejected(task3, result3);
+ }
+
+ public void testDiscardAsRejectedExecutionHandlerWithRejectableTasks() throws InterruptedException {
+
+ final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.Discard.asRejectedExecutionHandler());
+
+ final MockRejectableRunnable task1 = new MockRejectableRunnable();
+ final Future<?> result1 = executorService.submit(task1);
+ final MockRejectableRunnable task2 = new MockRejectableRunnable();
+ final Future<?> result2 = executorService.submit(task2);
+ final MockRejectableRunnable task3 = new MockRejectableRunnable();
+ final Future<?> result3 = executorService.submit(task3);
+
+ shutdownAndAwait(executorService);
+
+ assertInvoked(task1, result1);
+ assertInvoked(task2, result2);
+ assertRejected(task3, result3);
+ }
+
+ public void testDiscardOldestAsRejectedExecutionHandler() throws InterruptedException {
+
+ final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.DiscardOldest.asRejectedExecutionHandler());
+
+ final MockRunnable task1 = new MockRunnable();
+ final Future<?> result1 = executorService.submit(task1);
+ final MockRunnable task2 = new MockRunnable();
+ final Future<?> result2 = executorService.submit(task2);
+ final MockRunnable task3 = new MockRunnable();
+ final Future<?> result3 = executorService.submit(task3);
+
+ shutdownAndAwait(executorService);
+
+ assertInvoked(task1, result1);
+ assertRejected(task2, result2);
+ assertInvoked(task3, result3);
+ }
+
+ public void testDiscardOldestAsRejectedExecutionHandlerWithRejectableTasks() throws InterruptedException {
+
+ final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.DiscardOldest.asRejectedExecutionHandler());
+
+ final MockRejectableRunnable task1 = new MockRejectableRunnable();
+ final Future<?> result1 = executorService.submit(task1);
+ final MockRejectableRunnable task2 = new MockRejectableRunnable();
+ final Future<?> result2 = executorService.submit(task2);
+ final MockRejectableRunnable task3 = new MockRejectableRunnable();
+ final Future<?> result3 = executorService.submit(task3);
+
+ shutdownAndAwait(executorService);
+
+ assertInvoked(task1, result1);
+ assertRejected(task2, result2);
+ assertInvoked(task3, result3);
+ }
+
+ private ExecutorService createTestExecutorService(final RejectedExecutionHandler rejectedExecutionHandler) {
+ return new RejectableThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1), rejectedExecutionHandler);
+ }
+
+ private void shutdownAndAwait(final ExecutorService executorService) {
+ executorService.shutdown();
+ try {
+ assertTrue("Test ExecutorService shutdown is not expected to take longer than 10 seconds.", executorService.awaitTermination(10, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ fail("Test ExecutorService shutdown is not expected to be interrupted.");
+ }
+ }
+
+ private void assertInvoked(MockTask task, Future<?> result) {
+ assertTrue(result.isDone());
+ assertEquals(1, task.getInvocationCount());
+ if (task instanceof Rejectable) {
+ assertEquals(0, task.getRejectionCount());
+ }
+ }
+
+ private void assertRejected(MockTask task, Future<?> result) {
+ if (result != null) {
+ assertFalse(result.isDone());
+ }
+ assertEquals(0, task.getInvocationCount());
+ if (task instanceof Rejectable) {
+ assertEquals(1, task.getRejectionCount());
+ }
+ }
+
+ private abstract static class MockTask {
+ private final AtomicInteger invocationCount = new AtomicInteger();
+
+ private final AtomicInteger rejectionCount = new AtomicInteger();
+
+ public int getInvocationCount() {
+ return invocationCount.get();
+ }
+
+ protected void countInvocation() {
+ invocationCount.incrementAndGet();
+ }
+
+ public int getRejectionCount() {
+ return rejectionCount.get();
+ }
+
+ protected void countRejection() {
+ rejectionCount.incrementAndGet();
+ }
+ }
+
+ private static class MockRunnable extends MockTask implements Runnable {
+ @Override
+ public void run() {
+ countInvocation();
+ try {
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (InterruptedException e) {
+ fail("MockRunnable task is not expected to be interrupted.");
+ }
+ }
+ }
+
+ private static class MockRejectableRunnable extends MockRunnable implements Rejectable {
+ @Override
+ public void reject() {
+ countRejection();
+ }
+ }
+
+ private static class MockCallable<T> extends MockTask implements Callable<T> {
+ @Override
+ public T call() throws Exception {
+ countInvocation();
+ try {
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (InterruptedException e) {
+ fail("MockCallable task is not expected to be interrupted.");
+ }
+ return null;
+ }
+ }
+
+ private static class MockRejectableCallable<T> extends MockCallable<T> implements Rejectable {
+ @Override
+ public void reject() {
+ countRejection();
+ }
+ }
+}