You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/08/14 13:01:04 UTC
[2/4] git commit: CAMEL-6631: Introduce
ScheduledPollConsumerScheduler SPI to plug in different schedulers for poll
consumer components such as file/ftp etc.
CAMEL-6631: Introduce ScheduledPollConsumerScheduler SPI to plug in different schedulers for poll consumer components such as file/ftp etc.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0fb78122
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0fb78122
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0fb78122
Branch: refs/heads/master
Commit: 0fb781227d276d9beb82a690d8e604cddf69e12b
Parents: 36f48fb
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Aug 14 12:06:42 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Aug 14 12:06:42 2013 +0200
----------------------------------------------------------------------
.../camel/impl/DefaultScheduledPollConsumerScheduler.java | 6 +++++-
.../java/org/apache/camel/impl/ScheduledPollConsumer.java | 7 ++++---
.../camel/impl/SingleScheduledPollConsumerScheduler.java | 9 +++++----
.../apache/camel/spi/ScheduledPollConsumerScheduler.java | 10 ++++++++--
.../component/file/FileConsumerCustomSchedulerTest.java | 7 ++++++-
.../quartz2/QuartzScheduledPollConsumerScheduler.java | 6 +++++-
6 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
index 8c0af76..83a40cb 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
@@ -92,8 +92,12 @@ public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.supp
}
@Override
- public void scheduleTask(Consumer consumer, Runnable task) {
+ public void onInit(Consumer consumer) {
this.consumer = consumer;
+ }
+
+ @Override
+ public void scheduleTask(Runnable task) {
this.task = task;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
index 1670354..546b83a 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
@@ -397,10 +397,11 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R
scheduler = new DefaultScheduledPollConsumerScheduler();
}
scheduler.setCamelContext(getEndpoint().getCamelContext());
+ scheduler.onInit(this);
if (!(scheduler instanceof SingleScheduledPollConsumerScheduler)) {
// schedule task if its not the single scheduled
- scheduler.scheduleTask(this, this);
+ scheduler.scheduleTask(this);
}
// configure scheduler with options from this consumer
@@ -459,7 +460,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R
public void onInit() throws Exception {
// use a single scheduler so we do not have it running it periodically when we use
// this consumer as a EventDrivenPollingConsumer
- scheduler = new SingleScheduledPollConsumerScheduler(this);
+ scheduler = new SingleScheduledPollConsumerScheduler();
}
@Override
@@ -467,7 +468,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R
if (LOG.isTraceEnabled()) {
LOG.trace("Before poll {}", getEndpoint());
}
- scheduler.scheduleTask(this, this);
+ scheduler.scheduleTask(this);
return timeout;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
index 3d9e22e..347e1d8 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/SingleScheduledPollConsumerScheduler.java
@@ -25,7 +25,7 @@ import org.apache.camel.spi.ScheduledPollConsumerScheduler;
/**
* A {@link ScheduledPollConsumerScheduler} which is <b>not</b> scheduled but uses a regular single-threaded {@link ExecutorService}
- * to execute the task when {@link #scheduleTask(org.apache.camel.Consumer, Runnable)} is invoked.
+ * to execute the task when {@link #scheduleTask(Runnable)} is invoked.
* <p/>
* This is used when the {@link org.apache.camel.PollingConsumer} EIP is implemented using the {@link EventDrivenPollingConsumer}
* bridging a {@link ScheduledPollConsumer} implementation. In this case we use this single threaded regular thread pool
@@ -34,17 +34,18 @@ import org.apache.camel.spi.ScheduledPollConsumerScheduler;
*/
public class SingleScheduledPollConsumerScheduler extends org.apache.camel.support.ServiceSupport implements ScheduledPollConsumerScheduler {
- private final Consumer consumer;
+ private Consumer consumer;
private CamelContext camelContext;
private ExecutorService executorService;
private Future future;
- public SingleScheduledPollConsumerScheduler(Consumer consumer) {
+ @Override
+ public void onInit(Consumer consumer) {
this.consumer = consumer;
}
@Override
- public void scheduleTask(Consumer consumer, Runnable task) {
+ public void scheduleTask(Runnable task) {
if (isSchedulerStarted()) {
future = executorService.submit(task);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
index db9b41f..784676a 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/ScheduledPollConsumerScheduler.java
@@ -32,12 +32,18 @@ import org.apache.camel.ShutdownableService;
public interface ScheduledPollConsumerScheduler extends ShutdownableService, CamelContextAware {
/**
- * Schedules the task to run.
+ * Initializes this {@link ScheduledPollConsumerScheduler} with the associated {@link Consumer}.
*
* @param consumer the consumer.
+ */
+ void onInit(Consumer consumer);
+
+ /**
+ * Schedules the task to run.
+ *
* @param task the task to run.
*/
- void scheduleTask(Consumer consumer, Runnable task);
+ void scheduleTask(Runnable task);
/**
* Attempts to unschedules the last task which was scheduled.
http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
index ca22bf0..a466086 100644
--- a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomSchedulerTest.java
@@ -78,7 +78,12 @@ public class FileConsumerCustomSchedulerTest extends ContextTestSupport {
private String foo;
@Override
- public void scheduleTask(final Consumer consumer, final Runnable task) {
+ public void onInit(Consumer consumer) {
+ // noop
+ }
+
+ @Override
+ public void scheduleTask(final Runnable task) {
this.timerTask = new TimerTask() {
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/camel/blob/0fb78122/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
index fec05c5..5b460e0 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
@@ -57,8 +57,12 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
private volatile JobDetail job;
@Override
- public void scheduleTask(Consumer consumer, Runnable runnable) {
+ public void onInit(Consumer consumer) {
this.consumer = consumer;
+ }
+
+ @Override
+ public void scheduleTask(Runnable runnable) {
this.runnable = runnable;
}