You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/29 12:08:00 UTC

svn commit: r980386 - in /camel/trunk/components/camel-quartz/src: main/java/org/apache/camel/component/quartz/ test/java/org/apache/camel/component/quartz/

Author: davsclaus
Date: Thu Jul 29 10:07:59 2010
New Revision: 980386

URL: http://svn.apache.org/viewvc?rev=980386&view=rev
Log:
CAMEL-3011: quartz component supporting using multiple camel contexts. Jobs should lookup and use correct camel context.

Modified:
    camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java
    camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
    camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzConstants.java
    camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java
    camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java
    camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextRestartTest.java
    camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzTwoCamelContextRestartTest.java

Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java?rev=980386&r1=980385&r2=980386&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java Thu Jul 29 10:07:59 2010
@@ -18,9 +18,12 @@ package org.apache.camel.component.quart
 
 import java.io.Serializable;
 
+import org.apache.camel.CamelContext;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
+import org.quartz.SchedulerContext;
+import org.quartz.SchedulerException;
 
 /**
  * @version $Revision$
@@ -28,10 +31,25 @@ import org.quartz.JobExecutionException;
 public class CamelJob implements Job, Serializable {
 
     public void execute(JobExecutionContext context) throws JobExecutionException {
-        QuartzEndpoint endpoint = (QuartzEndpoint) context.getJobDetail().getJobDataMap().get(QuartzConstants.QUARTZ_ENDPOINT);
+        String camelContextName = (String) context.getJobDetail().getJobDataMap().get(QuartzConstants.QUARTZ_CAMEL_CONTEXT_NAME);
+        String endpointUri = (String) context.getJobDetail().getJobDataMap().get(QuartzConstants.QUARTZ_ENDPOINT_URI);
+
+        SchedulerContext schedulerContext;
+        try {
+            schedulerContext = context.getScheduler().getContext();
+        } catch (SchedulerException e) {
+            throw new JobExecutionException("Failed to obtain scheduler context for job " + context.getJobDetail().getName());
+        }
+
+        CamelContext camelContext = (CamelContext) schedulerContext.get(QuartzConstants.QUARTZ_CAMEL_CONTEXT + "-" + camelContextName);
+        if (camelContext == null) {
+            throw new JobExecutionException("No CamelContext could be found with name: " + camelContextName);
+        }
+        QuartzEndpoint endpoint = camelContext.getEndpoint(endpointUri, QuartzEndpoint.class);
         if (endpoint == null) {
-            throw new JobExecutionException("No quartz endpoint available for key: " + QuartzConstants.QUARTZ_ENDPOINT);
+            throw new JobExecutionException("No QuartzEndpoint could be found with uri: " + endpointUri);
         }
         endpoint.onJobExecute(context);
     }
+
 }
\ No newline at end of file

Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java?rev=980386&r1=980385&r2=980386&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java Thu Jul 29 10:07:59 2010
@@ -20,7 +20,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.text.ParseException;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -53,12 +55,31 @@ public class QuartzComponent extends Def
     private static final transient Log LOG = LogFactory.getLog(QuartzComponent.class);
     private static final AtomicInteger JOBS = new AtomicInteger();
     private static Scheduler scheduler;
+    private final List<JobToAdd> jobsToAdd = new ArrayList<JobToAdd>();
     private SchedulerFactory factory;
     private Properties properties;
     private String propertiesFile;
     private int startDelayedSeconds;
     private boolean autoStartScheduler = true;
 
+    private final class JobToAdd {
+        private final JobDetail job;
+        private final Trigger trigger;
+
+        private JobToAdd(JobDetail job, Trigger trigger) {
+            this.job = job;
+            this.trigger = trigger;
+        }
+
+        public JobDetail getJob() {
+            return job;
+        }
+
+        public Trigger getTrigger() {
+            return trigger;
+        }
+    }
+
     public QuartzComponent() {
     }
 
@@ -125,6 +146,11 @@ public class QuartzComponent extends Def
     }
 
     public void onCamelContextStarted(CamelContext camelContext, boolean alreadyStarted) throws Exception {
+        if (scheduler != null) {
+            // register current camel context to scheduler so we can look it up when jobs is being triggered
+            scheduler.getContext().put(QuartzConstants.QUARTZ_CAMEL_CONTEXT + "-" + getCamelContext().getName(), getCamelContext());
+        }
+
         // if not configure to auto start then don't start it
         if (!isAutoStartScheduler()) {
             LOG.info("QuartzComponent configured to not auto start Quartz scheduler.");
@@ -160,7 +186,12 @@ public class QuartzComponent extends Def
         }
     }
 
-    public void addJob(JobDetail job, Trigger trigger) throws SchedulerException {
+    public void addJob(JobDetail job, Trigger trigger) {
+        // add job to internal list because we will defer adding to the scheduler when camel context has been fully started
+        jobsToAdd.add(new JobToAdd(job, trigger));
+    }
+
+    private void doAddJob(JobDetail job, Trigger trigger) throws SchedulerException {
         JOBS.incrementAndGet();
 
         if (getScheduler().getTrigger(trigger.getName(), trigger.getGroup()) == null) {
@@ -228,13 +259,18 @@ public class QuartzComponent extends Def
      * @throws SchedulerException can be thrown if error starting
      */
     public void startScheduler() throws SchedulerException {
-        if (scheduler != null && !scheduler.isStarted()) {
+        for (JobToAdd add : jobsToAdd) {
+            doAddJob(add.getJob(), add.getTrigger());
+        }
+        jobsToAdd.clear();
+
+        if (!getScheduler().isStarted()) {
             if (getStartDelayedSeconds() > 0) {
-                LOG.info("Starting Quartz scheduler: " + scheduler.getSchedulerName() + " delayed: " + getStartDelayedSeconds() + " seconds.");
-                scheduler.startDelayed(getStartDelayedSeconds());
+                LOG.info("Starting Quartz scheduler: " + getScheduler().getSchedulerName() + " delayed: " + getStartDelayedSeconds() + " seconds.");
+                getScheduler().startDelayed(getStartDelayedSeconds());
             } else {
-                LOG.info("Starting Quartz scheduler: " + scheduler.getSchedulerName());
-                scheduler.start();
+                LOG.info("Starting Quartz scheduler: " + getScheduler().getSchedulerName());
+                getScheduler().start();
             }
         }
     }
@@ -333,7 +369,9 @@ public class QuartzComponent extends Def
 
     protected Scheduler createScheduler() throws SchedulerException {
         Scheduler scheduler = getFactory().getScheduler();
-        scheduler.getContext().put(QuartzConstants.QUARTZ_CAMEL_CONTEXT, getCamelContext());
+        // register current camel context to scheduler so we can look it up when jobs is being triggered
+        scheduler.getContext().put(QuartzConstants.QUARTZ_CAMEL_CONTEXT + "-" + getCamelContext().getName(), getCamelContext());
         return scheduler;
     }
+
 }

Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzConstants.java?rev=980386&r1=980385&r2=980386&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzConstants.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzConstants.java Thu Jul 29 10:07:59 2010
@@ -21,7 +21,9 @@ package org.apache.camel.component.quart
  */
 public final class QuartzConstants {
 
-    public static final String QUARTZ_ENDPOINT = "CamelQuartzEndpoint";
+    public static final String QUARTZ_ENDPOINT_URI = "CamelQuartzEndpoint";
+
+    public static final String QUARTZ_CAMEL_CONTEXT_NAME = "CamelQuartzCamelContextName";
 
     public static final String QUARTZ_CAMEL_CONTEXT = "CamelQuartzCamelContext";
 

Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java?rev=980386&r1=980385&r2=980386&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java Thu Jul 29 10:07:59 2010
@@ -68,7 +68,8 @@ public class QuartzEndpoint extends Defa
         if (trigger.getStartTime() == null) {
             trigger.setStartTime(new Date());
         }
-        detail.getJobDataMap().put(QuartzConstants.QUARTZ_ENDPOINT, isStateful() ? getEndpointUri() : this);
+        detail.getJobDataMap().put(QuartzConstants.QUARTZ_ENDPOINT_URI, getEndpointUri());
+        detail.getJobDataMap().put(QuartzConstants.QUARTZ_CAMEL_CONTEXT_NAME, getCamelContext().getName());
         if (detail.getJobClass() == null) {
             detail.setJobClass(isStateful() ? StatefulCamelJob.class : CamelJob.class);
         }
@@ -196,6 +197,9 @@ public class QuartzEndpoint extends Defa
     // -------------------------------------------------------------------------
     public synchronized void consumerStarted(final QuartzConsumer consumer) throws SchedulerException {
         ObjectHelper.notNull(trigger, "trigger");
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Adding consumer " + consumer.getProcessor());
+        }
         getLoadBalancer().addProcessor(consumer.getProcessor());
 
         // if we have not yet added our default trigger, then lets do it
@@ -212,6 +216,9 @@ public class QuartzEndpoint extends Defa
             started = false;
         }
 
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Removing consumer " + consumer.getProcessor());
+        }
         getLoadBalancer().removeProcessor(consumer.getProcessor());
     }
 

Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java?rev=980386&r1=980385&r2=980386&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java Thu Jul 29 10:07:59 2010
@@ -16,31 +16,11 @@
  */
 package org.apache.camel.component.quartz;
 
