You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/02 09:23:34 UTC
svn commit: r959882 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/builder/
camel-core/src/main/java/org/apache/camel/model/
camel-core/src/main/java/org/apache/camel/processor/
camel-core/src/test/java/org/apache/camel/processor/ camel-c...
Author: davsclaus
Date: Fri Jul 2 07:23:33 2010
New Revision: 959882
URL: http://svn.apache.org/viewvc?rev=959882&view=rev
Log:
CAMEL-2890: Will now not block while sleeping between redelivery attempts.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBlockedDelayTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerNonBlockedDelayTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryOnExceptionBlockedDelayTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay2Test.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay3Test.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelayTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceRedeliveryTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java?rev=959882&r1=959881&r2=959882&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java Fri Jul 2 07:23:33 2010
@@ -87,6 +87,11 @@ public class DefaultErrorHandlerBuilder
return this;
}
+ public DefaultErrorHandlerBuilder syncDelayedRedelivery() {
+ getRedeliveryPolicy().syncDelayedRedelivery();
+ return this;
+ }
+
public DefaultErrorHandlerBuilder delayPattern(String delayPattern) {
getRedeliveryPolicy().delayPattern(delayPattern);
return this;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java?rev=959882&r1=959881&r2=959882&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java Fri Jul 2 07:23:33 2010
@@ -19,7 +19,6 @@ package org.apache.camel.model;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -368,6 +367,17 @@ public class OnExceptionDefinition exten
}
/**
+ * Only allow synchronous delayed redelivery.
+ *
+ * @see org.apache.camel.processor.RedeliveryPolicy#setAsyncDelayedRedelivery(boolean)
+ * @return the builder
+ */
+ public OnExceptionDefinition syncDelayedRedelivery() {
+ getOrCreateRedeliveryPolicy().setSyncDelayedRedelivery(true);
+ return this;
+ }
+
+ /**
* Sets the logging level to use when retries has exhausted
*
* @param retriesExhaustedLogLevel the logging level
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java?rev=959882&r1=959881&r2=959882&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java Fri Jul 2 07:23:33 2010
@@ -41,6 +41,8 @@ public class RedeliveryPolicyDefinition
@XmlAttribute
private Long redeliveryDelay;
@XmlAttribute
+ private Boolean syncDelayedRedelivery;
+ @XmlAttribute
private Double backOffMultiplier;
@XmlAttribute
private Boolean useExponentialBackOff;
@@ -91,6 +93,9 @@ public class RedeliveryPolicyDefinition
if (redeliveryDelay != null) {
answer.setRedeliveryDelay(redeliveryDelay);
}
+ if (syncDelayedRedelivery != null && syncDelayedRedelivery) {
+ answer.syncDelayedRedelivery();
+ }
if (retriesExhaustedLogLevel != null) {
answer.setRetriesExhaustedLogLevel(retriesExhaustedLogLevel);
}
@@ -380,6 +385,14 @@ public class RedeliveryPolicyDefinition
this.redeliveryDelay = delay;
}
+ public Boolean getSyncDelayedRedelivery() {
+ return syncDelayedRedelivery;
+ }
+
+ public void setSyncDelayedRedelivery(Boolean syncDelayedRedelivery) {
+ this.syncDelayedRedelivery = syncDelayedRedelivery;
+ }
+
public Integer getMaximumRedeliveries() {
return maximumRedeliveries;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=959882&r1=959881&r2=959882&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Fri Jul 2 07:23:33 2010
@@ -68,6 +68,7 @@ public abstract class RedeliveryErrorHan
int redeliveryCounter;
long redeliveryDelay;
Predicate retryWhilePredicate;
+ boolean redeliverFromSync;
// default behavior which can be overloaded on a per exception basis
RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
@@ -84,13 +85,13 @@ public abstract class RedeliveryErrorHan
* {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task
* has to be delayed before a redelivery attempt is performed.
*/
- private class RedeliveryTask implements Callable<Boolean> {
+ private class AsyncRedeliveryTask implements Callable<Boolean> {
private final Exchange exchange;
private final AsyncCallback callback;
private final RedeliveryData data;
- public RedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
+ public AsyncRedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
this.exchange = exchange;
this.callback = callback;
this.data = data;
@@ -101,36 +102,66 @@ public abstract class RedeliveryErrorHan
prepareExchangeForRedelivery(exchange);
// letting onRedeliver be executed at first
- deliverToRedeliveryProcessor(exchange, data);
+ deliverToOnRedeliveryProcessor(exchange, data);
if (log.isTraceEnabled()) {
log.trace("Redelivering exchangeId: " + exchange.getExchangeId() + " -> " + outputAsync + " for Exchange: " + exchange);
}
// process the exchange (also redelivery)
- boolean sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() {
- public void done(boolean sync) {
- if (log.isTraceEnabled()) {
- log.trace("Redelivering exchangeId: " + exchange.getExchangeId() + " done");
- }
+ boolean sync;
+ if (data.redeliverFromSync) {
+ // this redelivery task was scheduled from synchronous, which we forced to be asynchronous from
+ // this error handler, which means we have to invoke the callback with false, to have the callback
+ // be notified when we are done
+ sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ if (log.isTraceEnabled()) {
+ log.trace("Redelivering exchangeId: " + exchange.getExchangeId() + " done sync: " + doneSync);
+ }
+
+ // mark we are in sync mode now
+ data.sync = false;
+
+ // only process if the exchange hasn't failed
+ // and it has not been handled by the error processor
+ if (isDone(exchange)) {
+ callback.done(false);
+ return;
+ }
- // this callback should only handle the async case
- if (sync) {
- return;
+ // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
+ processAsyncErrorHandler(exchange, callback, data);
}
-
- // mark we are in async mode now
- data.sync = false;
- // only process if the exchange hasn't failed
- // and it has not been handled by the error processor
- if (isDone(exchange)) {
- callback.done(sync);
- return;
+ });
+ } else {
+ // this redelivery task was scheduled from asynchronous, which means we should only
+ // handle when the asynchronous task was done
+ sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ if (log.isTraceEnabled()) {
+ log.trace("Redelivering exchangeId: " + exchange.getExchangeId() + " done sync: " + doneSync);
+ }
+
+ // this callback should only handle the async case
+ if (doneSync) {
+ return;
+ }
+
+ // mark we are in async mode now
+ data.sync = false;
+
+ // only process if the exchange hasn't failed
+ // and it has not been handled by the error processor
+ if (isDone(exchange)) {
+ callback.done(doneSync);
+ return;
+ }
+ // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
+ processAsyncErrorHandler(exchange, callback, data);
}
- // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
- processAsyncErrorHandler(exchange, callback, data);
- }
- });
+ });
+ }
return sync;
}
@@ -180,6 +211,7 @@ public abstract class RedeliveryErrorHan
if (exchange.getException() == null) {
exchange.setException(new RejectedExecutionException());
}
+ // we cannot process so invoke callback
callback.done(data.sync);
return data.sync;
}
@@ -203,23 +235,48 @@ public abstract class RedeliveryErrorHan
}
if (data.redeliveryCounter > 0) {
- // prepare for redelivery
- prepareExchangeForRedelivery(exchange);
+ // calculate delay
+ data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter);
+
+ if (data.redeliveryDelay > 0) {
+ // okay there is a delay so create a scheduled task to have it executed in the future
- // if we are redelivering then sleep before trying again
- // wait until we should redeliver
- try {
- data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay, data.redeliveryCounter);
- } catch (InterruptedException e) {
- if (log.isDebugEnabled()) {
- log.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
+ if (data.currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) {
+ // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
+ // have it being executed in the future, or immediately
+ // we are continuing asynchronously
+
+ // mark we are routing async from now and that this redelivery task came from a synchronous routing
+ data.sync = false;
+ data.redeliverFromSync = true;
+ AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
+
+ // schedule the redelivery task
+ if (log.isTraceEnabled()) {
+ log.trace("Scheduling redelivery task to run in " + data.redeliveryDelay + " millis for exchangeId: " + exchange.getExchangeId());
+ }
+ executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
+
+ return false;
+ } else {
+ // async delayed redelivery was disabled or we are transacted so we must be synchronous
+ // as the transaction manager requires to execute in the same thread context
+ try {
+ data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
+ } catch (InterruptedException e) {
+ // we was interrupted so break out
+ exchange.setException(e);
+ callback.done(data.sync);
+ return data.sync;
+ }
}
- // continue from top
- continue;
}
+ // prepare for redelivery
+ prepareExchangeForRedelivery(exchange);
+
// letting onRedeliver be executed
- deliverToRedeliveryProcessor(exchange, data);
+ deliverToOnRedeliveryProcessor(exchange, data);
}
// process the exchange (also redelivery)
@@ -300,7 +357,9 @@ public abstract class RedeliveryErrorHan
if (data.redeliveryCounter > 0) {
// let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
// have it being executed in the future, or immediately
- RedeliveryTask task = new RedeliveryTask(exchange, callback, data);
+ // Note: the data.redeliverFromSync should be kept as is, in case it was enabled previously
+ // to ensure the callback will continue routing from where we left
+ AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
// calculate the redelivery delay
data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter);
@@ -442,7 +501,7 @@ public abstract class RedeliveryErrorHan
* Gives an optional configure redelivery processor a chance to process before the Exchange
* will be redelivered. This can be used to alter the Exchange.
*/
- protected void deliverToRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) {
+ protected void deliverToOnRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) {
if (data.onRedeliveryProcessor == null) {
return;
}
@@ -746,7 +805,7 @@ public abstract class RedeliveryErrorHan
if (executorService == null || executorService.isShutdown()) {
// camel context will shutdown the executor when it shutdown so no need to shut it down when stopping
executorService = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this,
- "RedeliveryErrorHandler-AsyncRedeliveryTask", poolSize);
+ "RedeliveryErrorHandler-RedeliveryTask", poolSize);
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java?rev=959882&r1=959881&r2=959882&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java Fri Jul 2 07:23:33 2010
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFac
* <li>maximumRedeliveries = 0</li>
* <li>redeliveryDelay = 1000L (the initial delay)</li>
* <li>maximumRedeliveryDelay = 60 * 1000L</li>
+ * <li>asyncDelayedRedelivery = true</li>
* <li>backOffMultiplier = 2</li>
* <li>useExponentialBackOff = false</li>
* <li>collisionAvoidanceFactor = 0.15d</li>
@@ -94,6 +95,7 @@ public class RedeliveryPolicy implements
protected boolean logExhausted = true;
protected boolean logRetryAttempted = true;
protected String delayPattern;
+ protected boolean asyncDelayedRedelivery = true;
public RedeliveryPolicy() {
}
@@ -103,6 +105,7 @@ public class RedeliveryPolicy implements
return "RedeliveryPolicy[maximumRedeliveries=" + maximumRedeliveries
+ ", redeliveryDelay=" + redeliveryDelay
+ ", maximumRedeliveryDelay=" + maximumRedeliveryDelay
+ + ", asyncDelayedRedelivery=" + asyncDelayedRedelivery
+ ", retriesExhaustedLogLevel=" + retriesExhaustedLogLevel
+ ", retryAttemptedLogLevel=" + retryAttemptedLogLevel
+ ", logRetryAttempted=" + logRetryAttempted
@@ -164,15 +167,25 @@ public class RedeliveryPolicy implements
redeliveryDelay = calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter);
if (redeliveryDelay > 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sleeping for: " + redeliveryDelay + " millis until attempting redelivery");
- }
- Thread.sleep(redeliveryDelay);
+ sleep(redeliveryDelay);
}
return redeliveryDelay;
}
/**
+ * Sleeps for the given delay
+ *
+ * @param redeliveryDelay the delay
+ * @throws InterruptedException is thrown if the sleep is interrupted likely because of shutdown
+ */
+ public void sleep(long redeliveryDelay) throws InterruptedException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sleeping for: " + redeliveryDelay + " millis until attempting redelivery");
+ }
+ Thread.sleep(redeliveryDelay);
+ }
+
+ /**
* Calculates the new redelivery delay based on the last one
*
* @param previousDelay previous redelivery delay
@@ -234,7 +247,6 @@ public class RedeliveryPolicy implements
return answer;
}
-
// Builder methods
// -------------------------------------------------------------------------
@@ -382,6 +394,16 @@ public class RedeliveryPolicy implements
return this;
}
+ /**
+ * Only allow synchronous delayed redelivery.
+ *
+ * @see #setAsyncDelayedRedelivery(boolean)
+ */
+ public RedeliveryPolicy syncDelayedRedelivery() {
+ setAsyncDelayedRedelivery(false);
+ return this;
+ }
+
// Properties
// -------------------------------------------------------------------------
@Deprecated
@@ -598,4 +620,24 @@ public class RedeliveryPolicy implements
public void setLogExhausted(boolean logExhausted) {
this.logExhausted = logExhausted;
}
+
+ public boolean isAsyncDelayedRedelivery() {
+ return asyncDelayedRedelivery;
+ }
+
+ /**
+ * Sets whether asynchronous delayed redelivery is allowed.
+ * <p/>
+ * This is enabled by default, which allows Camel to schedule a future task for delayed
+ * redelivery which prevents current thread from blocking while waiting. You can use
+ * this option to turn it off to ensure current thread will block.
+ * <p/>
+ * Exchange which is transacted will however always use synchronous delayed redelivery
+ * because the transaction must execute in the same thread context.
+ *
+ * @param asyncDelayedRedelivery whether asynchronous delayed redelivery is allowed
+ */
+ public void setAsyncDelayedRedelivery(boolean asyncDelayedRedelivery) {
+ this.asyncDelayedRedelivery = asyncDelayedRedelivery;
+ }
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBlockedDelayTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBlockedDelayTest.java?rev=959882&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBlockedDelayTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBlockedDelayTest.java Fri Jul 2 07:23:33 2010
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision$
+ */
+public class RedeliveryErrorHandlerBlockedDelayTest extends ContextTestSupport {
+
+ private static final Log LOG = LogFactory.getLog(RedeliveryErrorHandlerBlockedDelayTest.class);
+
+ private static volatile int attempt;
+
+ public void testRedelivery() throws Exception {
+ MockEndpoint before = getMockEndpoint("mock:result");
+ before.expectedBodiesReceived("Hello World", "Hello Camel");
+
+ // we use blocked redelivery delay so the messages arrive in the same order
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedBodiesReceived("Hello World", "Hello Camel");
+
+ template.sendBody("seda:start", "World");
+ template.sendBody("seda:start", "Camel");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // do block redelivery attempts
+ errorHandler(defaultErrorHandler()
+ .maximumRedeliveries(5).redeliveryDelay(2000).syncDelayedRedelivery());
+
+ from("seda:start")
+ .to("log:before")
+ .to("mock:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ LOG.info("Processing at attempt " + attempt + " " + exchange);
+
+ String body = exchange.getIn().getBody(String.class);
+ if (body.contains("World")) {
+ if (++attempt <= 2) {
+ LOG.info("Processing failed will thrown an exception");
+ throw new IllegalArgumentException("Damn");
+ }
+ }
+
+ exchange.getIn().setBody("Hello " + body);
+ LOG.info("Processing at attempt " + attempt + " complete " + exchange);
+ }
+ })
+ .to("log:after")
+ .to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBlockedDelayTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBlockedDelayTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerNonBlockedDelayTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerNonBlockedDelayTest.java?rev=959882&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerNonBlockedDelayTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerNonBlockedDelayTest.java Fri Jul 2 07:23:33 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision$
+ */
+public class RedeliveryErrorHandlerNonBlockedDelayTest extends ContextTestSupport {
+
+ private static final Log LOG = LogFactory.getLog(RedeliveryErrorHandlerNonBlockedDelayTest.class);
+
+ private static volatile int attempt;
+
+ public void testRedelivery() throws Exception {
+ MockEndpoint before = getMockEndpoint("mock:result");
+ before.expectedBodiesReceived("Hello World", "Hello Camel");
+
+ // we use NON blocked redelivery delay so the messages arrive which completes first
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedBodiesReceived("Hello Camel", "Hello World");
+
+ template.sendBody("seda:start", "World");
+ template.sendBody("seda:start", "Camel");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ errorHandler(defaultErrorHandler().maximumRedeliveries(5).redeliveryDelay(2000));
+
+ from("seda:start")
+ .to("log:before")
+ .to("mock:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ LOG.info("Processing at attempt " + attempt + " " + exchange);
+
+ String body = exchange.getIn().getBody(String.class);
+ if (body.contains("World")) {
+ if (++attempt <= 2) {
+ LOG.info("Processing failed will thrown an exception");
+ throw new IllegalArgumentException("Damn");
+ }
+ }
+
+ exchange.getIn().setBody("Hello " + body);
+ LOG.info("Processing at attempt " + attempt + " complete " + exchange);
+ }
+ })
+ .to("log:after")
+ .to("mock:result");
+ }
+ };
+ }
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerNonBlockedDelayTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerNonBlockedDelayTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryOnExceptionBlockedDelayTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryOnExceptionBlockedDelayTest.java?rev=959882&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryOnExceptionBlockedDelayTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryOnExceptionBlockedDelayTest.java Fri Jul 2 07:23:33 2010
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision$
+ */
+public class RedeliveryOnExceptionBlockedDelayTest extends ContextTestSupport {
+
+ private static final Log LOG = LogFactory.getLog(RedeliveryOnExceptionBlockedDelayTest.class);
+
+ private static volatile int attempt;
+
+ public void testRedelivery() throws Exception {
+ MockEndpoint before = getMockEndpoint("mock:result");
+ before.expectedBodiesReceived("Hello World", "Hello Camel");
+
+ // we use blocked redelivery delay so the messages arrive in the same order
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedBodiesReceived("Hello World", "Hello Camel");
+
+ template.sendBody("seda:start", "World");
+ template.sendBody("seda:start", "Camel");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // do block if this exception was thrown
+ onException(IllegalArgumentException.class)
+ .maximumRedeliveries(5).redeliveryDelay(2000).syncDelayedRedelivery();
+
+ from("seda:start")
+ .to("log:before")
+ .to("mock:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ LOG.info("Processing at attempt " + attempt + " " + exchange);
+
+ String body = exchange.getIn().getBody(String.class);
+ if (body.contains("World")) {
+ if (++attempt <= 2) {
+ LOG.info("Processing failed will thrown an exception");
+ throw new IllegalArgumentException("Damn");
+ }
+ }
+
+ exchange.getIn().setBody("Hello " + body);
+ LOG.info("Processing at attempt " + attempt + " complete " + exchange);
+ }
+ })
+ .to("log:after")
+ .to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryOnExceptionBlockedDelayTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryOnExceptionBlockedDelayTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay2Test.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay2Test.java?rev=959882&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay2Test.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay2Test.java Fri Jul 2 07:23:33 2010
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay2Test extends ContextTestSupport {
+
+ private static final Log LOG = LogFactory.getLog(AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay2Test.class);
+
+ private static volatile int attempt;
+ private static String beforeThreadName;
+ private static String afterThreadName;
+
+ public void testRedelivery() throws Exception {
+ MockEndpoint before = getMockEndpoint("mock:result");
+ before.expectedBodiesReceived("World");
+
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedBodiesReceived("Hello Camel");
+
+ template.sendBody("seda:start", "World");
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ errorHandler(defaultErrorHandler().maximumRedeliveries(5).redeliveryDelay(2000));
+
+ from("seda:start")
+ .to("log:before")
+ .to("mock:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ beforeThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("async:Camel")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ LOG.info("Processing at attempt " + attempt + " " + exchange);
+
+ String body = exchange.getIn().getBody(String.class);
+ if (body.contains("Camel")) {
+ if (++attempt <= 2) {
+ LOG.info("Processing failed will thrown an exception");
+ throw new IllegalArgumentException("Damn");
+ }
+ }
+
+ exchange.getIn().setBody("Hello " + body);
+ LOG.info("Processing at attempt " + attempt + " complete " + exchange);
+ }
+ })
+ .to("log:after")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ afterThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay2Test.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay2Test.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay3Test.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay3Test.java?rev=959882&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay3Test.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay3Test.java Fri Jul 2 07:23:33 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay3Test extends ContextTestSupport {
+
+ private static String beforeThreadName;
+ private static String afterThreadName;
+
+ public void testRedelivery() throws Exception {
+ MockEndpoint before = getMockEndpoint("mock:result");
+ before.expectedBodiesReceived("Hello World");
+
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedBodiesReceived("Bye Camel");
+
+ template.sendBody("seda:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ errorHandler(defaultErrorHandler().maximumRedeliveries(5).redeliveryDelay(1000));
+
+ from("seda:start")
+ .to("log:before")
+ .to("mock:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ beforeThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("async:Bye Camel?failFirstAttempts=2")
+ .to("log:after")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ afterThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay3Test.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay3Test.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelayTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelayTest.java?rev=959882&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelayTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelayTest.java Fri Jul 2 07:23:33 2010
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointRedeliveryErrorHandlerNonBlockedDelayTest extends ContextTestSupport {
+
+ private static final Log LOG = LogFactory.getLog(AsyncEndpointRedeliveryErrorHandlerNonBlockedDelayTest.class);
+
+ private static volatile int attempt;
+ private static String beforeThreadName;
+ private static String afterThreadName;
+
+ public void testRedelivery() throws Exception {
+ MockEndpoint before = getMockEndpoint("mock:result");
+ before.expectedBodiesReceived("Hello World", "Hello Camel");
+
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedBodiesReceived("Bye Camel", "Bye Camel");
+
+ template.sendBody("seda:start", "World");
+ template.sendBody("seda:start", "Camel");
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ errorHandler(defaultErrorHandler().maximumRedeliveries(5).redeliveryDelay(2000));
+
+ from("seda:start")
+ .to("log:before")
+ .to("mock:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ LOG.info("Processing at attempt " + attempt + " " + exchange);
+
+ String body = exchange.getIn().getBody(String.class);
+ if (body.contains("World")) {
+ if (++attempt <= 2) {
+ LOG.info("Processing failed will thrown an exception");
+ throw new IllegalArgumentException("Damn");
+ }
+ }
+
+ exchange.getIn().setBody("Hello " + body);
+ LOG.info("Processing at attempt " + attempt + " complete " + exchange);
+ }
+ })
+ .to("log:after")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ beforeThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("async:Bye Camel")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ afterThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelayTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelayTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceRedeliveryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceRedeliveryTest.java?rev=959882&r1=959881&r2=959882&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceRedeliveryTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceRedeliveryTest.java Fri Jul 2 07:23:33 2010
@@ -54,10 +54,11 @@ public class TransactionalClientDataSour
return new SpringRouteBuilder() {
public void configure() throws Exception {
// configure transacted error handler to use up till 4 redeliveries
+ // with 100 millis delay between each redelivery attempt
// we have not passed in any spring TX manager. Camel will automatic
// find it in the spring application context. You only need to help
// Camel in case you have multiple TX managers
- errorHandler(transactionErrorHandler().maximumRedeliveries(4));
+ errorHandler(transactionErrorHandler().maximumRedeliveries(4).redeliveryDelay(100));
// START SNIPPET: e1
from("direct:okay")