You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/09/17 12:51:04 UTC

svn commit: r998065 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ test/java/org/apache/camel/impl/

Author: davsclaus
Date: Fri Sep 17 10:51:04 2010
New Revision: 998065

URL: http://svn.apache.org/viewvc?rev=998065&view=rev
Log:
CAMEL-3125: Added limit polling consumer poll strategy. Thanks to Ernest for insipiration.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java   (contents, props changed)
      - copied, changed from r997858, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=998065&r1=998064&r2=998065&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Fri Sep 17 10:51:04 2010
@@ -376,6 +376,10 @@ public class DefaultCamelContext extends
     public Endpoint getEndpoint(String uri) {
         ObjectHelper.notEmpty(uri, "uri");
 
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Getting endpoint with uri: " + uri);
+        }
+
         // in case path has property placeholders then try to let property component resolve those
         try {
             uri = resolvePropertyPlaceholders(uri);
@@ -391,7 +395,7 @@ public class DefaultCamelContext extends
         }
 
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Getting endpoint with uri: " + uri);
+            LOG.trace("Getting endpoint with normalized uri: " + uri);
         }
 
         Endpoint answer;

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java?rev=998065&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java Fri Sep 17 10:51:04 2010
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Service;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link org.apache.camel.spi.PollingConsumerPollStrategy} which supports suspending consumers if they
+ * failed for X number of times in a row.
+ * <p/>
+ * If Camel cannot successfully consumer from a given consumer, then after X consecutive failed attempts the consumer
+ * will be suspended/stopped. This prevents the log to get flooded with failed attempts, for example during nightly runs.
+ *
+ * @version $Revision$
+ */
+public class LimitedPollingConsumerPollStrategy extends DefaultPollingConsumerPollStrategy implements Service {
+
+    private final Map<Consumer, Integer> state = new HashMap<Consumer, Integer>();
+    private int limit = 3;
+
+    public int getLimit() {
+        return limit;
+    }
+
+    /**
+     * Sets the limit for how many straight rollbacks causes this strategy to suspend the fault consumer.
+     * <p/>
+     * When the consumer has been suspended, it has to be manually resumed/started to be active again.
+     * The limit is by default 3.
+     *
+     * @param limit  the limit
+     */
+    public void setLimit(int limit) {
+        this.limit = limit;
+    }
+
+    @Override
+    public void commit(Consumer consumer, Endpoint endpoint) {
+        // we could commit so clear state
+        state.remove(consumer);
+    }
+
+    public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception cause) throws Exception {
+        // keep track how many times in a row we have rolled back
+        Integer times = state.get(consumer);
+        if (times == null) {
+            times = 1;
+        } else {
+            times += 1;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Rollback occurred after " + times + " times when consuming " + endpoint);
+        }
+
+        boolean retry = false;
+
+        if (times >= limit) {
+            // clear state when we suspend so if its restarted manually we start all over again
+            state.remove(consumer);
+            onSuspend(consumer, endpoint);
+        } else {
+            // error occurred
+            state.put(consumer, times);
+            retry = onRollback(consumer, endpoint);
+        }
+
+        return retry;
+    }
+
+    /**
+     * The consumer is to be suspended because it exceeded the limit
+     *
+     * @param consumer the consumer
+     * @param endpoint the endpoint
+     * @throws Exception is thrown if error suspending the consumer
+     */
+    protected void onSuspend(Consumer consumer, Endpoint endpoint) throws Exception {
+        log.warn("Suspending consumer " + consumer + " after " + limit + " attempts to consume from " + endpoint
+                + ". You have to manually resume the consumer!");
+        ServiceHelper.suspendService(consumer);
+    }
+
+    /**
+     * Rollback occurred.
+     *
+     * @param consumer the consumer
+     * @param endpoint the endpoint
+     * @return whether or not to retry immediately, is default <tt>false</tt>
+     * @throws Exception can be thrown in case something goes wrong
+     */
+    protected boolean onRollback(Consumer consumer, Endpoint endpoint) throws Exception {
+        // do not retry by default
+        return false;
+    }
+
+    public void start() throws Exception {
+    }
+
+    public void stop() throws Exception {
+        state.clear();
+    }
+}

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java (from r997858, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java&r1=997858&r2=998065&rev=998065&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java Fri Sep 17 10:51:04 2010
@@ -16,99 +16,188 @@
  */
 package org.apache.camel.impl;
 
-import org.apache.camel.Consumer;
 import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Endpoint;
-import org.apache.camel.spi.PollingConsumerPollStrategy;
+import org.apache.camel.util.ServiceHelper;
 
-public class ScheduledPollConsumerTest extends ContextTestSupport {
+public class LimitedPollingConsumerPollStrategyTest extends ContextTestSupport {
 
-    private static boolean rollback;
-    private static int counter;
-    private static String event = "";
+    private LimitedPollingConsumerPollStrategy strategy;
 
-    public void testExceptionOnPollAndCanStartAgain() throws Exception {
+    public void testLimitedPollingConsumerPollStrategy() throws Exception {
+        Exception expectedException = new Exception("Hello");
+
+        strategy = new LimitedPollingConsumerPollStrategy();
+        strategy.setLimit(3);
 
-        final Exception expectedException = new Exception("Hello, I should be thrown on shutdown only!");
         MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException);
+        consumer.setPollStrategy(strategy);
+
+        consumer.start();
+
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
+        consumer.run();
+        assertTrue("Should be suspended", consumer.isSuspended());
+
+        consumer.stop();
+    }
 
-        consumer.setPollStrategy(new PollingConsumerPollStrategy() {
-            public boolean begin(Consumer consumer, Endpoint endpoint) {
-                return true;
-            }
+    public void testLimitAtTwoLimitedPollingConsumerPollStrategy() throws Exception {
+        Exception expectedException = new Exception("Hello");
 
-            public void commit(Consumer consumer, Endpoint endpoint) {
-            }
+        strategy = new LimitedPollingConsumerPollStrategy();
+        strategy.setLimit(2);
 
-            public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception {
-                if (e == expectedException) {
-                    rollback = true;
-                }
-                return false;
-            }
-        });
+        MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException);
+        consumer.setPollStrategy(strategy);
 
         consumer.start();
-        // poll that throws an exception
+
         consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
+        consumer.run();
+        assertTrue("Should be suspended", consumer.isSuspended());
+
         consumer.stop();
+    }
 
-        assertEquals("Should have rollback", true, rollback);
+    public void testLimitedPollingConsumerPollStrategySuccess() throws Exception {
+        Exception expectedException = new Exception("Hello");
+
+        strategy = new LimitedPollingConsumerPollStrategy();
+        strategy.setLimit(3);
+
+        MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException);
+        consumer.setPollStrategy(strategy);
 
-        // prepare for 2nd run but this time it should not thrown an exception on poll
-        rollback = false;
-        consumer.setExceptionToThrowOnPoll(null);
-        // start it again and we should be able to run
         consumer.start();
+
         consumer.run();
-        // should be able to stop with no problem
-        consumer.stop();
+        assertTrue("Should still be started", consumer.isStarted());
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
 
-        assertEquals("Should not have rollback", false, rollback);
+        // now force success
+        consumer.setExceptionToThrowOnPoll(null);
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
+
+        consumer.stop();
     }
-    
-    public void testRetryAtMostThreeTimes() throws Exception {
-        counter = 0;
-        event = "";
 
-        final Exception expectedException = new Exception("Hello, I should be thrown on shutdown only!");
+    public void testLimitedPollingConsumerPollStrategySuccessThenFail() throws Exception {
+        Exception expectedException = new Exception("Hello");
+
+        strategy = new LimitedPollingConsumerPollStrategy();
+        strategy.setLimit(3);
+
         MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException);
+        consumer.setPollStrategy(strategy);
 
-        consumer.setPollStrategy(new PollingConsumerPollStrategy() {
-            public boolean begin(Consumer consumer, Endpoint endpoint) {
-                return true;
-            }
+        consumer.start();
 
-            public void commit(Consumer consumer, Endpoint endpoint) {
-                event += "commit";
-            }
+        // fail 2 times
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
 
-            public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception {
-                event += "rollback";
-                counter++;
-                if (retryCounter < 3) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        // now force success 2 times
+        consumer.setExceptionToThrowOnPoll(null);
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
 
-        consumer.setUseFixedDelay(true);
-        consumer.setDelay(60000);
-        consumer.start();
-        // poll that throws an exception
+        // now fail again, after hitting limit at 3
+        consumer.setExceptionToThrowOnPoll(expectedException);
         consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
+        consumer.run();
+        assertTrue("Should be suspended", consumer.isSuspended());
+
         consumer.stop();
+    }
+
+    public void testTwoConsumersLimitedPollingConsumerPollStrategy() throws Exception {
+        Exception expectedException = new Exception("Hello");
+
+        strategy = new LimitedPollingConsumerPollStrategy();
+        strategy.setLimit(3);
+
+        MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException);
+        consumer.setPollStrategy(strategy);
+
+        MockScheduledPollConsumer consumer2 = new MockScheduledPollConsumer(null);
+        consumer2.setPollStrategy(strategy);
 
-        // 3 retries + 1 last failed attempt when we give up
-        assertEquals(4, counter);
-        assertEquals("rollbackrollbackrollbackrollback", event);
+        consumer.start();
+        consumer2.start();
+
+        consumer.run();
+        consumer2.run();
+        assertTrue("Should still be started", consumer.isStarted());
+        assertTrue("Should still be started", consumer2.isStarted());
+        consumer.run();
+        consumer2.run();
+        assertTrue("Should still be started", consumer.isStarted());
+        assertTrue("Should still be started", consumer2.isStarted());
+        consumer.run();
+        consumer2.run();
+        assertTrue("Should be suspended", consumer.isSuspended());
+        assertTrue("Should still be started", consumer2.isStarted());
+
+        consumer.stop();
+        consumer2.stop();
     }
 
-    public void testNoExceptionOnPoll() throws Exception {
-        MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(null);
+    public void testRestartManuallyLimitedPollingConsumerPollStrategy() throws Exception {
+        Exception expectedException = new Exception("Hello");
+
+        strategy = new LimitedPollingConsumerPollStrategy();
+        strategy.setLimit(3);
+
+        MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException);
+        consumer.setPollStrategy(strategy);
+
         consumer.start();
-        consumer.run(); 
+
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
+        consumer.run();
+        assertTrue("Should be suspended", consumer.isSuspended());
+
+        // now start the consumer again
+        ServiceHelper.resumeService(consumer);
+
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
+        consumer.run();
+        assertTrue("Should be suspended", consumer.isSuspended());
+
+        // now start the consumer again
+        ServiceHelper.resumeService(consumer);
+        // and let it succeed
+        consumer.setExceptionToThrowOnPoll(null);
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
+        consumer.run();
+        assertTrue("Should still be started", consumer.isStarted());
+
         consumer.stop();
     }
 

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date