-import java.io.Serializable;
-
-import org.apache.camel.CamelContext;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.SchedulerContext;
-import org.quartz.SchedulerException;
 import org.quartz.StatefulJob;
 
 /**
  * Stateful job
  */
-public class StatefulCamelJob implements StatefulJob, Serializable {
-
-    public void execute(final JobExecutionContext context) throws JobExecutionException {
-        SchedulerContext schedulerContext;
-        try {
-            schedulerContext = context.getScheduler().getContext();
-        } catch (SchedulerException e) {
-            throw new JobExecutionException("Failed to obtain scheduler context for job " + context.getJobDetail().getName());
-        }
+public class StatefulCamelJob extends CamelJob implements StatefulJob {
 
-        CamelContext camelContext = (CamelContext) schedulerContext.get(QuartzConstants.QUARTZ_CAMEL_CONTEXT);
-        String endpointUri = (String) context.getJobDetail().getJobDataMap().get(QuartzConstants.QUARTZ_ENDPOINT);
-        QuartzEndpoint quartzEndpoint = (QuartzEndpoint) camelContext.getEndpoint(endpointUri);
-        quartzEndpoint.onJobExecute(context);
-    }
 }

Modified: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextRestartTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextRestartTest.java?rev=980386&r1=980385&r2=980386&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextRestartTest.java (original)
+++ camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextRestartTest.java Thu Jul 29 10:07:59 2010
@@ -37,7 +37,7 @@ public class QuartzOneCamelContextRestar
         camel1.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("quartz://myGroup/myTimerName?cron=0/1+*+*+*+*+?").to("mock:one");
+                from("quartz://myGroup/myTimerName?cron=0/1+*+*+*+*+?").to("log:one", "mock:one");
             }
         });
         camel1.start();
