You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/09/17 12:51:04 UTC
svn commit: r998065 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ test/java/org/apache/camel/impl/
Author: davsclaus
Date: Fri Sep 17 10:51:04 2010
New Revision: 998065
URL: http://svn.apache.org/viewvc?rev=998065&view=rev
Log:
CAMEL-3125: Added limit polling consumer poll strategy. Thanks to Ernest for inspiration.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java (contents, props changed)
- copied, changed from r997858, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=998065&r1=998064&r2=998065&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Fri Sep 17 10:51:04 2010
@@ -376,6 +376,10 @@ public class DefaultCamelContext extends
public Endpoint getEndpoint(String uri) {
ObjectHelper.notEmpty(uri, "uri");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Getting endpoint with uri: " + uri);
+ }
+
// in case path has property placeholders then try to let property component resolve those
try {
uri = resolvePropertyPlaceholders(uri);
@@ -391,7 +395,7 @@ public class DefaultCamelContext extends
}
if (LOG.isTraceEnabled()) {
- LOG.trace("Getting endpoint with uri: " + uri);
+ LOG.trace("Getting endpoint with normalized uri: " + uri);
}
Endpoint answer;
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java?rev=998065&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java Fri Sep 17 10:51:04 2010
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Service;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link org.apache.camel.spi.PollingConsumerPollStrategy} which supports suspending consumers if they
+ * failed for X number of times in a row.
+ * <p/>
+ * If Camel cannot successfully consume from a given consumer, then after X consecutive failed attempts the consumer
+ * will be suspended/stopped. This prevents the log from being flooded with failed attempts, for example during nightly runs.
+ *
+ * @version $Revision$
+ */
+public class LimitedPollingConsumerPollStrategy extends DefaultPollingConsumerPollStrategy implements Service {
+
+ private final Map<Consumer, Integer> state = new HashMap<Consumer, Integer>();
+ private int limit = 3;
+
+ public int getLimit() {
+ return limit;
+ }
+
+ /**
+ * Sets the limit for how many consecutive rollbacks cause this strategy to suspend the faulty consumer.
+ * <p/>
+ * When the consumer has been suspended, it has to be manually resumed/started to be active again.
+ * The limit is by default 3.
+ *
+ * @param limit the limit
+ */
+ public void setLimit(int limit) {
+ this.limit = limit;
+ }
+
+ @Override
+ public void commit(Consumer consumer, Endpoint endpoint) {
+ // we could commit so clear state
+ state.remove(consumer);
+ }
+
+ public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception cause) throws Exception {
+ // keep track how many times in a row we have rolled back
+ Integer times = state.get(consumer);
+ if (times == null) {
+ times = 1;
+ } else {
+ times += 1;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Rollback occurred after " + times + " times when consuming " + endpoint);
+ }
+
+ boolean retry = false;
+
+ if (times >= limit) {
+ // clear state when we suspend so if it is restarted manually we start all over again
+ state.remove(consumer);
+ onSuspend(consumer, endpoint);
+ } else {
+ // error occurred
+ state.put(consumer, times);
+ retry = onRollback(consumer, endpoint);
+ }
+
+ return retry;
+ }
+
+ /**
+ * The consumer is to be suspended because it exceeded the limit
+ *
+ * @param consumer the consumer
+ * @param endpoint the endpoint
+ * @throws Exception is thrown if error suspending the consumer
+ */
+ protected void onSuspend(Consumer consumer, Endpoint endpoint) throws Exception {
+ log.warn("Suspending consumer " + consumer + " after " + limit + " attempts to consume from " + endpoint
+ + ". You have to manually resume the consumer!");
+ ServiceHelper.suspendService(consumer);
+ }
+
+ /**
+ * Rollback occurred.
+ *
+ * @param consumer the consumer
+ * @param endpoint the endpoint
+ * @return whether or not to retry immediately, is default <tt>false</tt>
+ * @throws Exception can be thrown in case something goes wrong
+ */
+ protected boolean onRollback(Consumer consumer, Endpoint endpoint) throws Exception {
+ // do not retry by default
+ return false;
+ }
+
+ public void start() throws Exception {
+ }
+
+ public void stop() throws Exception {
+ state.clear();
+ }
+}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java (from r997858, camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java&r1=997858&r2=998065&rev=998065&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java Fri Sep 17 10:51:04 2010
@@ -16,99 +16,188 @@
*/
package org.apache.camel.impl;
-import org.apache.camel.Consumer;
import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Endpoint;
-import org.apache.camel.spi.PollingConsumerPollStrategy;
+import org.apache.camel.util.ServiceHelper;
-public class ScheduledPollConsumerTest extends ContextTestSupport {
+public class LimitedPollingConsumerPollStrategyTest extends ContextTestSupport {
- private static boolean rollback;
- private static int counter;
- private static String event = "";
+ private LimitedPollingConsumerPollStrategy strategy;
- public void testExceptionOnPollAndCanStartAgain() throws Exception {
+ public void testLimitedPollingConsumerPollStrategy() throws Exception {
+ Exception expectedException = new Exception("Hello");
+
+ strategy = new LimitedPollingConsumerPollStrategy();
+ strategy.setLimit(3);
- final Exception expectedException = new Exception("Hello, I should be thrown on shutdown only!");
MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException);
+ consumer.setPollStrategy(strategy);
+
+ consumer.start();
+
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
+ consumer.run();
+ assertTrue("Should be suspended", consumer.isSuspended());
+
+ consumer.stop();
+ }
- consumer.setPollStrategy(new PollingConsumerPollStrategy() {
- public boolean begin(Consumer consumer, Endpoint endpoint) {
- return true;
- }
+ public void testLimitAtTwoLimitedPollingConsumerPollStrategy() throws Exception {
+ Exception expectedException = new Exception("Hello");
- public void commit(Consumer consumer, Endpoint endpoint) {
- }
+ strategy = new LimitedPollingConsumerPollStrategy();
+ strategy.setLimit(2);
- public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception {
- if (e == expectedException) {
- rollback = true;
- }
- return false;
- }
- });
+ MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException);
+ consumer.setPollStrategy(strategy);
consumer.start();
- // poll that throws an exception
+
consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
+ consumer.run();
+ assertTrue("Should be suspended", consumer.isSuspended());
+
consumer.stop();
+ }
- assertEquals("Should have rollback", true, rollback);
+ public void testLimitedPollingConsumerPollStrategySuccess() throws Exception {
+ Exception expectedException = new Exception("Hello");
+
+ strategy = new LimitedPollingConsumerPollStrategy();
+ strategy.setLimit(3);
+
+ MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException);
+ consumer.setPollStrategy(strategy);
- // prepare for 2nd run but this time it should not thrown an exception on poll
- rollback = false;
- consumer.setExceptionToThrowOnPoll(null);
- // start it again and we should be able to run
consumer.start();
+
consumer.run();
- // should be able to stop with no problem
- consumer.stop();
+ assertTrue("Should still be started", consumer.isStarted());
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
- assertEquals("Should not have rollback", false, rollback);
+ // now force success
+ consumer.setExceptionToThrowOnPoll(null);
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
+
+ consumer.stop();
}
-
- public void testRetryAtMostThreeTimes() throws Exception {
- counter = 0;
- event = "";
- final Exception expectedException = new Exception("Hello, I should be thrown on shutdown only!");
+ public void testLimitedPollingConsumerPollStrategySuccessThenFail() throws Exception {
+ Exception expectedException = new Exception("Hello");
+
+ strategy = new LimitedPollingConsumerPollStrategy();
+ strategy.setLimit(3);
+
MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException);
+ consumer.setPollStrategy(strategy);
- consumer.setPollStrategy(new PollingConsumerPollStrategy() {
- public boolean begin(Consumer consumer, Endpoint endpoint) {
- return true;
- }
+ consumer.start();
- public void commit(Consumer consumer, Endpoint endpoint) {
- event += "commit";
- }
+ // fail 2 times
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
- public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception {
- event += "rollback";
- counter++;
- if (retryCounter < 3) {
- return true;
- }
- return false;
- }
- });
+ // now force success 2 times
+ consumer.setExceptionToThrowOnPoll(null);
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
- consumer.setUseFixedDelay(true);
- consumer.setDelay(60000);
- consumer.start();
- // poll that throws an exception
+ // now fail again, after hitting limit at 3
+ consumer.setExceptionToThrowOnPoll(expectedException);
consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
+ consumer.run();
+ assertTrue("Should be suspended", consumer.isSuspended());
+
consumer.stop();
+ }
+
+ public void testTwoConsumersLimitedPollingConsumerPollStrategy() throws Exception {
+ Exception expectedException = new Exception("Hello");
+
+ strategy = new LimitedPollingConsumerPollStrategy();
+ strategy.setLimit(3);
+
+ MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException);
+ consumer.setPollStrategy(strategy);
+
+ MockScheduledPollConsumer consumer2 = new MockScheduledPollConsumer(null);
+ consumer2.setPollStrategy(strategy);
- // 3 retries + 1 last failed attempt when we give up
- assertEquals(4, counter);
- assertEquals("rollbackrollbackrollbackrollback", event);
+ consumer.start();
+ consumer2.start();
+
+ consumer.run();
+ consumer2.run();
+ assertTrue("Should still be started", consumer.isStarted());
+ assertTrue("Should still be started", consumer2.isStarted());
+ consumer.run();
+ consumer2.run();
+ assertTrue("Should still be started", consumer.isStarted());
+ assertTrue("Should still be started", consumer2.isStarted());
+ consumer.run();
+ consumer2.run();
+ assertTrue("Should be suspended", consumer.isSuspended());
+ assertTrue("Should still be started", consumer2.isStarted());
+
+ consumer.stop();
+ consumer2.stop();
}
- public void testNoExceptionOnPoll() throws Exception {
- MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(null);
+ public void testRestartManuallyLimitedPollingConsumerPollStrategy() throws Exception {
+ Exception expectedException = new Exception("Hello");
+
+ strategy = new LimitedPollingConsumerPollStrategy();
+ strategy.setLimit(3);
+
+ MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException);
+ consumer.setPollStrategy(strategy);
+
consumer.start();
- consumer.run();
+
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
+ consumer.run();
+ assertTrue("Should be suspended", consumer.isSuspended());
+
+ // now start the consumer again
+ ServiceHelper.resumeService(consumer);
+
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
+ consumer.run();
+ assertTrue("Should be suspended", consumer.isSuspended());
+
+ // now start the consumer again
+ ServiceHelper.resumeService(consumer);
+ // and let it succeed
+ consumer.setExceptionToThrowOnPoll(null);
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
+ consumer.run();
+ assertTrue("Should still be started", consumer.isStarted());
+
consumer.stop();
}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategyTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date