You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2009/01/27 18:58:08 UTC

svn commit: r738171 - in /camel/trunk/components/camel-quartz/src: main/java/org/apache/camel/component/quartz/ test/java/org/apache/camel/component/quartz/

Author: hadrian
Date: Tue Jan 27 17:58:08 2009
New Revision: 738171

URL: http://svn.apache.org/viewvc?rev=738171&view=rev
Log:
CAMEL-1002. Patch applied with thanks. 

Added:
    camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java
    camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/StatefulQuartzRouteTest.java
Modified:
    camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
    camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java

Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java?rev=738171&r1=738170&r2=738171&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java Tue Jan 27 17:58:08 2009
@@ -46,12 +46,12 @@
     public QuartzComponent() {
     }
 
-    public QuartzComponent(CamelContext context) {
+    public QuartzComponent(final CamelContext context) {
         super(context);
     }
 
     @Override
-    protected QuartzEndpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception {
+    protected QuartzEndpoint createEndpoint(final String uri, final String remaining, final Map parameters) throws Exception {
         QuartzEndpoint answer = new QuartzEndpoint(uri, this, getScheduler());
 
         // lets split the remaining into a group/name
@@ -126,7 +126,7 @@
         return factory;
     }
 
-    public void setFactory(SchedulerFactory factory) {
+    public void setFactory(final SchedulerFactory factory) {
         this.factory = factory;
     }
 
@@ -137,7 +137,7 @@
         return scheduler;
     }
 
-    public void setScheduler(Scheduler scheduler) {
+    public void setScheduler(final Scheduler scheduler) {
         this.scheduler = scheduler;
     }
 
@@ -145,7 +145,7 @@
         return triggers;
     }
 
-    public void setTriggers(Map triggers) {
+    public void setTriggers(final Map triggers) {
         this.triggers = triggers;
     }
 
@@ -156,6 +156,8 @@
     }
 
     protected Scheduler createScheduler() throws SchedulerException {
-        return getFactory().getScheduler();
+    	Scheduler scheduler = getFactory().getScheduler();
+    	scheduler.getContext().put(QuartzEndpoint.CONTEXT_KEY, getCamelContext());
+        return scheduler;
     }
 }

Modified: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java?rev=738171&r1=738170&r2=738171&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java (original)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java Tue Jan 27 17:58:08 2009
@@ -45,25 +45,29 @@
  * @version $Revision:520964 $
  */
 public class QuartzEndpoint extends DefaultEndpoint {
-    public static final String ENDPOINT_KEY = "org.apache.camel.quartz";
     private static final transient Log LOG = LogFactory.getLog(QuartzEndpoint.class);
+
+    public static final String ENDPOINT_KEY = "org.apache.camel.quartz";
+    public static final String CONTEXT_KEY = "org.apache.camel.CamelContext";
+
     private Scheduler scheduler;
     private LoadBalancer loadBalancer;
     private Trigger trigger;
     private JobDetail jobDetail;
     private boolean started;
+    private boolean stateful;
 
-    public QuartzEndpoint(String endpointUri, QuartzComponent component, Scheduler scheduler) {
+    public QuartzEndpoint(final String endpointUri, final QuartzComponent component, final Scheduler scheduler) {
         super(endpointUri, component);
         this.scheduler = scheduler;
     }
 
-    public QuartzEndpoint(String endpointUri, Scheduler scheduler) {
+    public QuartzEndpoint(final String endpointUri, final Scheduler scheduler) {
         super(endpointUri);
         this.scheduler = scheduler;
     }
 
-    public void addTriggers(Map<Trigger, JobDetail> triggerMap) throws SchedulerException {
+    public void addTriggers(final Map<Trigger, JobDetail> triggerMap) throws SchedulerException {
         if (triggerMap != null) {
             Set<Map.Entry<Trigger, JobDetail>> entries = triggerMap.entrySet();
             for (Map.Entry<Trigger, JobDetail> entry : entries) {
@@ -77,7 +81,7 @@
         }
     }
 
-    public void addTrigger(Trigger trigger, JobDetail detail) throws SchedulerException {
+    public void addTrigger(final Trigger trigger, final JobDetail detail) throws SchedulerException {
         // lets default the trigger name to the job name
         if (trigger.getName() == null) {
             trigger.setName(detail.getName());
@@ -90,10 +94,9 @@
         if (trigger.getStartTime() == null) {
             trigger.setStartTime(new Date());
         }
-        detail.getJobDataMap().put(ENDPOINT_KEY, this);
-        Class jobClass = detail.getJobClass();
-        if (jobClass == null) {
-            detail.setJobClass(CamelJob.class);
+        detail.getJobDataMap().put(ENDPOINT_KEY, isStateful() ? getEndpointUri() : this);
+        if (null == detail.getJobClass()) {
+            detail.setJobClass(isStateful() ? StatefulCamelJob.class : CamelJob.class);
         }
         if (detail.getName() == null) {
             detail.setName(getEndpointUri());
@@ -101,7 +104,7 @@
         getScheduler().scheduleJob(detail, trigger);
     }
 
-    public void removeTrigger(Trigger trigger, JobDetail jobDetail) throws SchedulerException {
+    public void removeTrigger(final Trigger trigger, final JobDetail jobDetail) throws SchedulerException {
         getScheduler().unscheduleJob(trigger.getName(), trigger.getGroup());
     }
 
@@ -110,7 +113,7 @@
      *
      * @param jobExecutionContext the Quartz Job context
      */
-    public void onJobExecute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+    public void onJobExecute(final JobExecutionContext jobExecutionContext) throws JobExecutionException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Firing Quartz Job with context: " + jobExecutionContext);
         }
@@ -124,7 +127,7 @@
         }
     }
 
-    public Exchange createExchange(JobExecutionContext jobExecutionContext) {
+    public Exchange createExchange(final JobExecutionContext jobExecutionContext) {
         Exchange exchange = createExchange();
         exchange.setIn(new QuartzMessage(exchange, jobExecutionContext));
         return exchange;
@@ -161,7 +164,7 @@
         return loadBalancer;
     }
 
-    public void setLoadBalancer(LoadBalancer loadBalancer) {
+    public void setLoadBalancer(final LoadBalancer loadBalancer) {
         this.loadBalancer = loadBalancer;
     }
 
@@ -172,7 +175,7 @@
         return jobDetail;
     }
 
-    public void setJobDetail(JobDetail jobDetail) {
+    public void setJobDetail(final JobDetail jobDetail) {
         this.jobDetail = jobDetail;
     }
 
@@ -183,13 +186,27 @@
         return trigger;
     }
 
-    public void setTrigger(Trigger trigger) {
+    public void setTrigger(final Trigger trigger) {
         this.trigger = trigger;
     }
+ 
+    /**
+     * @return the stateful mode
+     */
+    public boolean isStateful() {
+        return this.stateful;
+    }
+
+    /**
+     * @param stateful sets the stateful mode
+     */
+    public void setStateful(final boolean stateful) {
+        this.stateful = stateful;
+    }
 
     // Implementation methods
     // -------------------------------------------------------------------------
-    public synchronized void consumerStarted(QuartzConsumer consumer) throws SchedulerException {
+    public synchronized void consumerStarted(final QuartzConsumer consumer) throws SchedulerException {
         getLoadBalancer().addProcessor(consumer.getProcessor());
 
         // if we have not yet added our default trigger, then lets do it
@@ -199,7 +216,7 @@
         }
     }
 
-    public synchronized void consumerStopped(QuartzConsumer consumer) throws SchedulerException {
+    public synchronized void consumerStopped(final QuartzConsumer consumer) throws SchedulerException {
         getLoadBalancer().removeProcessor(consumer.getProcessor());
         if (getLoadBalancer().getProcessors().isEmpty() && started) {
             removeTrigger(getTrigger(), getJobDetail());

Added: camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java?rev=738171&view=auto
==============================================================================
--- camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java (added)
+++ camel/trunk/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java Tue Jan 27 17:58:08 2009
@@ -0,0 +1,38 @@
+/**
+ *
+ */
+package org.apache.camel.component.quartz;
+
+import org.apache.camel.CamelContext;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.SchedulerContext;
+import org.quartz.SchedulerException;
+import org.quartz.StatefulJob;
+
+/**
+ * @author martin.gilday
+ *
+ */
+public class StatefulCamelJob implements StatefulJob {
+
+	/* (non-Javadoc)
+	 * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
+	 */
+	public void execute(final JobExecutionContext context) throws JobExecutionException {
+
+		SchedulerContext schedulerContext;
+		try {
+			schedulerContext = context.getScheduler().getContext();
+		}
+		catch (SchedulerException e) {
+			throw new JobExecutionException("Failed to obtain scheduler context for job " + context.getJobDetail().getName());
+		}
+
+		CamelContext camelContext = (CamelContext) schedulerContext.get(QuartzEndpoint.CONTEXT_KEY);
+		String endpointUri = (String) context.getJobDetail().getJobDataMap().get(QuartzEndpoint.ENDPOINT_KEY);
+		QuartzEndpoint quartzEndpoint =	(QuartzEndpoint) camelContext.getEndpoint(endpointUri);
+		quartzEndpoint.onJobExecute(context);
+	}
+
+}

Added: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/StatefulQuartzRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/StatefulQuartzRouteTest.java?rev=738171&view=auto
==============================================================================
--- camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/StatefulQuartzRouteTest.java (added)
+++ camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/StatefulQuartzRouteTest.java Tue Jan 27 17:58:08 2009
@@ -0,0 +1,53 @@
+/**
+ *
+ */
+package org.apache.camel.component.quartz;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @author martin.gilday
+ *
+ */
+public class StatefulQuartzRouteTest extends ContextTestSupport {
+	protected MockEndpoint resultEndpoint;
+
+    public void testSendAndReceiveMails() throws Exception {
+        resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMessageCount(2);
+        resultEndpoint.message(0).header("triggerName").isEqualTo("myTimerName");
+        resultEndpoint.message(0).header("triggerGroup").isEqualTo("myGroup");
+
+        // lets test the receive worked
+        resultEndpoint.assertIsSatisfied();
+
+        List<Exchange> list = resultEndpoint.getReceivedExchanges();
+        for (Exchange exchange : list) {
+            Message in = exchange.getIn();
+            log.debug("Received: " + in + " with headers: " + in.getHeaders());
+        }
+    }
+
+
+	/* (non-Javadoc)
+	 * @see org.apache.camel.ContextTestSupport#createRouteBuilder()
+	 */
+	@Override
+	protected RouteBuilder createRouteBuilder() throws Exception {
+		return new RouteBuilder() {
+            @Override
+			public void configure() {
+                // START SNIPPET: example
+                from("quartz://myGroup/myTimerName?trigger.repeatInterval=2&trigger.repeatCount=1&stateful=true").to("mock:result");
+                // END SNIPPET: example
+            }
+        };
+	}
+
+}