You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/09/25 17:47:13 UTC
svn commit: r1389941 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ test/java/org/apache/camel/processor/
Author: davsclaus
Date: Tue Sep 25 15:47:12 2012
New Revision: 1389941
URL: http://svn.apache.org/viewvc?rev=1389941&view=rev
Log:
CAMEL-5638: Scheduled poll consumer should invoke the exception handler when the poll strategy rolls back. This allows people to bridge the error handler and deal with the caused exception in Camel routes, for example an FTP login error or the like.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultScheduledPollConsumerBridgeErrorHandlerTest.java
- copied, changed from r1389812, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java?rev=1389941&r1=1389940&r2=1389941&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java Tue Sep 25 15:47:12 2012
@@ -18,15 +18,12 @@ package org.apache.camel.impl;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
-import org.apache.camel.StatefulService;
import org.apache.camel.spi.PollingConsumerPollStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A default implementation that just logs a <tt>WARN</tt> level log in case of rollback.
- * <p/>
- * The implement will <b>not</b> log if the rollback occurred during shutdown.
+ * A default implementation that will not retry on rollback.
*
* @version
*/
@@ -43,16 +40,6 @@ public class DefaultPollingConsumerPollS
}
public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception {
- boolean runAllowed = true;
- if (consumer instanceof StatefulService) {
- runAllowed = ((StatefulService) consumer).isRunAllowed();
- }
-
- // only log warn if we are running, otherwise we are just stopping which we should not log the issue in the logs
- if (runAllowed) {
- log.warn("Consumer " + consumer + " could not poll endpoint: " + endpoint + " caused by: " + e.getMessage(), e);
- }
-
// we do not want to retry
return false;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1389941&r1=1389940&r2=1389941&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Tue Sep 25 15:47:12 2012
@@ -26,6 +26,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.PollingConsumerPollingStrategy;
import org.apache.camel.Processor;
+import org.apache.camel.StatefulService;
import org.apache.camel.SuspendableService;
import org.apache.camel.spi.PollingConsumerPollStrategy;
import org.apache.camel.util.ObjectHelper;
@@ -117,9 +118,11 @@ public abstract class ScheduledPollConsu
int retryCounter = -1;
boolean done = false;
+ Throwable cause = null;
while (!done) {
try {
+ cause = null;
// eager assume we are done
done = true;
if (isPollAllowed()) {
@@ -157,22 +160,32 @@ public abstract class ScheduledPollConsu
try {
boolean retry = pollStrategy.rollback(this, getEndpoint(), retryCounter, e);
if (retry) {
+ // do not set cause as we retry
done = false;
+ } else {
+ cause = e;
+ done = true;
}
} catch (Throwable t) {
- // catch throwable to not let the thread die
- getExceptionHandler().handleException("Consumer " + this + " failed polling endpoint: " + getEndpoint()
- + ". Will try again at next poll", t);
- // we are done due this fatal error
+ cause = t;
done = true;
}
} catch (Throwable t) {
- // catch throwable to not let the thread die
- getExceptionHandler().handleException("Consumer " + this + " failed polling endpoint: " + getEndpoint()
- + ". Will try again at next poll", t);
- // we are done due this fatal error
+ cause = t;
done = true;
}
+
+ if (cause != null && isRunAllowed()) {
+ // let exception handler deal with the caused exception
+ // but suppress this during shutdown as the logs may get flooded with exceptions during shutdown/forced shutdown
+ try {
+ getExceptionHandler().handleException("Consumer " + this + " failed polling endpoint: " + getEndpoint()
+ + ". Will try again at next poll", cause);
+ } catch (Throwable e) {
+ LOG.warn("Error handling exception. This exception will be ignored.", e);
+ }
+ cause = null;
+ }
}
// avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultScheduledPollConsumerBridgeErrorHandlerTest.java (from r1389812, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultScheduledPollConsumerBridgeErrorHandlerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultScheduledPollConsumerBridgeErrorHandlerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerTest.java&r1=1389812&r2=1389941&rev=1389941&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultScheduledPollConsumerBridgeErrorHandlerTest.java Tue Sep 25 15:47:12 2012
@@ -17,8 +17,6 @@
package org.apache.camel.processor;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
@@ -29,21 +27,17 @@ import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultComponent;
-import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.ScheduledPollConsumer;
/**
*
*/
-public class DefaultConsumerBridgeErrorHandlerTest extends ContextTestSupport {
-
- protected final CountDownLatch latch = new CountDownLatch(1);
+public class DefaultScheduledPollConsumerBridgeErrorHandlerTest extends ContextTestSupport {
public void testDefaultConsumerBridgeErrorHandler() throws Exception {
- getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Hello World");
- getMockEndpoint("mock:dead").expectedBodiesReceived("Cannot process");
-
- latch.countDown();
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+ getMockEndpoint("mock:dead").expectedMinimumMessageCount(1);
assertMockEndpointsSatisfied();
@@ -54,11 +48,9 @@ public class DefaultConsumerBridgeErrorH
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
- // START SNIPPET: e1
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- // register our custom component
getContext().addComponent("my", new MyComponent());
// configure error handler
@@ -107,48 +99,15 @@ public class DefaultConsumerBridgeErrorH
}
}
- public class MyConsumer extends DefaultConsumer {
-
- private int invoked;
+ public class MyConsumer extends ScheduledPollConsumer {
public MyConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
}
- public void doSomething() throws Exception {
- try {
- if (invoked++ == 0) {
- throw new IllegalArgumentException("Simulated");
- }
-
- Exchange exchange = getEndpoint().createExchange();
- exchange.getIn().setBody("Hello World");
- getProcessor().process(exchange);
-
- } catch (Exception e) {
- getExceptionHandler().handleException("Cannot process", e);
- }
- }
-
@Override
- protected void doStart() throws Exception {
- super.doStart();
-
- Thread thread = new Thread() {
- @Override
- public void run() {
- try {
- // do not start before the mocks has been setup and is ready
- latch.await(5, TimeUnit.SECONDS);
- doSomething();
- doSomething();
- doSomething();
- } catch (Exception e) {
- // ignore
- }
- }
- };
- thread.start();
+ protected int poll() throws Exception {
+ throw new IllegalArgumentException("Simulated");
}
}
}