@@ -56,9 +56,10 @@ public class QuartzOneCamelContextRestar
 
         camel1.stop();
 
+        // fetch mock endpoint again because we have stopped camel context
+        mock1 = camel1.getEndpoint("mock:one", MockEndpoint.class);
         // should resume triggers when we start camel 1 again
-        mock1.reset();
-        mock1.expectedMinimumMessageCount(2);
+        mock1.expectedMinimumMessageCount(3);
         camel1.start();
 
         mock1.assertIsSatisfied();

Modified: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzTwoCamelContextRestartTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzTwoCamelContextRestartTest.java?rev=980386&r1=980385&r2=980386&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzTwoCamelContextRestartTest.java (original)
+++ camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzTwoCamelContextRestartTest.java Thu Jul 29 10:07:59 2010
@@ -38,7 +38,7 @@ public class QuartzTwoCamelContextRestar
         camel1.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("quartz://myGroup/myTimerName?cron=0/1+*+*+*+*+?").to("mock:one");
+                from("quartz://myGroup/myTimerName?cron=0/1+*+*+*+*+?").to("log:one", "mock:one");
             }
         });
         camel1.start();
@@ -48,7 +48,7 @@ public class QuartzTwoCamelContextRestar
         camel2.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("quartz://myOtherGroup/myOtherTimerName?cron=0/1+*+*+*+*+?").to("mock:two");
+                from("quartz://myOtherGroup/myOtherTimerName?cron=0/1+*+*+*+*+?").to("log:two", "mock:two");
             }
         });
         camel2.start();
@@ -74,8 +74,9 @@ public class QuartzTwoCamelContextRestar
         mock2.assertIsSatisfied();
 
         // should resume triggers when we start camel 1 again
-        mock1.reset();
-        mock1.expectedMinimumMessageCount(2);
+        // fetch mock endpoint again because we have stopped camel context
+        mock1 = camel1.getEndpoint("mock:one", MockEndpoint.class);
+        mock1.expectedMinimumMessageCount(3);
         camel1.start();
 
         mock1.assertIsSatisfied();