You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/29 12:08:00 UTC
svn commit: r980386 - in /camel/trunk/components/camel-quartz/src:
main/java/org/apache/camel/component/quartz/
test/java/org/apache/camel/component/quartz/
Author: davsclaus
Date: Thu Jul 29 10:07:59 2010
New Revision: 980386
URL: http://svn.apache.org/viewvc?rev=980386&view=rev
Log:
CAMEL-3011: quartz component supporting using multiple camel contexts. Jobs should lookup and use correct camel context.
Modified:
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzConstants.java
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java
camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java
camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextRestartTest.java
camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzTwoCamelContextRestartTest.java
Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java?rev=980386&r1=980385&r2=980386&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/CamelJob.java Thu Jul 29 10:07:59 2010
@@ -18,9 +18,12 @@ package org.apache.camel.component.quart
import java.io.Serializable;
+import org.apache.camel.CamelContext;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
+import org.quartz.SchedulerContext;
+import org.quartz.SchedulerException;
/**
* @version $Revision$
@@ -28,10 +31,25 @@ import org.quartz.JobExecutionException;
public class CamelJob implements Job, Serializable {
public void execute(JobExecutionContext context) throws JobExecutionException {
- QuartzEndpoint endpoint = (QuartzEndpoint) context.getJobDetail().getJobDataMap().get(QuartzConstants.QUARTZ_ENDPOINT);
+ String camelContextName = (String) context.getJobDetail().getJobDataMap().get(QuartzConstants.QUARTZ_CAMEL_CONTEXT_NAME);
+ String endpointUri = (String) context.getJobDetail().getJobDataMap().get(QuartzConstants.QUARTZ_ENDPOINT_URI);
+
+ SchedulerContext schedulerContext;
+ try {
+ schedulerContext = context.getScheduler().getContext();
+ } catch (SchedulerException e) {
+ throw new JobExecutionException("Failed to obtain scheduler context for job " + context.getJobDetail().getName());
+ }
+
+ CamelContext camelContext = (CamelContext) schedulerContext.get(QuartzConstants.QUARTZ_CAMEL_CONTEXT + "-" + camelContextName);
+ if (camelContext == null) {
+ throw new JobExecutionException("No CamelContext could be found with name: " + camelContextName);
+ }
+ QuartzEndpoint endpoint = camelContext.getEndpoint(endpointUri, QuartzEndpoint.class);
if (endpoint == null) {
- throw new JobExecutionException("No quartz endpoint available for key: " + QuartzConstants.QUARTZ_ENDPOINT);
+ throw new JobExecutionException("No QuartzEndpoint could be found with uri: " + endpointUri);
}
endpoint.onJobExecute(context);
}
+
}
\ No newline at end of file
Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java?rev=980386&r1=980385&r2=980386&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java Thu Jul 29 10:07:59 2010
@@ -20,7 +20,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.text.ParseException;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
@@ -53,12 +55,31 @@ public class QuartzComponent extends Def
private static final transient Log LOG = LogFactory.getLog(QuartzComponent.class);
private static final AtomicInteger JOBS = new AtomicInteger();
private static Scheduler scheduler;
+ private final List<JobToAdd> jobsToAdd = new ArrayList<JobToAdd>();
private SchedulerFactory factory;
private Properties properties;
private String propertiesFile;
private int startDelayedSeconds;
private boolean autoStartScheduler = true;
+ private final class JobToAdd {
+ private final JobDetail job;
+ private final Trigger trigger;
+
+ private JobToAdd(JobDetail job, Trigger trigger) {
+ this.job = job;
+ this.trigger = trigger;
+ }
+
+ public JobDetail getJob() {
+ return job;
+ }
+
+ public Trigger getTrigger() {
+ return trigger;
+ }
+ }
+
public QuartzComponent() {
}
@@ -125,6 +146,11 @@ public class QuartzComponent extends Def
}
public void onCamelContextStarted(CamelContext camelContext, boolean alreadyStarted) throws Exception {
+ if (scheduler != null) {
+ // register current camel context to scheduler so we can look it up when jobs is being triggered
+ scheduler.getContext().put(QuartzConstants.QUARTZ_CAMEL_CONTEXT + "-" + getCamelContext().getName(), getCamelContext());
+ }
+
// if not configure to auto start then don't start it
if (!isAutoStartScheduler()) {
LOG.info("QuartzComponent configured to not auto start Quartz scheduler.");
@@ -160,7 +186,12 @@ public class QuartzComponent extends Def
}
}
- public void addJob(JobDetail job, Trigger trigger) throws SchedulerException {
+ public void addJob(JobDetail job, Trigger trigger) {
+ // add job to internal list because we will defer adding to the scheduler when camel context has been fully started
+ jobsToAdd.add(new JobToAdd(job, trigger));
+ }
+
+ private void doAddJob(JobDetail job, Trigger trigger) throws SchedulerException {
JOBS.incrementAndGet();
if (getScheduler().getTrigger(trigger.getName(), trigger.getGroup()) == null) {
@@ -228,13 +259,18 @@ public class QuartzComponent extends Def
* @throws SchedulerException can be thrown if error starting
*/
public void startScheduler() throws SchedulerException {
- if (scheduler != null && !scheduler.isStarted()) {
+ for (JobToAdd add : jobsToAdd) {
+ doAddJob(add.getJob(), add.getTrigger());
+ }
+ jobsToAdd.clear();
+
+ if (!getScheduler().isStarted()) {
if (getStartDelayedSeconds() > 0) {
- LOG.info("Starting Quartz scheduler: " + scheduler.getSchedulerName() + " delayed: " + getStartDelayedSeconds() + " seconds.");
- scheduler.startDelayed(getStartDelayedSeconds());
+ LOG.info("Starting Quartz scheduler: " + getScheduler().getSchedulerName() + " delayed: " + getStartDelayedSeconds() + " seconds.");
+ getScheduler().startDelayed(getStartDelayedSeconds());
} else {
- LOG.info("Starting Quartz scheduler: " + scheduler.getSchedulerName());
- scheduler.start();
+ LOG.info("Starting Quartz scheduler: " + getScheduler().getSchedulerName());
+ getScheduler().start();
}
}
}
@@ -333,7 +369,9 @@ public class QuartzComponent extends Def
protected Scheduler createScheduler() throws SchedulerException {
Scheduler scheduler = getFactory().getScheduler();
- scheduler.getContext().put(QuartzConstants.QUARTZ_CAMEL_CONTEXT, getCamelContext());
+ // register current camel context to scheduler so we can look it up when jobs is being triggered
+ scheduler.getContext().put(QuartzConstants.QUARTZ_CAMEL_CONTEXT + "-" + getCamelContext().getName(), getCamelContext());
return scheduler;
}
+
}
Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzConstants.java?rev=980386&r1=980385&r2=980386&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzConstants.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzConstants.java Thu Jul 29 10:07:59 2010
@@ -21,7 +21,9 @@ package org.apache.camel.component.quart
*/
public final class QuartzConstants {
- public static final String QUARTZ_ENDPOINT = "CamelQuartzEndpoint";
+ public static final String QUARTZ_ENDPOINT_URI = "CamelQuartzEndpoint";
+
+ public static final String QUARTZ_CAMEL_CONTEXT_NAME = "CamelQuartzCamelContextName";
public static final String QUARTZ_CAMEL_CONTEXT = "CamelQuartzCamelContext";
Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java?rev=980386&r1=980385&r2=980386&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java Thu Jul 29 10:07:59 2010
@@ -68,7 +68,8 @@ public class QuartzEndpoint extends Defa
if (trigger.getStartTime() == null) {
trigger.setStartTime(new Date());
}
- detail.getJobDataMap().put(QuartzConstants.QUARTZ_ENDPOINT, isStateful() ? getEndpointUri() : this);
+ detail.getJobDataMap().put(QuartzConstants.QUARTZ_ENDPOINT_URI, getEndpointUri());
+ detail.getJobDataMap().put(QuartzConstants.QUARTZ_CAMEL_CONTEXT_NAME, getCamelContext().getName());
if (detail.getJobClass() == null) {
detail.setJobClass(isStateful() ? StatefulCamelJob.class : CamelJob.class);
}
@@ -196,6 +197,9 @@ public class QuartzEndpoint extends Defa
// -------------------------------------------------------------------------
public synchronized void consumerStarted(final QuartzConsumer consumer) throws SchedulerException {
ObjectHelper.notNull(trigger, "trigger");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding consumer " + consumer.getProcessor());
+ }
getLoadBalancer().addProcessor(consumer.getProcessor());
// if we have not yet added our default trigger, then lets do it
@@ -212,6 +216,9 @@ public class QuartzEndpoint extends Defa
started = false;
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing consumer " + consumer.getProcessor());
+ }
getLoadBalancer().removeProcessor(consumer.getProcessor());
}
Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java?rev=980386&r1=980385&r2=980386&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java Thu Jul 29 10:07:59 2010
@@ -16,31 +16,11 @@
*/
package org.apache.camel.component.quartz;
-import java.io.Serializable;
-
-import org.apache.camel.CamelContext;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.SchedulerContext;
-import org.quartz.SchedulerException;
import org.quartz.StatefulJob;
/**
* Stateful job
*/
-public class StatefulCamelJob implements StatefulJob, Serializable {
-
- public void execute(final JobExecutionContext context) throws JobExecutionException {
- SchedulerContext schedulerContext;
- try {
- schedulerContext = context.getScheduler().getContext();
- } catch (SchedulerException e) {
- throw new JobExecutionException("Failed to obtain scheduler context for job " + context.getJobDetail().getName());
- }
+public class StatefulCamelJob extends CamelJob implements StatefulJob {
- CamelContext camelContext = (CamelContext) schedulerContext.get(QuartzConstants.QUARTZ_CAMEL_CONTEXT);
- String endpointUri = (String) context.getJobDetail().getJobDataMap().get(QuartzConstants.QUARTZ_ENDPOINT);
- QuartzEndpoint quartzEndpoint = (QuartzEndpoint) camelContext.getEndpoint(endpointUri);
- quartzEndpoint.onJobExecute(context);
- }
}
Modified: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextRestartTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextRestartTest.java?rev=980386&r1=980385&r2=980386&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextRestartTest.java (original)
+++ camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzOneCamelContextRestartTest.java Thu Jul 29 10:07:59 2010
@@ -37,7 +37,7 @@ public class QuartzOneCamelContextRestar
camel1.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("quartz://myGroup/myTimerName?cron=0/1+*+*+*+*+?").to("mock:one");
+ from("quartz://myGroup/myTimerName?cron=0/1+*+*+*+*+?").to("log:one", "mock:one");
}
});
camel1.start();
@@ -56,9 +56,10 @@ public class QuartzOneCamelContextRestar
camel1.stop();
+ // fetch mock endpoint again because we have stopped camel context
+ mock1 = camel1.getEndpoint("mock:one", MockEndpoint.class);
// should resume triggers when we start camel 1 again
- mock1.reset();
- mock1.expectedMinimumMessageCount(2);
+ mock1.expectedMinimumMessageCount(3);
camel1.start();
mock1.assertIsSatisfied();
Modified: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzTwoCamelContextRestartTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzTwoCamelContextRestartTest.java?rev=980386&r1=980385&r2=980386&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzTwoCamelContextRestartTest.java (original)
+++ camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzTwoCamelContextRestartTest.java Thu Jul 29 10:07:59 2010
@@ -38,7 +38,7 @@ public class QuartzTwoCamelContextRestar
camel1.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("quartz://myGroup/myTimerName?cron=0/1+*+*+*+*+?").to("mock:one");
+ from("quartz://myGroup/myTimerName?cron=0/1+*+*+*+*+?").to("log:one", "mock:one");
}
});
camel1.start();
@@ -48,7 +48,7 @@ public class QuartzTwoCamelContextRestar
camel2.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("quartz://myOtherGroup/myOtherTimerName?cron=0/1+*+*+*+*+?").to("mock:two");
+ from("quartz://myOtherGroup/myOtherTimerName?cron=0/1+*+*+*+*+?").to("log:two", "mock:two");
}
});
camel2.start();
@@ -74,8 +74,9 @@ public class QuartzTwoCamelContextRestar
mock2.assertIsSatisfied();
// should resume triggers when we start camel 1 again
- mock1.reset();
- mock1.expectedMinimumMessageCount(2);
+ // fetch mock endpoint again because we have stopped camel context
+ mock1 = camel1.getEndpoint("mock:one", MockEndpoint.class);
+ mock1.expectedMinimumMessageCount(3);
camel1.start();
mock1.assertIsSatisfied();