You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/08/09 11:58:26 UTC

svn commit: r1155292 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/impl/ test/java/org/apache/camel/component/file/

Author: davsclaus
Date: Tue Aug  9 09:58:25 2011
New Revision: 1155292

URL: http://svn.apache.org/viewvc?rev=1155292&view=rev
Log:
CAMEL-4318: Scheduled poll consumer from pollEnrich should not be polling when route starts, but on demand

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichNoWaitTest.java
      - copied, changed from r1155236, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java?rev=1155292&r1=1155291&r2=1155292&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java Tue Aug  9 09:58:25 2011
@@ -36,9 +36,12 @@ public interface PollingConsumerPollingS
     /**
      * Callback invoked before the poll.
      *
+     * @param timeout the timeout
      * @throws Exception can be thrown if error occurred
+     * @return timeout to be used, this allows returning a higher timeout value
+     * to ensure at least one poll is being performed
      */
-    void beforePoll() throws Exception;
+    long beforePoll(long timeout) throws Exception;
 
     /**
      * Callback invoked after the poll.

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java?rev=1155292&r1=1155291&r2=1155292&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java Tue Aug  9 09:58:25 2011
@@ -65,7 +65,8 @@ public class EventDrivenPollingConsumer 
 
         while (isRunAllowed()) {
             try {
-                beforePoll();
+                beforePoll(0);
+                // take will block waiting for message
                 return queue.take();
             } catch (InterruptedException e) {
                 handleInterruptedException(e);
@@ -83,13 +84,9 @@ public class EventDrivenPollingConsumer 
             throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name());
         }
 
-        // if the queue is empty and there is no wait then return null
-        if (timeout == 0 && queue.isEmpty()) {
-            return null;
-        }
-
         try {
-            beforePoll();
+            // use the timeout value returned from beforePoll
+            timeout = beforePoll(timeout);
             return queue.poll(timeout, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             handleInterruptedException(e);
@@ -115,15 +112,16 @@ public class EventDrivenPollingConsumer 
         getInterruptedExceptionHandler().handleException(e);
     }
 
-    protected void beforePoll() {
+    protected long beforePoll(long timeout) {
         if (consumer instanceof PollingConsumerPollingStrategy) {
             PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer;
             try {
-                strategy.beforePoll();
+                timeout = strategy.beforePoll(timeout);
             } catch (Exception e) {
                 LOG.debug("Error occurred before polling " + consumer + ". This exception will be ignored.", e);
             }
         }
+        return timeout;
     }
 
     protected void afterPoll() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1155292&r1=1155291&r2=1155292&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Tue Aug  9 09:58:25 2011
@@ -276,20 +276,28 @@ public abstract class ScheduledPollConsu
 
     @Override
     public void onInit() throws Exception {
-        // start our self
-        ServiceHelper.startService(this);
+        // noop
     }
 
     @Override
-    public void beforePoll() throws Exception {
-        // resume our self
-        ServiceHelper.resumeService(this);
+    public long beforePoll(long timeout) throws Exception {
+        LOG.trace("Before poll {}", getEndpoint());
+        // resume or start our self
+        if (!ServiceHelper.resumeService(this)) {
+            ServiceHelper.startService(this);
+        };
+
+        // ensure at least timeout is as long as one poll delay
+        return Math.max(timeout, getDelay());
     }
 
     @Override
     public void afterPoll() throws Exception {
-        // suspend our self
-        ServiceHelper.suspendService(this);
+        LOG.trace("After poll {}", getEndpoint());
+        // suspend or stop our self
+        if (!ServiceHelper.suspendService(this)) {
+            ServiceHelper.stopService(this);
+        }
     }
 
 }

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichNoWaitTest.java (from r1155236, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichNoWaitTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichNoWaitTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java&r1=1155236&r2=1155292&rev=1155292&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichNoWaitTest.java Tue Aug  9 09:58:25 2011
@@ -28,7 +28,7 @@ import org.apache.camel.component.mock.M
 /**
  * @version 
  */
-public class FilePollEnrichTest extends ContextTestSupport {
+public class FilePollEnrichNoWaitTest extends ContextTestSupport {
 
     @Override
     protected void setUp() throws Exception {
@@ -36,7 +36,7 @@ public class FilePollEnrichTest extends 
         super.setUp();
     }
 
-    public void testFilePollEnrich() throws Exception {
+    public void testFilePollEnrichNoWait() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("Hello World");
         mock.expectedFileExists("target/pollenrich/done/hello.txt");
@@ -56,9 +56,14 @@ public class FilePollEnrichTest extends 
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("timer:foo?period=2000").routeId("foo")
-                    .pollEnrich("file:target/pollenrich?move=done", 5000)
+                from("timer:foo?period=1000").routeId("foo")
+                    .log("Trigger timer foo")
+                    .pollEnrich("file:target/pollenrich?move=done")
                     .convertBodyTo(String.class)
+                    .filter(body().isNull())
+                        .stop()
+                    .end()
+                    .log("Polled filed ${file:name}")
                     .to("mock:result")
                     .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java?rev=1155292&r1=1155291&r2=1155292&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java Tue Aug  9 09:58:25 2011
@@ -56,9 +56,11 @@ public class FilePollEnrichTest extends 
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("timer:foo?period=2000").routeId("foo")
+                from("timer:foo?period=1000").routeId("foo")
+                    .log("Trigger timer foo")
                     .pollEnrich("file:target/pollenrich?move=done", 5000)
                     .convertBodyTo(String.class)
+                    .log("Polled filed ${file:name}")
                     .to("mock:result")
                     .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {