You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/08/14 13:01:04 UTC

[2/4] git commit: CAMEL-6631: Introduce ScheduledPollConsumerScheduler SPI to plug in different schedulers for poll consumer components such as file/ftp etc.

CAMEL-6631: Introduce ScheduledPollConsumerScheduler SPI to plug in different schedulers for poll consumer components such as file/ftp etc.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0fb78122
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0fb78122
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0fb78122

Branch: refs/heads/master
Commit: 0fb781227d276d9beb82a690d8e604cddf69e12b
Parents: 36f48fb
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Aug 14 12:06:42 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Aug 14 12:06:42 2013 +0200

----------------------------------------------------------------------
 .../camel/impl/DefaultScheduledPollConsumerScheduler.java |  6 +++++-
 .../java/org/apache/camel/impl/ScheduledPollConsumer.java |  7 ++++---
 .../camel/impl/SingleScheduledPollConsumerScheduler.java  |  9 +++++----
 .../apache/camel/spi/ScheduledPollConsumerScheduler.java  | 10 ++++++++--
 .../component/file/FileConsumerCustomSchedulerTest.java   |  7 ++++++-
 .../quartz2/QuartzScheduledPollConsumerScheduler.java     |  6 +++++-
 6 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
index 8c0af76..83a40cb 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
@@ -92,8 +92,12 @@ public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.supp
     }
 
     @Override
-    public void scheduleTask(Consumer consumer, Runnable task) {
+    public void onInit(Consumer consumer) {
         this.consumer = consumer;
+    }
+
+    @Override
+    public void scheduleTask(Runnable task) {
         this.task = task;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
index 1670354..546b83a 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
@@ -397,10 +397,11 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R
             scheduler = new DefaultScheduledPollConsumerScheduler();
         }
         scheduler.setCamelContext(getEndpoint().getCamelContext());
+        scheduler.onInit(this);
 
         if (!(scheduler instanceof SingleScheduledPollConsumerScheduler)) {
             // schedule the task if it's not the single scheduler
-            scheduler.scheduleTask(this, this);
+            scheduler.scheduleTask(this);
         }
 
         // configure scheduler with options from this consumer
@@ -459,7 +460,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R
     public void onInit() throws Exception {
         // use a single scheduler so we do not have it running periodically when we use
         // this consumer as an EventDrivenPollingConsumer
-        scheduler = new SingleScheduledPollConsumerScheduler(this);
+        scheduler = new SingleScheduledPollConsumerScheduler();
     }
 
     @Override
@@ -467,7 +468,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R
         if (LOG.isTraceEnabled()) {
             LOG.trace("Before poll {}", getEndpoint());
         }
-        scheduler.scheduleTask(this, this);
+        scheduler.scheduleTask(this);
         return timeout;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
index 3d9e22e..347e1d8 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
@@ -25,7 +25,7 @@ import org.apache.camel.spi.ScheduledPollConsumerScheduler;
 
 /**
  * A {@link ScheduledPollConsumerScheduler} which is <b>not</b> scheduled but uses a regular single-threaded {@link ExecutorService}
- * to execute the task when {@link #scheduleTask(org.apache.camel.Consumer, Runnable)} is invoked.
+ * to execute the task when {@link #scheduleTask(Runnable)} is invoked.
  * <p/>
  * This is used when the {@link org.apache.camel.PollingConsumer} EIP is implemented using the {@link EventDrivenPollingConsumer}
  * bridging a {@link ScheduledPollConsumer} implementation. In this case we use this single threaded regular thread pool
@@ -34,17 +34,18 @@ import org.apache.camel.spi.ScheduledPollConsumerScheduler;
  */
 public class SingleScheduledPollConsumerScheduler extends org.apache.camel.support.ServiceSupport implements ScheduledPollConsumerScheduler {
 
-    private final Consumer consumer;
+    private Consumer consumer;
     private CamelContext camelContext;
     private ExecutorService executorService;
     private Future future;
 
-    public SingleScheduledPollConsumerScheduler(Consumer consumer) {
+    @Override
+    public void onInit(Consumer consumer) {
         this.consumer = consumer;
     }
 
     @Override
-    public void scheduleTask(Consumer consumer, Runnable task) {
+    public void scheduleTask(Runnable task) {
         if (isSchedulerStarted()) {
             future = executorService.submit(task);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
index db9b41f..784676a 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
@@ -32,12 +32,18 @@ import org.apache.camel.ShutdownableService;
 public interface ScheduledPollConsumerScheduler extends ShutdownableService, CamelContextAware {
 
     /**
-     * Schedules the task to run.
+     * Initializes this {@link ScheduledPollConsumerScheduler} with the associated {@link Consumer}.
      *
      * @param consumer the consumer.
+     */
+    void onInit(Consumer consumer);
+
+    /**
+     * Schedules the task to run.
+     *
      * @param task the task to run.
      */
-    void scheduleTask(Consumer consumer, Runnable task);
+    void scheduleTask(Runnable task);
 
     /**
      * Attempts to unschedule the last task which was scheduled.

http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
index ca22bf0..a466086 100644
--- a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
@@ -78,7 +78,12 @@ public class FileConsumerCustomSchedulerTest extends ContextTestSupport {
         private String foo;
 
         @Override
-        public void scheduleTask(final Consumer consumer, final Runnable task) {
+        public void onInit(Consumer consumer) {
+            // noop
+        }
+
+        @Override
+        public void scheduleTask(final Runnable task) {
             this.timerTask = new TimerTask() {
                 @Override
                 public void run() {

http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
index fec05c5..5b460e0 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
@@ -57,8 +57,12 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
     private volatile JobDetail job;
 
     @Override
-    public void scheduleTask(Consumer consumer, Runnable runnable) {
+    public void onInit(Consumer consumer) {
         this.consumer = consumer;
+    }
+
+    @Override
+    public void scheduleTask(Runnable runnable) {
         this.runnable = runnable;
     }