You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/09/25 17:47:13 UTC

svn commit: r1389941 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ test/java/org/apache/camel/processor/

Author: davsclaus
Date: Tue Sep 25 15:47:12 2012
New Revision: 1389941

URL: http://svn.apache.org/viewvc?rev=1389941&view=rev
Log:
CAMEL-5638: Scheduled poll consumer should invoke the exception handler on rollback strategy. This allows people to bridge the error handler and deal with the causing exception in Camel routes, for example an FTP login error or the like.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultScheduledPollConsumerBridgeErrorHandlerTest.java
      - copied, changed from r1389812, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java?rev=1389941&r1=1389940&r2=1389941&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java Tue Sep 25 15:47:12 2012
@@ -18,15 +18,12 @@ package org.apache.camel.impl;
 
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
-import org.apache.camel.StatefulService;
 import org.apache.camel.spi.PollingConsumerPollStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A default implementation that just logs a <tt>WARN</tt> level log in case of rollback.
- * <p/>
- * The implement will <b>not</b> log if the rollback occurred during shutdown.
+ * A default implementation that will not retry on rollback.
  *
  * @version 
  */
@@ -43,16 +40,6 @@ public class DefaultPollingConsumerPollS
     }
 
     public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception {
-        boolean runAllowed = true;
-        if (consumer instanceof StatefulService) {
-            runAllowed = ((StatefulService) consumer).isRunAllowed();
-        }
-
-        // only log warn if we are running, otherwise we are just stopping which we should not log the issue in the logs
-        if (runAllowed) {
-            log.warn("Consumer " + consumer +  " could not poll endpoint: " + endpoint + " caused by: " + e.getMessage(), e);
-        }
-
         // we do not want to retry
         return false;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1389941&r1=1389940&r2=1389941&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Tue Sep 25 15:47:12 2012
@@ -26,6 +26,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.PollingConsumerPollingStrategy;
 import org.apache.camel.Processor;
+import org.apache.camel.StatefulService;
 import org.apache.camel.SuspendableService;
 import org.apache.camel.spi.PollingConsumerPollStrategy;
 import org.apache.camel.util.ObjectHelper;
@@ -117,9 +118,11 @@ public abstract class ScheduledPollConsu
 
         int retryCounter = -1;
         boolean done = false;
+        Throwable cause = null;
 
         while (!done) {
             try {
+                cause = null;
                 // eager assume we are done
                 done = true;
                 if (isPollAllowed()) {
@@ -157,22 +160,32 @@ public abstract class ScheduledPollConsu
                 try {
                     boolean retry = pollStrategy.rollback(this, getEndpoint(), retryCounter, e);
                     if (retry) {
+                        // do not set cause as we retry
                         done = false;
+                    } else {
+                        cause = e;
+                        done = true;
                     }
                 } catch (Throwable t) {
-                    // catch throwable to not let the thread die
-                    getExceptionHandler().handleException("Consumer " + this +  " failed polling endpoint: " + getEndpoint()
-                            + ". Will try again at next poll", t);
-                    // we are done due this fatal error
+                    cause = t;
                     done = true;
                 }
             } catch (Throwable t) {
-                // catch throwable to not let the thread die
-                getExceptionHandler().handleException("Consumer " + this +  " failed polling endpoint: " + getEndpoint()
-                        + ". Will try again at next poll", t);
-                // we are done due this fatal error
+                cause = t;
                 done = true;
             }
+
+            if (cause != null && isRunAllowed()) {
+                // let exception handler deal with the caused exception
+                // but suppress this during shutdown as the logs may get flooded with exceptions during shutdown/forced shutdown
+                try {
+                    getExceptionHandler().handleException("Consumer " + this +  " failed polling endpoint: " + getEndpoint()
+                            + ". Will try again at next poll", cause);
+                } catch (Throwable e) {
+                    LOG.warn("Error handling exception. This exception will be ignored.", e);
+                }
+                cause = null;
+            }
         }
 
         // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultScheduledPollConsumerBridgeErrorHandlerTest.java (from r1389812, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultScheduledPollConsumerBridgeErrorHandlerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultScheduledPollConsumerBridgeErrorHandlerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerTest.java&r1=1389812&r2=1389941&rev=1389941&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultConsumerBridgeErrorHandlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultScheduledPollConsumerBridgeErrorHandlerTest.java Tue Sep 25 15:47:12 2012
@@ -17,8 +17,6 @@
 package org.apache.camel.processor;
 
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
@@ -29,21 +27,17 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultComponent;
-import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.ScheduledPollConsumer;
 
 /**
  *
  */
-public class DefaultConsumerBridgeErrorHandlerTest extends ContextTestSupport {
-
-    protected final CountDownLatch latch = new CountDownLatch(1);
+public class DefaultScheduledPollConsumerBridgeErrorHandlerTest extends ContextTestSupport {
 
     public void testDefaultConsumerBridgeErrorHandler() throws Exception {
-        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Hello World");
-        getMockEndpoint("mock:dead").expectedBodiesReceived("Cannot process");
-
-        latch.countDown();
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        getMockEndpoint("mock:dead").expectedMinimumMessageCount(1);
 
         assertMockEndpointsSatisfied();
 
@@ -54,11 +48,9 @@ public class DefaultConsumerBridgeErrorH
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
-        // START SNIPPET: e1
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                // register our custom component
                 getContext().addComponent("my", new MyComponent());
 
                 // configure error handler
@@ -107,48 +99,15 @@ public class DefaultConsumerBridgeErrorH
         }
     }
 
-    public class MyConsumer extends DefaultConsumer {
-
-        private int invoked;
+    public class MyConsumer extends ScheduledPollConsumer {
 
         public MyConsumer(Endpoint endpoint, Processor processor) {
             super(endpoint, processor);
         }
 
-        public void doSomething() throws Exception {
-            try {
-                if (invoked++ == 0) {
-                    throw new IllegalArgumentException("Simulated");
-                }
-
-                Exchange exchange = getEndpoint().createExchange();
-                exchange.getIn().setBody("Hello World");
-                getProcessor().process(exchange);
-
-            } catch (Exception e) {
-                getExceptionHandler().handleException("Cannot process", e);
-            }
-        }
-
         @Override
-        protected void doStart() throws Exception {
-            super.doStart();
-
-            Thread thread = new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        // do not start before the mocks has been setup and is ready
-                        latch.await(5, TimeUnit.SECONDS);
-                        doSomething();
-                        doSomething();
-                        doSomething();
-                    } catch (Exception e) {
-                        // ignore
-                    }
-                }
-            };
-            thread.start();
+        protected int poll() throws Exception {
+            throw new IllegalArgumentException("Simulated");
         }
     }
 }