You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/06 12:49:59 UTC
camel git commit: CAMEL-5585: RedeliverErrorHandler - Should quicker
reject running scheduled redeliver tasks if shutting down and not allowed to
do redeliver
Repository: camel
Updated Branches:
refs/heads/master 8fb2303be -> af8d184ed
CAMEL-5585: RedeliverErrorHandler - Should quicker reject running scheduled redeliver tasks if shutting down and not allowed to do redeliver
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/af8d184e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/af8d184e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/af8d184e
Branch: refs/heads/master
Commit: af8d184ed549169e2290c6014199ce2de1cb1586
Parents: 8fb2303
Author: Claus Ibsen <da...@apache.org>
Authored: Fri May 6 14:47:32 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri May 6 14:49:52 2016 +0200
----------------------------------------------------------------------
.../camel/processor/RedeliveryErrorHandler.java | 61 +++++++++++++++-
...iveryWhileStoppingDeadLetterChannelTest.java | 73 ++++++++++++++++++++
.../NotAllowRedeliveryWhileStoppingTest.java | 61 ++++++++++++++++
3 files changed, 193 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/af8d184e/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
index e172796..361f088 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
@@ -48,6 +48,7 @@ import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.StopWatch;
import org.apache.camel.util.URISupport;
/**
@@ -119,11 +120,58 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
}
/**
+ * Task for sleeping during redelivery attempts.
+ * <p/>
+ * This task is for the synchronous blocking. If using async delayed then a scheduled thread pool
+ * is used for sleeping and trigger redeliveries.
+ */
+ private final class RedeliverSleepTask {
+
+ private final RedeliveryPolicy policy;
+ private final long delay;
+
+ RedeliverSleepTask(RedeliveryPolicy policy, long delay) {
+ this.policy = policy;
+ this.delay = delay;
+ }
+
+ public boolean sleep() throws InterruptedException {
+ // for small delays then just sleep
+ if (delay < 1000) {
+ policy.sleep(delay);
+ return true;
+ }
+
+ StopWatch watch = new StopWatch();
+
+ log.debug("Sleeping for: {} millis until attempting redelivery", delay);
+ while (watch.taken() < delay) {
+ // sleep using 1 sec interval
+
+ long delta = delay - watch.taken();
+ long max = Math.min(1000, delta);
+ if (max > 0) {
+ log.trace("Sleeping for: {} millis until waking up for re-check", max);
+ Thread.sleep(max);
+ }
+
+ // are we preparing for shutdown then only do redelivery if allowed
+ if (preparingShutdown && !policy.isAllowRedeliveryWhileStopping()) {
+ log.debug("Rejected redelivery while stopping");
+ return false;
+ }
+ }
+
+ return true;
+ }
+ }
+
+ /**
* Tasks which performs asynchronous redelivery attempts, and being triggered by a
* {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task
* has to be delayed before a redelivery attempt is performed.
*/
- private class AsyncRedeliveryTask implements Callable<Boolean> {
+ private final class AsyncRedeliveryTask implements Callable<Boolean> {
private final Exchange exchange;
private final AsyncCallback callback;
@@ -439,8 +487,17 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
try {
// we are doing synchronous redelivery and use thread sleep, so we keep track using a counter how many are sleeping
redeliverySleepCounter.incrementAndGet();
- data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
+ RedeliverSleepTask task = new RedeliverSleepTask(data.currentRedeliveryPolicy, data.redeliveryDelay);
+ boolean complete = task.sleep();
redeliverySleepCounter.decrementAndGet();
+ if (!complete) {
+ // the task was rejected
+ exchange.setException(new RejectedExecutionException("Redelivery not allowed while stopping"));
+ // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange
+ exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
+ // jump to start of loop which then detects that we are failed and exhausted
+ continue;
+ }
} catch (InterruptedException e) {
redeliverySleepCounter.decrementAndGet();
// we was interrupted so break out
http://git-wip-us.apache.org/repos/asf/camel/blob/af8d184e/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingDeadLetterChannelTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingDeadLetterChannelTest.java b/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingDeadLetterChannelTest.java
new file mode 100644
index 0000000..519e219
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingDeadLetterChannelTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.StopWatch;
+
+/**
+ * @version
+ */
+public class NotAllowRedeliveryWhileStoppingDeadLetterChannelTest extends ContextTestSupport {
+
+ public void testRedelivery() throws Exception {
+ StopWatch watch = new StopWatch();
+
+ MockEndpoint before = getMockEndpoint("mock:foo");
+ before.expectedMessageCount(1);
+
+ template.sendBody("seda:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ Thread.sleep(500);
+
+ context.stopRoute("foo");
+
+ // we should reject the task and stop quickly
+ assertTrue("Should stop quickly: " + watch.taken(), watch.taken() < 5000);
+
+ // should go to DLC
+ Exchange dead = getMockEndpoint("mock:dead").getExchanges().get(0);
+ assertNotNull(dead);
+
+ Throwable cause = dead.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
+ assertNotNull(cause);
+ assertIsInstanceOf(RejectedExecutionException.class, cause);
+ assertEquals("Redelivery not allowed while stopping", cause.getMessage());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ errorHandler(deadLetterChannel("mock:dead")
+ .maximumRedeliveries(5).redeliveryDelay(5000).allowRedeliveryWhileStopping(false));
+
+ from("seda:start").routeId("foo")
+ .to("mock:foo")
+ .throwException(new IllegalArgumentException("Forced"));
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/af8d184e/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingTest.java b/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingTest.java
new file mode 100644
index 0000000..5ca4b59
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.StopWatch;
+
+/**
+ * @version
+ */
+public class NotAllowRedeliveryWhileStoppingTest extends ContextTestSupport {
+
+ public void testRedelivery() throws Exception {
+ StopWatch watch = new StopWatch();
+
+ MockEndpoint before = getMockEndpoint("mock:foo");
+ before.expectedMessageCount(1);
+
+ template.sendBody("seda:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ Thread.sleep(500);
+
+ context.stop();
+
+ // we should reject the task and stop quickly
+ assertTrue("Should stop quickly: " + watch.taken(), watch.taken() < 5000);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ errorHandler(defaultErrorHandler()
+ .maximumRedeliveries(5).redeliveryDelay(5000).allowRedeliveryWhileStopping(false));
+
+ from("seda:start")
+ .to("mock:foo")
+ .throwException(new IllegalArgumentException("Forced"));
+ }
+ };
+ }
+}