You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/06 12:49:59 UTC

camel git commit: CAMEL-5585: RedeliverErrorHandler - Should quicker reject running scheduled redeliver tasks if shutting down and not allowed to do redeliver

Repository: camel
Updated Branches:
  refs/heads/master 8fb2303be -> af8d184ed


CAMEL-5585: RedeliverErrorHandler - Should quicker reject running scheduled redeliver tasks if shutting down and not allowed to do redeliver


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/af8d184e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/af8d184e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/af8d184e

Branch: refs/heads/master
Commit: af8d184ed549169e2290c6014199ce2de1cb1586
Parents: 8fb2303
Author: Claus Ibsen <da...@apache.org>
Authored: Fri May 6 14:47:32 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri May 6 14:49:52 2016 +0200

----------------------------------------------------------------------
 .../camel/processor/RedeliveryErrorHandler.java | 61 +++++++++++++++-
 ...iveryWhileStoppingDeadLetterChannelTest.java | 73 ++++++++++++++++++++
 .../NotAllowRedeliveryWhileStoppingTest.java    | 61 ++++++++++++++++
 3 files changed, 193 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/af8d184e/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
index e172796..361f088 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
@@ -48,6 +48,7 @@ import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.URISupport;
 
 /**
@@ -119,11 +120,58 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
     }
 
     /**
+     * Task for sleeping during redelivery attempts.
+     * <p/>
+     * This task is for the synchronous blocking. If using async delayed then a scheduled thread pool
+     * is used for sleeping and trigger redeliveries.
+     */
+    private final class RedeliverSleepTask {
+
+        private final RedeliveryPolicy policy;
+        private final long delay;
+
+        RedeliverSleepTask(RedeliveryPolicy policy, long delay) {
+            this.policy = policy;
+            this.delay = delay;
+        }
+
+        public boolean sleep() throws InterruptedException {
+            // for small delays then just sleep
+            if (delay < 1000) {
+                policy.sleep(delay);
+                return true;
+            }
+
+            StopWatch watch = new StopWatch();
+
+            log.debug("Sleeping for: {} millis until attempting redelivery", delay);
+            while (watch.taken() < delay) {
+                // sleep using 1 sec interval
+
+                long delta = delay - watch.taken();
+                long max = Math.min(1000, delta);
+                if (max > 0) {
+                    log.trace("Sleeping for: {} millis until waking up for re-check", max);
+                    Thread.sleep(max);
+                }
+
+                // are we preparing for shutdown then only do redelivery if allowed
+                if (preparingShutdown && !policy.isAllowRedeliveryWhileStopping()) {
+                    log.debug("Rejected redelivery while stopping");
+                    return false;
+                }
+            }
+
+            return true;
+        }
+    }
+
+    /**
      * Tasks which performs asynchronous redelivery attempts, and being triggered by a
      * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task
      * has to be delayed before a redelivery attempt is performed.
      */
-    private class AsyncRedeliveryTask implements Callable<Boolean> {
+    private final class AsyncRedeliveryTask implements Callable<Boolean> {
 
         private final Exchange exchange;
         private final AsyncCallback callback;
@@ -439,8 +487,17 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
                         try {
                             // we are doing synchronous redelivery and use thread sleep, so we keep track using a counter how many are sleeping
                             redeliverySleepCounter.incrementAndGet();
-                            data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
+                            RedeliverSleepTask task = new RedeliverSleepTask(data.currentRedeliveryPolicy, data.redeliveryDelay);
+                            boolean complete = task.sleep();
                             redeliverySleepCounter.decrementAndGet();
+                            if (!complete) {
+                                // the task was rejected
+                                exchange.setException(new RejectedExecutionException("Redelivery not allowed while stopping"));
+                                // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange
+                                exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
+                                // jump to start of loop which then detects that we are failed and exhausted
+                                continue;
+                            }
                         } catch (InterruptedException e) {
                             redeliverySleepCounter.decrementAndGet();
                             // we was interrupted so break out

http://git-wip-us.apache.org/repos/asf/camel/blob/af8d184e/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingDeadLetterChannelTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingDeadLetterChannelTest.java b/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingDeadLetterChannelTest.java
new file mode 100644
index 0000000..519e219
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingDeadLetterChannelTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.StopWatch;
+
+/**
+ * @version 
+ */
+public class NotAllowRedeliveryWhileStoppingDeadLetterChannelTest extends ContextTestSupport {
+
+    public void testRedelivery() throws Exception {
+        StopWatch watch = new StopWatch();
+
+        MockEndpoint before = getMockEndpoint("mock:foo");
+        before.expectedMessageCount(1);
+
+        template.sendBody("seda:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        Thread.sleep(500);
+
+        context.stopRoute("foo");
+
+        // we should reject the task and stop quickly
+        assertTrue("Should stop quickly: " + watch.taken(), watch.taken() < 5000);
+
+        // should go to DLC
+        Exchange dead = getMockEndpoint("mock:dead").getExchanges().get(0);
+        assertNotNull(dead);
+
+        Throwable cause = dead.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
+        assertNotNull(cause);
+        assertIsInstanceOf(RejectedExecutionException.class, cause);
+        assertEquals("Redelivery not allowed while stopping", cause.getMessage());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead")
+                    .maximumRedeliveries(5).redeliveryDelay(5000).allowRedeliveryWhileStopping(false));
+
+                from("seda:start").routeId("foo")
+                    .to("mock:foo")
+                    .throwException(new IllegalArgumentException("Forced"));
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/af8d184e/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingTest.java b/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingTest.java
new file mode 100644
index 0000000..5ca4b59
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.StopWatch;
+
+/**
+ * @version 
+ */
+public class NotAllowRedeliveryWhileStoppingTest extends ContextTestSupport {
+
+    public void testRedelivery() throws Exception {
+        StopWatch watch = new StopWatch();
+
+        MockEndpoint before = getMockEndpoint("mock:foo");
+        before.expectedMessageCount(1);
+
+        template.sendBody("seda:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        Thread.sleep(500);
+
+        context.stop();
+
+        // we should reject the task and stop quickly
+        assertTrue("Should stop quickly: " + watch.taken(), watch.taken() < 5000);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(defaultErrorHandler()
+                    .maximumRedeliveries(5).redeliveryDelay(5000).allowRedeliveryWhileStopping(false));
+
+                from("seda:start")
+                    .to("mock:foo")
+                    .throwException(new IllegalArgumentException("Forced"));
+            }
+        };
+    }
+}