You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/20 08:16:50 UTC

[1/2] camel git commit: CAMEL-9791: Threads EIP now allows the error handler to perform redelivery if adding a task to the queue is rejected. Thanks to Thibaut Robert for the unit test.

Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x e4f4ed38f -> 194984784


CAMEL-9791: Threads EIP now allows the error handler to perform redelivery if adding a task to the queue is rejected. Thanks to Thibaut Robert for the unit test.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5f442d8b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5f442d8b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5f442d8b

Branch: refs/heads/camel-2.17.x
Commit: 5f442d8bf03ab2015ce56b8e05a9fea4a1b37e31
Parents: e4f4ed3
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Apr 4 16:55:54 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Apr 20 08:16:23 2016 +0200

----------------------------------------------------------------------
 .../camel/processor/ThreadsProcessor.java       |  4 -
 ...eadsRejectedExecutionWithDeadLetterTest.java | 91 ++++++++++++++++++++
 2 files changed, 91 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5f442d8b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
index c71821f..2f97943 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
@@ -92,11 +92,7 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor,
             if (abort) {
                 exchange.setException(new RejectedExecutionException());
             }
-
             LOG.trace("{} routing exchange {} ", abort ? "Aborted" : "Rejected", exchange);
-            // we should not continue routing, and no redelivery should be performed
-            exchange.setProperty(Exchange.ROUTE_STOP, true);
-            exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, true);
 
             if (shutdown.get()) {
                 exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running."));

http://git-wip-us.apache.org/repos/asf/camel/blob/5f442d8b/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java b/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java
new file mode 100644
index 0000000..9f32e5f
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ThreadPoolRejectedPolicy;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * Tests how the threads EIP cooperates with a dead letter channel when the thread pool rejects a task (CAMEL-9791).
+ */
+public class ThreadsRejectedExecutionWithDeadLetterTest extends ContextTestSupport {
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false; // each test adds its own routes and starts the context itself
+    }
+
+    public void testThreadsRejectedExecution() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start").errorHandler(deadLetterChannel("mock:failed"))
+                        .to("log:before")
+                        // threads EIP with its own small pool, configured by the options below
+                        .threads()
+                        .maxPoolSize(1).poolSize(1) // 1 thread max
+                        .maxQueueSize(1)            // 1 queued task
+                        // NOTE(review): reporter's remark "(Test fails whatever the chosen policy below)" - presumably pre-fix behavior; confirm
+                        .rejectedPolicy(ThreadPoolRejectedPolicy.Abort)
+                        .delay(1000)
+                        .to("log:after")
+                        .to("mock:result");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedMessageCount(2); // the running and the queued message
+        getMockEndpoint("mock:failed").expectedMessageCount(1); // the rejected message ends at the dead letter endpoint
+
+        template.sendBody("seda:start", "Hello World"); // will block: occupies the only pool thread during the delay
+        template.sendBody("seda:start", "Hi World");    // will be queued in the single queue slot
+        template.sendBody("seda:start", "Bye World");   // will be rejected
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testThreadsRejectedExecutionWithRedelivery() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start").errorHandler(deadLetterChannel("mock:failed").maximumRedeliveries(5))
+                        .to("log:before")
+                        // threads EIP with its own small pool, configured by the options below
+                        .threads()
+                        .maxPoolSize(1).poolSize(1) // 1 thread max
+                        .maxQueueSize(1)            // 1 queued task
+                        // NOTE(review): reporter's remark "(Test fails whatever the chosen policy below)" - presumably pre-fix behavior; confirm
+                        .rejectedPolicy(ThreadPoolRejectedPolicy.Abort)
+                        .delay(1000)
+                        .to("log:after")
+                        .to("mock:result");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedMessageCount(3); // all three are expected to complete, the rejected one via redelivery
+        getMockEndpoint("mock:failed").expectedMessageCount(0); // nothing is expected at the dead letter endpoint
+
+        template.sendBody("seda:start", "Hello World"); // will block: occupies the only pool thread during the delay
+        template.sendBody("seda:start", "Hi World");    // will be queued in the single queue slot
+        template.sendBody("seda:start", "Bye World");   // will be rejected and queued on redelivery later
+
+        assertMockEndpointsSatisfied();
+    }
+
+}
\ No newline at end of file


[2/2] camel git commit: CAMEL-9791: Threads EIP should use the rejection handler if configured, and only use caller runs as a fallback.

Posted by da...@apache.org.
CAMEL-9791: Threads EIP should use the rejection handler if configured, and only use caller runs as a fallback.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/19498478
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/19498478
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/19498478

