You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2009/01/27 17:04:02 UTC
svn commit: r738122 - in
/camel/branches/camel-1.x/components/camel-quartz/src:
main/java/org/apache/camel/component/quartz/
test/java/org/apache/camel/component/quartz/
Author: hadrian
Date: Tue Jan 27 16:04:02 2009
New Revision: 738122
URL: http://svn.apache.org/viewvc?rev=738122&view=rev
Log:
CAMEL-1002. Patch applied with thanks to Martin.
Added:
camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java
camel/branches/camel-1.x/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/StatefulQuartzRouteTest.java
Modified:
camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java
Modified: camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java?rev=738122&r1=738121&r2=738122&view=diff
==============================================================================
--- camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java (original)
+++ camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java Tue Jan 27 16:04:02 2009
@@ -46,12 +46,12 @@
public QuartzComponent() {
}
- public QuartzComponent(CamelContext context) {
+ public QuartzComponent(final CamelContext context) {
super(context);
}
@Override
- protected QuartzEndpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception {
+ protected QuartzEndpoint createEndpoint(final String uri, final String remaining, final Map parameters) throws Exception {
QuartzEndpoint answer = new QuartzEndpoint(uri, this, getScheduler());
// lets split the remaining into a group/name
@@ -126,7 +126,7 @@
return factory;
}
- public void setFactory(SchedulerFactory factory) {
+ public void setFactory(final SchedulerFactory factory) {
this.factory = factory;
}
@@ -137,7 +137,7 @@
return scheduler;
}
- public void setScheduler(Scheduler scheduler) {
+ public void setScheduler(final Scheduler scheduler) {
this.scheduler = scheduler;
}
@@ -145,7 +145,7 @@
return triggers;
}
- public void setTriggers(Map triggers) {
+ public void setTriggers(final Map triggers) {
this.triggers = triggers;
}
@@ -156,6 +156,8 @@
}
protected Scheduler createScheduler() throws SchedulerException {
- return getFactory().getScheduler();
+ Scheduler scheduler = getFactory().getScheduler();
+ scheduler.getContext().put(QuartzEndpoint.CONTEXT_KEY, getCamelContext());
+ return scheduler;
}
}
Modified: camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java?rev=738122&r1=738121&r2=738122&view=diff
==============================================================================
--- camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java (original)
+++ camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java Tue Jan 27 16:04:02 2009
@@ -44,24 +44,27 @@
*/
public class QuartzEndpoint extends DefaultEndpoint<QuartzExchange> {
public static final String ENDPOINT_KEY = "org.apache.camel.quartz";
+ public static final String CONTEXT_KEY = "org.apache.camel.CamelContext";
+
private static final transient Log LOG = LogFactory.getLog(QuartzEndpoint.class);
private Scheduler scheduler;
private LoadBalancer loadBalancer;
private Trigger trigger;
private JobDetail jobDetail;
private boolean started;
+ private boolean stateful;
- public QuartzEndpoint(String endpointUri, QuartzComponent component, Scheduler scheduler) {
+ public QuartzEndpoint(final String endpointUri, final QuartzComponent component, final Scheduler scheduler) {
super(endpointUri, component);
this.scheduler = scheduler;
}
- public QuartzEndpoint(String endpointUri, Scheduler scheduler) {
+ public QuartzEndpoint(final String endpointUri, final Scheduler scheduler) {
super(endpointUri);
this.scheduler = scheduler;
}
- public void addTriggers(Map<Trigger, JobDetail> triggerMap) throws SchedulerException {
+ public void addTriggers(final Map<Trigger, JobDetail> triggerMap) throws SchedulerException {
if (triggerMap != null) {
Set<Map.Entry<Trigger, JobDetail>> entries = triggerMap.entrySet();
for (Map.Entry<Trigger, JobDetail> entry : entries) {
@@ -75,7 +78,7 @@
}
}
- public void addTrigger(Trigger trigger, JobDetail detail) throws SchedulerException {
+ public void addTrigger(final Trigger trigger, final JobDetail detail) throws SchedulerException {
// lets default the trigger name to the job name
if (trigger.getName() == null) {
trigger.setName(detail.getName());
@@ -88,10 +91,17 @@
if (trigger.getStartTime() == null) {
trigger.setStartTime(new Date());
}
- detail.getJobDataMap().put(ENDPOINT_KEY, this);
- Class jobClass = detail.getJobClass();
- if (jobClass == null) {
- detail.setJobClass(CamelJob.class);
+ if (isStateful()) {
+ detail.getJobDataMap().put(ENDPOINT_KEY, getEndpointUri());
+ } else {
+ detail.getJobDataMap().put(ENDPOINT_KEY, this);
+ }
+ if (null == detail.getJobClass()) {
+ if (isStateful()) {
+ detail.setJobClass(StatefulCamelJob.class);
+ } else {
+ detail.setJobClass(CamelJob.class);
+ }
}
if (detail.getName() == null) {
detail.setName(getEndpointUri());
@@ -99,7 +109,7 @@
getScheduler().scheduleJob(detail, trigger);
}
- public void removeTrigger(Trigger trigger, JobDetail jobDetail) throws SchedulerException {
+ public void removeTrigger(final Trigger trigger, final JobDetail jobDetail) throws SchedulerException {
getScheduler().unscheduleJob(trigger.getName(), trigger.getGroup());
}
@@ -108,7 +118,7 @@
*
* @param jobExecutionContext the Quartz Job context
*/
- public void onJobExecute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+ public void onJobExecute(final JobExecutionContext jobExecutionContext) throws JobExecutionException {
if (LOG.isDebugEnabled()) {
LOG.debug("Firing Quartz Job with context: " + jobExecutionContext);
}
@@ -123,11 +133,11 @@
}
@Override
- public QuartzExchange createExchange(ExchangePattern pattern) {
+ public QuartzExchange createExchange(final ExchangePattern pattern) {
return new QuartzExchange(getCamelContext(), pattern, null);
}
- public QuartzExchange createExchange(JobExecutionContext jobExecutionContext) {
+ public QuartzExchange createExchange(final JobExecutionContext jobExecutionContext) {
return new QuartzExchange(getCamelContext(), getExchangePattern(), jobExecutionContext);
}
@@ -135,7 +145,7 @@
throw new UnsupportedOperationException("You cannot send messages to this endpoint");
}
- public QuartzConsumer createConsumer(Processor processor) throws Exception {
+ public QuartzConsumer createConsumer(final Processor processor) throws Exception {
return new QuartzConsumer(this, processor);
}
@@ -162,7 +172,7 @@
return loadBalancer;
}
- public void setLoadBalancer(LoadBalancer loadBalancer) {
+ public void setLoadBalancer(final LoadBalancer loadBalancer) {
this.loadBalancer = loadBalancer;
}
@@ -173,7 +183,7 @@
return jobDetail;
}
- public void setJobDetail(JobDetail jobDetail) {
+ public void setJobDetail(final JobDetail jobDetail) {
this.jobDetail = jobDetail;
}
@@ -184,13 +194,27 @@
return trigger;
}
- public void setTrigger(Trigger trigger) {
+ public void setTrigger(final Trigger trigger) {
this.trigger = trigger;
}
+ /**
+ * @return the stateful
+ */
+ public boolean isStateful() {
+ return this.stateful;
+ }
+
+ /**
+ * @param stateful the stateful to set
+ */
+ public void setStateful(final boolean stateful) {
+ this.stateful = stateful;
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
- public synchronized void consumerStarted(QuartzConsumer consumer) throws SchedulerException {
+ public synchronized void consumerStarted(final QuartzConsumer consumer) throws SchedulerException {
getLoadBalancer().addProcessor(consumer.getProcessor());
// if we have not yet added our default trigger, then lets do it
@@ -200,7 +224,7 @@
}
}
- public synchronized void consumerStopped(QuartzConsumer consumer) throws SchedulerException {
+ public synchronized void consumerStopped(final QuartzConsumer consumer) throws SchedulerException {
getLoadBalancer().removeProcessor(consumer.getProcessor());
if (getLoadBalancer().getProcessors().isEmpty() && started) {
removeTrigger(getTrigger(), getJobDetail());
Added: camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java?rev=738122&view=auto
==============================================================================
--- camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java (added)
+++ camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java Tue Jan 27 16:04:02 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.quartz;
+
+import org.apache.camel.CamelContext;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.SchedulerContext;
+import org.quartz.SchedulerException;
+import org.quartz.StatefulJob;
+
+/**
+ * @author martin.gilday
+ *
+ */
+public class StatefulCamelJob implements StatefulJob {
+
+ /* (non-Javadoc)
+ * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
+ */
+ public void execute(final JobExecutionContext context) throws JobExecutionException {
+
+ SchedulerContext schedulerContext;
+ try {
+ schedulerContext = context.getScheduler().getContext();
+ } catch (SchedulerException e) {
+ throw new JobExecutionException("Failed to obtain scheduler context for job " + context.getJobDetail().getName());
+ }
+
+ CamelContext camelContext = (CamelContext) schedulerContext.get(QuartzEndpoint.CONTEXT_KEY);
+ String endpointUri = (String) context.getJobDetail().getJobDataMap().get(QuartzEndpoint.ENDPOINT_KEY);
+ QuartzEndpoint quartzEndpoint = (QuartzEndpoint) camelContext.getEndpoint(endpointUri);
+ quartzEndpoint.onJobExecute(context);
+ }
+
+}
Added: camel/branches/camel-1.x/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/StatefulQuartzRouteTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/StatefulQuartzRouteTest.java?rev=738122&view=auto
==============================================================================
--- camel/branches/camel-1.x/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/StatefulQuartzRouteTest.java (added)
+++ camel/branches/camel-1.x/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/StatefulQuartzRouteTest.java Tue Jan 27 16:04:02 2009
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.quartz;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @author martin.gilday
+ *
+ */
+public class StatefulQuartzRouteTest extends ContextTestSupport {
+ protected MockEndpoint resultEndpoint;
+
+ public void testSendAndReceiveMails() throws Exception {
+ resultEndpoint = getMockEndpoint("mock:result");
+ resultEndpoint.expectedMessageCount(2);
+ resultEndpoint.message(0).header("triggerName").isEqualTo("myTimerName");
+ resultEndpoint.message(0).header("triggerGroup").isEqualTo("myGroup");
+
+ // lets test the receive worked
+ resultEndpoint.assertIsSatisfied();
+
+ List<Exchange> list = resultEndpoint.getReceivedExchanges();
+ for (Exchange exchange : list) {
+ Message in = exchange.getIn();
+ log.debug("Received: " + in + " with headers: " + in.getHeaders());
+ }
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.camel.ContextTestSupport#createRouteBuilder()
+ */
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ // START SNIPPET: example
+ from("quartz://myGroup/myTimerName?trigger.repeatInterval=2&trigger.repeatCount=1&stateful=true").to("mock:result");
+ // END SNIPPET: example
+ }
+ };
+ }
+
+}