You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dk...@apache.org on 2011/09/19 19:47:37 UTC

svn commit: r1172730 - in /camel/branches/camel-2.8.x: ./ camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/test/java/org/apache/camel/component/file/

Author: dkulp
Date: Mon Sep 19 17:47:36 2011
New Revision: 1172730

URL: http://svn.apache.org/viewvc?rev=1172730&view=rev
Log:
Merged revisions 1155292 via svnmerge from 
https://svn.apache.org/repos/asf/camel/trunk

........
  r1155292 | davsclaus | 2011-08-09 05:58:25 -0400 (Tue, 09 Aug 2011) | 1 line
  
  CAMEL-4318: Scheduled poll consumer from pollEnrich should not be polling when route starts, but on demand
........

Added:
    camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichNoWaitTest.java
      - copied unchanged from r1155292, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichNoWaitTest.java
Modified:
    camel/branches/camel-2.8.x/   (props changed)
    camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java
    camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
    camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
    camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 17:47:36 2011
@@ -1 +1 @@
-/camel/trunk:1148706,1148710,1149570,1150300-1150311,1150651,1151000,1151054,1151087,1151126,1151362,1152170,1152569,1152733,1152755,1152868,1153620,1153812,1153829,1154684,1155230,1156108,1156260,1156277,1156479,1156524,1157348,1157749,1157798,1157831,1157878,1158153,1158230,1158295,1159171,1159174,1159326,1159457,1159460,1159606,1159682-1159683,1159867,1160547,1160637,1161010,1161082,1161524,1162309,1162395,1163231,1163420,1163656-1163669,1163725,1164343,1164544,1164557,1164633,1164972-1165000,1165152,1165157,1165658,1165971,1165987,1167098,1167131,1167448,1167487,1167555,1169610,1169620,1170122,1170226,1170397,1170956,1171396,1171755,1171941,1171947
+/camel/trunk:1148706,1148710,1149570,1150300-1150311,1150651,1151000,1151054,1151087,1151126,1151362,1152170,1152569,1152733,1152755,1152868,1153620,1153812,1153829,1154684,1155230,1155292,1156108,1156260,1156277,1156479,1156524,1157348,1157749,1157798,1157831,1157878,1158153,1158230,1158295,1159171,1159174,1159326,1159457,1159460,1159606,1159682-1159683,1159867,1160547,1160637,1161010,1161082,1161524,1162309,1162395,1163231,1163420,1163656-1163669,1163725,1164343,1164544,1164557,1164633,1164972-1165000,1165152,1165157,1165658,1165971,1165987,1167098,1167131,1167448,1167487,1167555,1169610,1169620,1170122,1170226,1170397,1170956,1171396,1171755,1171941,1171947

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java?rev=1172730&r1=1172729&r2=1172730&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java Mon Sep 19 17:47:36 2011
@@ -36,9 +36,12 @@ public interface PollingConsumerPollingS
     /**
      * Callback invoked before the poll.
      *
+     * @param timeout the timeout
      * @throws Exception can be thrown if error occurred
+     * @return timeout to be used, this allows returning a higher timeout value
+     * to ensure at least one poll is being performed
      */
-    void beforePoll() throws Exception;
+    long beforePoll(long timeout) throws Exception;
 
     /**
      * Callback invoked after the poll.

Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java?rev=1172730&r1=1172729&r2=1172730&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java Mon Sep 19 17:47:36 2011
@@ -65,7 +65,8 @@ public class EventDrivenPollingConsumer 
 
         while (isRunAllowed()) {
             try {
-                beforePoll();
+                beforePoll(0);
+                // take will block waiting for message
                 return queue.take();
             } catch (InterruptedException e) {
                 handleInterruptedException(e);
@@ -83,13 +84,9 @@ public class EventDrivenPollingConsumer 
             throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name());
         }
 
-        // if the queue is empty and there is no wait then return null
-        if (timeout == 0 && queue.isEmpty()) {
-            return null;
-        }
-
         try {
-            beforePoll();
+            // use the timeout value returned from beforePoll
+            timeout = beforePoll(timeout);
             return queue.poll(timeout, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             handleInterruptedException(e);
@@ -115,15 +112,16 @@ public class EventDrivenPollingConsumer 
         getInterruptedExceptionHandler().handleException(e);
     }
 
-    protected void beforePoll() {
+    protected long beforePoll(long timeout) {
         if (consumer instanceof PollingConsumerPollingStrategy) {
             PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer;
             try {
-                strategy.beforePoll();
+                timeout = strategy.beforePoll(timeout);
             } catch (Exception e) {
                 LOG.debug("Error occurred before polling " + consumer + ". This exception will be ignored.", e);
             }
         }
+        return timeout;
     }
 
     protected void afterPoll() {

Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1172730&r1=1172729&r2=1172730&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Mon Sep 19 17:47:36 2011
@@ -275,20 +275,28 @@ public abstract class ScheduledPollConsu
 
     @Override
     public void onInit() throws Exception {
-        // start our self
-        ServiceHelper.startService(this);
+        // noop
     }
 
     @Override
-    public void beforePoll() throws Exception {
-        // resume our self
-        ServiceHelper.resumeService(this);
+    public long beforePoll(long timeout) throws Exception {
+        LOG.trace("Before poll {}", getEndpoint());
+        // resume or start our self
+        if (!ServiceHelper.resumeService(this)) {
+            ServiceHelper.startService(this);
+        };
+
+        // ensure at least timeout is as long as one poll delay
+        return Math.max(timeout, getDelay());
     }
 
     @Override
     public void afterPoll() throws Exception {
-        // suspend our self
-        ServiceHelper.suspendService(this);
+        LOG.trace("After poll {}", getEndpoint());
+        // suspend or stop our self
+        if (!ServiceHelper.suspendService(this)) {
+            ServiceHelper.stopService(this);
+        }
     }
 
 }

Modified: camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java?rev=1172730&r1=1172729&r2=1172730&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java Mon Sep 19 17:47:36 2011
@@ -56,9 +56,11 @@ public class FilePollEnrichTest extends 
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("timer:foo?period=2000").routeId("foo")
+                from("timer:foo?period=1000").routeId("foo")
+                    .log("Trigger timer foo")
                     .pollEnrich("file:target/pollenrich?move=done", 5000)
                     .convertBodyTo(String.class)
+                    .log("Polled filed ${file:name}")
                     .to("mock:result")
                     .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {