You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/08/25 10:01:20 UTC

git commit: CAMEL-7663: QuartzScheduledPollConsumerScheduler supports jdbc datastore and is cluster aware.

Repository: camel
Updated Branches:
  refs/heads/master 6482bf78b -> ddf9e0be5


CAMEL-7663: QuartzScheduledPollConsumerScheduler supports jdbc datastore and is cluster aware.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ddf9e0be
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ddf9e0be
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ddf9e0be

Branch: refs/heads/master
Commit: ddf9e0be50858c7ed43fc401e4417b62a73152db
Parents: 6482bf7
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Aug 25 10:01:03 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Aug 25 10:01:03 2014 +0200

----------------------------------------------------------------------
 .../camel/component/quartz2/CamelJob.java       |  6 ++--
 .../camel/component/quartz2/QuartzEndpoint.java | 12 +------
 .../camel/component/quartz2/QuartzHelper.java   | 15 +++++++++
 .../quartz2/QuartzScheduledPollConsumerJob.java | 33 +++++++++++++++++---
 .../QuartzScheduledPollConsumerScheduler.java   | 21 ++++++++++++-
 5 files changed, 67 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ddf9e0be/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
index df580a0..1d5d88d 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
@@ -69,7 +69,7 @@ public class CamelJob implements Job {
         }
     }
 