Branch: refs/heads/camel-2.17.x
Commit: 194984784f29b8594654ea8255e59b2c0e22e938
Parents: 5f442d8
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Apr 12 11:40:48 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Apr 20 08:16:39 2016 +0200

----------------------------------------------------------------------
 .../camel/management/mbean/ManagedThreads.java  | 11 ++-
 .../apache/camel/model/ThreadsDefinition.java   | 30 +++++---
 .../camel/processor/ThreadsProcessor.java       | 72 +++++++++-----------
 3 files changed, 59 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/19498478/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java
index 408dbcb..fe78b10 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java
@@ -40,13 +40,18 @@ public class ManagedThreads extends ManagedProcessor implements ManagedThreadsMB
 
     @Override
     public Boolean isCallerRunsWhenRejected() {
-        return processor.isCallerRunsWhenRejected();
+        if (processor.getExecutorService() instanceof ThreadPoolExecutor) {
+            String name = getRejectedPolicy();
+            return "CallerRuns".equals(name);
+        } else {
+            return null;
+        }
     }
 
     @Override
     public String getRejectedPolicy() {
-        if (processor.getRejectedPolicy() != null) {
-            return processor.getRejectedPolicy().name();
+        if (processor.getExecutorService() instanceof ThreadPoolExecutor) {
+            return ((ThreadPoolExecutor) processor.getExecutorService()).getRejectedExecutionHandler().toString();
         } else {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/19498478/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
index 065be28..a287659 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
@@ -85,6 +85,19 @@ public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> imple
         // prefer any explicit configured executor service
         boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
         ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this, false);
+
+        // resolve what rejected policy to use
+        ThreadPoolRejectedPolicy policy = resolveRejectedPolicy(routeContext);
+        if (policy == null) {
+            if (callerRunsWhenRejected == null || callerRunsWhenRejected) {
+                // should use caller runs by default if not configured
+                policy = ThreadPoolRejectedPolicy.CallerRuns;
+            } else {
+                policy = ThreadPoolRejectedPolicy.Abort;
+            }
+        }
+        log.debug("Using ThreadPoolRejectedPolicy: {}", policy);
+
         // if no explicit then create from the options
         if (threadPool == null) {
             ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
@@ -94,7 +107,7 @@ public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> imple
                     .maxPoolSize(getMaxPoolSize())
                     .keepAliveTime(getKeepAliveTime(), getTimeUnit())
                     .maxQueueSize(getMaxQueueSize())
-                    .rejectedPolicy(getRejectedPolicy())
+                    .rejectedPolicy(policy)
                     .allowCoreThreadTimeOut(getAllowCoreThreadTimeOut())
                     .build();
             threadPool = manager.newThreadPool(this, name, profile);
@@ -112,6 +125,9 @@ public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> imple
             if (getKeepAliveTime() != null) {
                 throw new IllegalArgumentException("KeepAliveTime and executorServiceRef options cannot be used together.");
             }
+            if (getTimeUnit() != null) {
+                throw new IllegalArgumentException("TimeUnit and executorServiceRef options cannot be used together.");
+            }
             if (getMaxQueueSize() != null) {
                 throw new IllegalArgumentException("MaxQueueSize and executorServiceRef options cannot be used together.");
             }
@@ -123,14 +139,7 @@ public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> imple
             }
         }
 
-        ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), threadPool, shutdownThreadPool);
-        if (getCallerRunsWhenRejected() == null) {
-            // should be true by default
-            thread.setCallerRunsWhenRejected(true);
-        } else {
-            thread.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
-        }
-        thread.setRejectedPolicy(resolveRejectedPolicy(routeContext));
+        ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), threadPool, shutdownThreadPool, policy);
 
         List<Processor> pipe = new ArrayList<Processor>(2);
         pipe.add(thread);
