You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/08/25 10:01:20 UTC
git commit: CAMEL-7663: QuartzScheduledPollConsumerScheduler supports
jdbc datastore and is cluster aware.
Repository: camel
Updated Branches:
refs/heads/master 6482bf78b -> ddf9e0be5
CAMEL-7663: QuartzScheduledPollConsumerScheduler supports jdbc datastore and is cluster aware.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ddf9e0be
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ddf9e0be
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ddf9e0be
Branch: refs/heads/master
Commit: ddf9e0be50858c7ed43fc401e4417b62a73152db
Parents: 6482bf7
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Aug 25 10:01:03 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Aug 25 10:01:03 2014 +0200
----------------------------------------------------------------------
.../camel/component/quartz2/CamelJob.java | 6 ++--
.../camel/component/quartz2/QuartzEndpoint.java | 12 +------
.../camel/component/quartz2/QuartzHelper.java | 15 +++++++++
.../quartz2/QuartzScheduledPollConsumerJob.java | 33 +++++++++++++++++---
.../QuartzScheduledPollConsumerScheduler.java | 21 ++++++++++++-
5 files changed, 67 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/ddf9e0be/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
index df580a0..1d5d88d 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
@@ -69,7 +69,7 @@ public class CamelJob implements Job {
}
}
- private CamelContext getCamelContext(JobExecutionContext context) throws JobExecutionException {
+ protected CamelContext getCamelContext(JobExecutionContext context) throws JobExecutionException {
SchedulerContext schedulerContext = getSchedulerContext(context);
String camelContextName = context.getMergedJobDataMap().getString(QuartzConstants.QUARTZ_CAMEL_CONTEXT_NAME);
CamelContext result = (CamelContext)schedulerContext.get(QuartzConstants.QUARTZ_CAMEL_CONTEXT + "-" + camelContextName);
@@ -79,7 +79,7 @@ public class CamelJob implements Job {
return result;
}
- private SchedulerContext getSchedulerContext(JobExecutionContext context) throws JobExecutionException {
+ protected SchedulerContext getSchedulerContext(JobExecutionContext context) throws JobExecutionException {
try {
return context.getScheduler().getContext();
} catch (SchedulerException e) {
@@ -87,7 +87,7 @@ public class CamelJob implements Job {
}
}
- private QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, JobExecutionContext quartzContext) throws JobExecutionException {
+ protected QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, JobExecutionContext quartzContext) throws JobExecutionException {
TriggerKey triggerKey = quartzContext.getTrigger().getKey();
if (LOG.isDebugEnabled()) {
LOG.debug("Looking up existing QuartzEndpoint with triggerKey={}", triggerKey);
http://git-wip-us.apache.org/repos/asf/camel/blob/ddf9e0be/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java
index 6d2167d..c4cb731 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java
@@ -240,7 +240,7 @@ public class QuartzEndpoint extends DefaultEndpoint {
jobDetail = createJobDetail();
trigger = createTrigger(jobDetail);
- updateJobDataMap(jobDetail);
+ QuartzHelper.updateJobDataMap(getCamelContext(), jobDetail, getEndpointUri());
// Schedule it now. Remember that scheduler might not be started it, but we can schedule now.
Date nextFireDate = scheduler.scheduleJob(jobDetail, trigger);
@@ -274,16 +274,6 @@ public class QuartzEndpoint extends DefaultEndpoint {
}
}
- private void updateJobDataMap(JobDetail jobDetail) {
- // Store this camelContext name into the job data
- JobDataMap jobDataMap = jobDetail.getJobDataMap();
- String camelContextName = QuartzHelper.getQuartzContextName(getCamelContext());
- String endpointUri = getEndpointUri();
- LOG.debug("Adding camelContextName={}, endpointUri={} into job data map.", camelContextName, endpointUri);
- jobDataMap.put(QuartzConstants.QUARTZ_CAMEL_CONTEXT_NAME, camelContextName);
- jobDataMap.put(QuartzConstants.QUARTZ_ENDPOINT_URI, endpointUri);
- }
-
private Trigger createTrigger(JobDetail jobDetail) throws Exception {
Trigger result;
Date startTime = new Date();
http://git-wip-us.apache.org/repos/asf/camel/blob/ddf9e0be/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzHelper.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzHelper.java
index 0d5c5bc..bb6db52 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzHelper.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzHelper.java
@@ -17,9 +17,15 @@
package org.apache.camel.component.quartz2;
import org.apache.camel.CamelContext;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public final class QuartzHelper {
+ public static final Logger LOG = LoggerFactory.getLogger(QuartzEndpoint.class);
+
private QuartzHelper() {
}
@@ -32,4 +38,13 @@ public final class QuartzHelper {
}
}
+ public static void updateJobDataMap(CamelContext camelContext, JobDetail jobDetail, String endpointUri) {
+ // Store this camelContext name into the job data
+ JobDataMap jobDataMap = jobDetail.getJobDataMap();
+ String camelContextName = QuartzHelper.getQuartzContextName(camelContext);
+ LOG.debug("Adding camelContextName={}, endpointUri={} into job data map.", camelContextName, endpointUri);
+ jobDataMap.put(QuartzConstants.QUARTZ_CAMEL_CONTEXT_NAME, camelContextName);
+ jobDataMap.put(QuartzConstants.QUARTZ_ENDPOINT_URI, endpointUri);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/ddf9e0be/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java
index c2ddde0..afaaa7e 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java
@@ -16,13 +16,16 @@
*/
package org.apache.camel.pollconsumer.quartz2;
-import org.quartz.Job;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.Route;
+import org.apache.camel.component.quartz2.CamelJob;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class QuartzScheduledPollConsumerJob implements Job {
+public class QuartzScheduledPollConsumerJob extends CamelJob {
private static final Logger LOG = LoggerFactory.getLogger(QuartzScheduledPollConsumerJob.class);
@@ -30,10 +33,30 @@ public class QuartzScheduledPollConsumerJob implements Job {
}
@Override
- public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
- LOG.trace("Execute job: {}", jobExecutionContext);
+ public void execute(JobExecutionContext context) throws JobExecutionException {
+ LOG.trace("Execute job: {}", context);
+
+ CamelContext camelContext = getCamelContext(context);
+
+ Runnable task = (Runnable) context.getJobDetail().getJobDataMap().get("task");
+
+ if (task == null) {
+ // if not task then use the route id to lookup the consumer to be used as the task
+ String routeId = (String) context.getJobDetail().getJobDataMap().get("routeId");
+ if (routeId != null && camelContext != null) {
+ // find the consumer
+ for (Route route : camelContext.getRoutes()) {
+ if (route.getId().equals(routeId)) {
+ Consumer consumer = route.getConsumer();
+ if (consumer instanceof Runnable) {
+ task = (Runnable) consumer;
+ break;
+ }
+ }
+ }
+ }
+ }
- Runnable task = (Runnable) jobExecutionContext.getJobDetail().getJobDataMap().get("task");
if (task != null) {
LOG.trace("Running task: {}", task);
task.run();
http://git-wip-us.apache.org/repos/asf/camel/blob/ddf9e0be/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
index 0e65438..6ddf670 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
@@ -20,8 +20,10 @@ import java.util.TimeZone;
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
+import org.apache.camel.Route;
import org.apache.camel.component.quartz2.QuartzComponent;
import org.apache.camel.component.quartz2.QuartzConstants;
+import org.apache.camel.component.quartz2.QuartzHelper;
import org.apache.camel.spi.ScheduledPollConsumerScheduler;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
@@ -45,6 +47,7 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
private static final Logger LOG = LoggerFactory.getLogger(QuartzScheduledPollConsumerScheduler.class);
private Scheduler quartzScheduler;
private CamelContext camelContext;
+ private String routeId;
private Consumer consumer;
private Runnable runnable;
private String cron;
@@ -57,6 +60,13 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
@Override
public void onInit(Consumer consumer) {
this.consumer = consumer;
+ // find the route of the consumer
+ for (Route route : consumer.getEndpoint().getCamelContext().getRoutes()) {
+ if (route.getConsumer() == consumer) {
+ this.routeId = route.getId();
+ break;
+ }
+ }
}
@Override
@@ -151,7 +161,12 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
}
JobDataMap map = new JobDataMap();
- map.put("task", runnable);
+ // do not store task as its not serializable, if we have route id
+ if (routeId != null) {
+ map.put("routeId", routeId);
+ } else {
+ map.put("task", runnable);
+ }
map.put(QuartzConstants.QUARTZ_TRIGGER_TYPE, "cron");
map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION, getCron());
map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_TIMEZONE, getTimeZone().getID());
@@ -160,6 +175,9 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
.usingJobData(map)
.build();
+ // store additional information on job such as camel context etc
+ QuartzHelper.updateJobDataMap(getCamelContext(), job, null);
+
String id = triggerId;
if (id == null) {
id = "trigger-" + getCamelContext().getUuidGenerator().generateUuid();
@@ -185,4 +203,5 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
@Override
protected void doShutdown() throws Exception {
}
+
}