-    private CamelContext getCamelContext(JobExecutionContext context) throws JobExecutionException {
+    protected CamelContext getCamelContext(JobExecutionContext context) throws JobExecutionException {
         SchedulerContext schedulerContext = getSchedulerContext(context);
         String camelContextName = context.getMergedJobDataMap().getString(QuartzConstants.QUARTZ_CAMEL_CONTEXT_NAME);
         CamelContext result = (CamelContext)schedulerContext.get(QuartzConstants.QUARTZ_CAMEL_CONTEXT + "-" + camelContextName);
@@ -79,7 +79,7 @@ public class CamelJob implements Job {
         return result;
     }
 
-    private SchedulerContext getSchedulerContext(JobExecutionContext context) throws JobExecutionException {
+    protected SchedulerContext getSchedulerContext(JobExecutionContext context) throws JobExecutionException {
         try {
             return context.getScheduler().getContext();
         } catch (SchedulerException e) {
@@ -87,7 +87,7 @@ public class CamelJob implements Job {
         }
     }
 
-    private QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, JobExecutionContext quartzContext) throws JobExecutionException {
+    protected QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, JobExecutionContext quartzContext) throws JobExecutionException {
         TriggerKey triggerKey = quartzContext.getTrigger().getKey();
         if (LOG.isDebugEnabled()) {
             LOG.debug("Looking up existing QuartzEndpoint with triggerKey={}", triggerKey);

http://git-wip-us.apache.org/repos/asf/camel/blob/ddf9e0be/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java
index 6d2167d..c4cb731 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java
@@ -240,7 +240,7 @@ public class QuartzEndpoint extends DefaultEndpoint {
             jobDetail = createJobDetail();
             trigger = createTrigger(jobDetail);
 
-            updateJobDataMap(jobDetail);
+            QuartzHelper.updateJobDataMap(getCamelContext(), jobDetail, getEndpointUri());
 
             // Schedule it now. Remember that scheduler might not be started it, but we can schedule now.
             Date nextFireDate = scheduler.scheduleJob(jobDetail, trigger);
@@ -274,16 +274,6 @@ public class QuartzEndpoint extends DefaultEndpoint {
         }
     }
 
-    private void updateJobDataMap(JobDetail jobDetail) {
-        // Store this camelContext name into the job data
-        JobDataMap jobDataMap = jobDetail.getJobDataMap();
-        String camelContextName = QuartzHelper.getQuartzContextName(getCamelContext());
-        String endpointUri = getEndpointUri();
-        LOG.debug("Adding camelContextName={}, endpointUri={} into job data map.", camelContextName, endpointUri);
-        jobDataMap.put(QuartzConstants.QUARTZ_CAMEL_CONTEXT_NAME, camelContextName);
-        jobDataMap.put(QuartzConstants.QUARTZ_ENDPOINT_URI, endpointUri);
-    }
-
     private Trigger createTrigger(JobDetail jobDetail) throws Exception {
         Trigger result;
         Date startTime = new Date();

http://git-wip-us.apache.org/repos/asf/camel/blob/ddf9e0be/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzHelper.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzHelper.java
index 0d5c5bc..bb6db52 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzHelper.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzHelper.java
@@ -17,9 +17,15 @@
 package org.apache.camel.component.quartz2;
 
 import org.apache.camel.CamelContext;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class QuartzHelper {
 
+    public static final Logger LOG = LoggerFactory.getLogger(QuartzEndpoint.class);
+
     private QuartzHelper() {
     }
 
@@ -32,4 +38,13 @@ public final class QuartzHelper {
         }
     }
 
+    public static void updateJobDataMap(CamelContext camelContext, JobDetail jobDetail, String endpointUri) {
+        // Store this camelContext name into the job data
+        JobDataMap jobDataMap = jobDetail.getJobDataMap();
+        String camelContextName = QuartzHelper.getQuartzContextName(camelContext);
+        LOG.debug("Adding camelContextName={}, endpointUri={} into job data map.", camelContextName, endpointUri);
+        jobDataMap.put(QuartzConstants.QUARTZ_CAMEL_CONTEXT_NAME, camelContextName);
+        jobDataMap.put(QuartzConstants.QUARTZ_ENDPOINT_URI, endpointUri);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ddf9e0be/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java
index c2ddde0..afaaa7e 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerJob.java
@@ -16,13 +16,16 @@
  */
 package org.apache.camel.pollconsumer.quartz2;
 
-import org.quartz.Job;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.Route;
+import org.apache.camel.component.quartz2.CamelJob;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class QuartzScheduledPollConsumerJob implements Job {
+public class QuartzScheduledPollConsumerJob extends CamelJob {
 
     private static final Logger LOG = LoggerFactory.getLogger(QuartzScheduledPollConsumerJob.class);
 
@@ -30,10 +33,30 @@ public class QuartzScheduledPollConsumerJob implements Job {
     }
 
     @Override
-    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
-        LOG.trace("Execute job: {}", jobExecutionContext);
+    public void execute(JobExecutionContext context) throws JobExecutionException {
+        LOG.trace("Execute job: {}", context);
+
+        CamelContext camelContext = getCamelContext(context);
+
+        Runnable task = (Runnable) context.getJobDetail().getJobDataMap().get("task");
+
+        if (task == null) {
+            // if not task then use the route id to lookup the consumer to be used as the task
+            String routeId = (String) context.getJobDetail().getJobDataMap().get("routeId");
+            if (routeId != null && camelContext != null) {
+                // find the consumer
+                for (Route route : camelContext.getRoutes()) {
+                    if (route.getId().equals(routeId)) {
+                        Consumer consumer = route.getConsumer();
+                        if (consumer instanceof Runnable) {
+                            task = (Runnable) consumer;
+                            break;
+                        }
+                    }
+                }
+            }
+        }
 
-        Runnable task = (Runnable) jobExecutionContext.getJobDetail().getJobDataMap().get("task");
         if (task != null) {
             LOG.trace("Running task: {}", task);
             task.run();

http://git-wip-us.apache.org/repos/asf/camel/blob/ddf9e0be/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
index 0e65438..6ddf670 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
@@ -20,8 +20,10 @@ import java.util.TimeZone;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
+import org.apache.camel.Route;
 import org.apache.camel.component.quartz2.QuartzComponent;
 import org.apache.camel.component.quartz2.QuartzConstants;
+import org.apache.camel.component.quartz2.QuartzHelper;
 import org.apache.camel.spi.ScheduledPollConsumerScheduler;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
@@ -45,6 +47,7 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
     private static final Logger LOG = LoggerFactory.getLogger(QuartzScheduledPollConsumerScheduler.class);
     private Scheduler quartzScheduler;
     private CamelContext camelContext;
+    private String routeId;
     private Consumer consumer;
     private Runnable runnable;
     private String cron;
@@ -57,6 +60,13 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
     @Override
     public void onInit(Consumer consumer) {
         this.consumer = consumer;
+        // find the route of the consumer
+        for (Route route : consumer.getEndpoint().getCamelContext().getRoutes()) {
+            if (route.getConsumer() == consumer) {
+                this.routeId = route.getId();
+                break;
+            }
+        }
     }
 
     @Override
@@ -151,7 +161,12 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
         }
 
         JobDataMap map = new JobDataMap();
-        map.put("task", runnable);
+        // do not store task as its not serializable, if we have route id
+        if (routeId != null) {
+            map.put("routeId", routeId);
+        } else {
+            map.put("task", runnable);
+        }
         map.put(QuartzConstants.QUARTZ_TRIGGER_TYPE, "cron");
         map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION, getCron());
         map.put(QuartzConstants.QUARTZ_TRIGGER_CRON_TIMEZONE, getTimeZone().getID());
@@ -160,6 +175,9 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
                 .usingJobData(map)
                 .build();
 
+        // store additional information on job such as camel context etc
+        QuartzHelper.updateJobDataMap(getCamelContext(), job, null);
+
         String id = triggerId;
         if (id == null) {
             id = "trigger-" + getCamelContext().getUuidGenerator().generateUuid();
@@ -185,4 +203,5 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme
     @Override
     protected void doShutdown() throws Exception {
     }
+
 }