@@ -262,7 +271,8 @@ public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> imple
     }
 
     /**
-     * Whether or not the caller should run the task when it was rejected by the thread pool.
+     * Whether or not to use caller runs as <b>fallback</b> when a task is rejected from being added to the thread pool (when it is full).
+     * This is only used as fallback if no rejectedPolicy has been configured, or the thread pool has no configured rejection handler.
      * <p/>
      * Is by default <tt>true</tt>
      *

http://git-wip-us.apache.org/repos/asf/camel/blob/19498478/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
index 2f97943..b3c6dc4 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
@@ -18,6 +18,8 @@ package org.apache.camel.processor;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.AsyncCallback;
@@ -62,42 +64,39 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor,
     private String id;
     private final CamelContext camelContext;
     private final ExecutorService executorService;
+    private final ThreadPoolRejectedPolicy rejectedPolicy;
     private volatile boolean shutdownExecutorService;
     private final AtomicBoolean shutdown = new AtomicBoolean(true);
-    private boolean callerRunsWhenRejected = true;
-    private ThreadPoolRejectedPolicy rejectedPolicy;
 
     private final class ProcessCall implements Runnable, Rejectable {
         private final Exchange exchange;
         private final AsyncCallback callback;
+        private final boolean done;
 
-        ProcessCall(Exchange exchange, AsyncCallback callback) {
+        ProcessCall(Exchange exchange, AsyncCallback callback, boolean done) {
             this.exchange = exchange;
             this.callback = callback;
+            this.done = done;
         }
 
         @Override
         public void run() {
-            LOG.trace("Continue routing exchange {} ", exchange);
+            LOG.trace("Continue routing exchange {}", exchange);
             if (shutdown.get()) {
                 exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running."));
             }
-            callback.done(false);
+            callback.done(done);
         }
 
         @Override
         public void reject() {
-            // abort should mark the exchange with an rejected exception
-            boolean abort = ThreadPoolRejectedPolicy.Abort == rejectedPolicy;
-            if (abort) {
-                exchange.setException(new RejectedExecutionException());
-            }
-            LOG.trace("{} routing exchange {} ", abort ? "Aborted" : "Rejected", exchange);
-
+            // reject should mark the exchange with a rejected exception and mark not to route anymore
+            exchange.setException(new RejectedExecutionException());
+            LOG.trace("Rejected routing exchange {}", exchange);
             if (shutdown.get()) {
                 exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running."));
             }
-            callback.done(false);
+            callback.done(done);
         }
 
         @Override
@@ -106,12 +105,14 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor,
         }
     }
 
-    public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService, boolean shutdownExecutorService) {
+    public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService, boolean shutdownExecutorService, ThreadPoolRejectedPolicy rejectedPolicy) {
         ObjectHelper.notNull(camelContext, "camelContext");
         ObjectHelper.notNull(executorService, "executorService");
+        ObjectHelper.notNull(rejectedPolicy, "rejectedPolicy");
         this.camelContext = camelContext;
         this.executorService = executorService;
         this.shutdownExecutorService = shutdownExecutorService;
+        this.rejectedPolicy =rejectedPolicy;
     }
 
     public void process(final Exchange exchange) throws Exception {
@@ -131,43 +132,28 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor,
             return true;
         }
 
-        ProcessCall call = new ProcessCall(exchange, callback);
         try {
+            // process the call in asynchronous mode
+            ProcessCall call = new ProcessCall(exchange, callback, false);
             LOG.trace("Submitting task {}", call);
             executorService.submit(call);
             // tell Camel routing engine we continue routing asynchronous
             return false;
-        } catch (RejectedExecutionException e) {
-            boolean callerRuns = isCallerRunsWhenRejected();
-            if (!callerRuns) {
+        } catch (Throwable e) {
+            if (executorService instanceof ThreadPoolExecutor) {
+                ThreadPoolExecutor tpe = (ThreadPoolExecutor) executorService;
+                // process the call in synchronous mode
+                ProcessCall call = new ProcessCall(exchange, callback, true);
+                rejectedPolicy.asRejectedExecutionHandler().rejectedExecution(call, tpe);
+                return true;
+            } else {
                 exchange.setException(e);
+                callback.done(true);
+                return true;
             }
-
-            LOG.trace("{} executing task {}", callerRuns ? "CallerRuns" : "Aborted", call);
-            if (shutdown.get()) {
-                exchange.setException(new RejectedExecutionException());
-            }
-            callback.done(true);
-            return true;
         }
     }
 
-    public boolean isCallerRunsWhenRejected() {
-        return callerRunsWhenRejected;
-    }
-
-    public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) {
-        this.callerRunsWhenRejected = callerRunsWhenRejected;
-    }
-
-    public ThreadPoolRejectedPolicy getRejectedPolicy() {
-        return rejectedPolicy;
-    }
-
-    public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
-        this.rejectedPolicy = rejectedPolicy;
-    }
-
     public ExecutorService getExecutorService() {
         return executorService;
     }
@@ -184,6 +170,10 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor,
         this.id = id;
     }
 
+    public ThreadPoolRejectedPolicy getRejectedPolicy() {
+        return rejectedPolicy;
+    }
+
     protected void doStart() throws Exception {
         shutdown.set(false);
     }