You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2009/01/27 17:04:02 UTC

svn commit: r738122 - in /camel/branches/camel-1.x/components/camel-quartz/src: main/java/org/apache/camel/component/quartz/ test/java/org/apache/camel/component/quartz/

Author: hadrian
Date: Tue Jan 27 16:04:02 2009
New Revision: 738122

URL: http://svn.apache.org/viewvc?rev=738122&view=rev
Log:
CAMEL-1002.  Patch applied with thanks to Martin.

Added:
    camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java
    camel/branches/camel-1.x/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/StatefulQuartzRouteTest.java
Modified:
    camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
    camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java

Modified: camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java?rev=738122&r1=738121&r2=738122&view=diff
==============================================================================
--- camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java (original)
+++ camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzComponent.java Tue Jan 27 16:04:02 2009
@@ -46,12 +46,12 @@
     public QuartzComponent() {
     }
 
-    public QuartzComponent(CamelContext context) {
+    public QuartzComponent(final CamelContext context) {
         super(context);
     }
 
     @Override
-    protected QuartzEndpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception {
+    protected QuartzEndpoint createEndpoint(final String uri, final String remaining, final Map parameters) throws Exception {
         QuartzEndpoint answer = new QuartzEndpoint(uri, this, getScheduler());
 
         // lets split the remaining into a group/name
@@ -126,7 +126,7 @@
         return factory;
     }
 
-    public void setFactory(SchedulerFactory factory) {
+    public void setFactory(final SchedulerFactory factory) {
         this.factory = factory;
     }
 
@@ -137,7 +137,7 @@
         return scheduler;
     }
 
-    public void setScheduler(Scheduler scheduler) {
+    public void setScheduler(final Scheduler scheduler) {
         this.scheduler = scheduler;
     }
 
@@ -145,7 +145,7 @@
         return triggers;
     }
 
-    public void setTriggers(Map triggers) {
+    public void setTriggers(final Map triggers) {
         this.triggers = triggers;
     }
 
@@ -156,6 +156,8 @@
     }
 
     protected Scheduler createScheduler() throws SchedulerException {
-        return getFactory().getScheduler();
+        Scheduler scheduler = getFactory().getScheduler();
+        scheduler.getContext().put(QuartzEndpoint.CONTEXT_KEY, getCamelContext());
+        return scheduler;
     }
 }

Modified: camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java?rev=738122&r1=738121&r2=738122&view=diff
==============================================================================
--- camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java (original)
+++ camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzEndpoint.java Tue Jan 27 16:04:02 2009
@@ -44,24 +44,27 @@
  */
 public class QuartzEndpoint extends DefaultEndpoint<QuartzExchange> {
     public static final String ENDPOINT_KEY = "org.apache.camel.quartz";
+    public static final String CONTEXT_KEY = "org.apache.camel.CamelContext";
+
     private static final transient Log LOG = LogFactory.getLog(QuartzEndpoint.class);
     private Scheduler scheduler;
     private LoadBalancer loadBalancer;
     private Trigger trigger;
     private JobDetail jobDetail;
     private boolean started;
+    private boolean stateful;
 
-    public QuartzEndpoint(String endpointUri, QuartzComponent component, Scheduler scheduler) {
+    public QuartzEndpoint(final String endpointUri, final QuartzComponent component, final Scheduler scheduler) {
         super(endpointUri, component);
         this.scheduler = scheduler;
     }
 
-    public QuartzEndpoint(String endpointUri, Scheduler scheduler) {
+    public QuartzEndpoint(final String endpointUri, final Scheduler scheduler) {
         super(endpointUri);
         this.scheduler = scheduler;
     }
 
-    public void addTriggers(Map<Trigger, JobDetail> triggerMap) throws SchedulerException {
+    public void addTriggers(final Map<Trigger, JobDetail> triggerMap) throws SchedulerException {
         if (triggerMap != null) {
             Set<Map.Entry<Trigger, JobDetail>> entries = triggerMap.entrySet();
             for (Map.Entry<Trigger, JobDetail> entry : entries) {
@@ -75,7 +78,7 @@
         }
     }
 
-    public void addTrigger(Trigger trigger, JobDetail detail) throws SchedulerException {
+    public void addTrigger(final Trigger trigger, final JobDetail detail) throws SchedulerException {
         // lets default the trigger name to the job name
         if (trigger.getName() == null) {
             trigger.setName(detail.getName());
@@ -88,10 +91,17 @@
         if (trigger.getStartTime() == null) {
             trigger.setStartTime(new Date());
         }
-        detail.getJobDataMap().put(ENDPOINT_KEY, this);
-        Class jobClass = detail.getJobClass();
-        if (jobClass == null) {
-            detail.setJobClass(CamelJob.class);
+        if (isStateful()) {
+            detail.getJobDataMap().put(ENDPOINT_KEY, getEndpointUri());
+        } else {
+            detail.getJobDataMap().put(ENDPOINT_KEY, this);
+        }
+        if (null == detail.getJobClass()) {
+            if (isStateful()) {
+                detail.setJobClass(StatefulCamelJob.class);
+            } else {
+                detail.setJobClass(CamelJob.class);
+            }
         }
         if (detail.getName() == null) {
             detail.setName(getEndpointUri());
@@ -99,7 +109,7 @@
         getScheduler().scheduleJob(detail, trigger);
     }
 
-    public void removeTrigger(Trigger trigger, JobDetail jobDetail) throws SchedulerException {
+    public void removeTrigger(final Trigger trigger, final JobDetail jobDetail) throws SchedulerException {
         getScheduler().unscheduleJob(trigger.getName(), trigger.getGroup());
     }
 
@@ -108,7 +118,7 @@
      *
      * @param jobExecutionContext the Quartz Job context
      */
-    public void onJobExecute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+    public void onJobExecute(final JobExecutionContext jobExecutionContext) throws JobExecutionException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Firing Quartz Job with context: " + jobExecutionContext);
         }
@@ -123,11 +133,11 @@
     }
 
     @Override
-    public QuartzExchange createExchange(ExchangePattern pattern) {
+    public QuartzExchange createExchange(final ExchangePattern pattern) {
         return new QuartzExchange(getCamelContext(), pattern, null);
     }
 
-    public QuartzExchange createExchange(JobExecutionContext jobExecutionContext) {
+    public QuartzExchange createExchange(final JobExecutionContext jobExecutionContext) {
         return new QuartzExchange(getCamelContext(), getExchangePattern(), jobExecutionContext);
     }
 
@@ -135,7 +145,7 @@
         throw new UnsupportedOperationException("You cannot send messages to this endpoint");
     }
 
-    public QuartzConsumer createConsumer(Processor processor) throws Exception {
+    public QuartzConsumer createConsumer(final Processor processor) throws Exception {
         return new QuartzConsumer(this, processor);
     }
 
@@ -162,7 +172,7 @@
         return loadBalancer;
     }
 
-    public void setLoadBalancer(LoadBalancer loadBalancer) {
+    public void setLoadBalancer(final LoadBalancer loadBalancer) {
         this.loadBalancer = loadBalancer;
     }
 
@@ -173,7 +183,7 @@
         return jobDetail;
     }
 
-    public void setJobDetail(JobDetail jobDetail) {
+    public void setJobDetail(final JobDetail jobDetail) {
         this.jobDetail = jobDetail;
     }
 
@@ -184,13 +194,27 @@
         return trigger;
     }
 
-    public void setTrigger(Trigger trigger) {
+    public void setTrigger(final Trigger trigger) {
         this.trigger = trigger;
     }
 
+    /**
+     * @return the stateful
+     */
+    public boolean isStateful() {
+        return this.stateful;
+    }
+
+    /**
+     * @param stateful the stateful to set
+     */
+    public void setStateful(final boolean stateful) {
+        this.stateful = stateful;
+    }
+
     // Implementation methods
     // -------------------------------------------------------------------------
-    public synchronized void consumerStarted(QuartzConsumer consumer) throws SchedulerException {
+    public synchronized void consumerStarted(final QuartzConsumer consumer) throws SchedulerException {
         getLoadBalancer().addProcessor(consumer.getProcessor());
 
         // if we have not yet added our default trigger, then lets do it
@@ -200,7 +224,7 @@
         }
     }
 
-    public synchronized void consumerStopped(QuartzConsumer consumer) throws SchedulerException {
+    public synchronized void consumerStopped(final QuartzConsumer consumer) throws SchedulerException {
         getLoadBalancer().removeProcessor(consumer.getProcessor());
         if (getLoadBalancer().getProcessors().isEmpty() && started) {
             removeTrigger(getTrigger(), getJobDetail());

Added: camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java?rev=738122&view=auto
==============================================================================
--- camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java (added)
+++ camel/branches/camel-1.x/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/StatefulCamelJob.java Tue Jan 27 16:04:02 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.quartz;
+
+import org.apache.camel.CamelContext;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.SchedulerContext;
+import org.quartz.SchedulerException;
+import org.quartz.StatefulJob;
+
+/**
+ * @author martin.gilday
+ *
+ */
+public class StatefulCamelJob implements StatefulJob {
+
+    /* (non-Javadoc)
+     * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
+     */
+    public void execute(final JobExecutionContext context) throws JobExecutionException {
+
+        SchedulerContext schedulerContext;
+        try {
+            schedulerContext = context.getScheduler().getContext();
+        } catch (SchedulerException e) {
+            throw new JobExecutionException("Failed to obtain scheduler context for job " + context.getJobDetail().getName());
+        }
+
+        CamelContext camelContext = (CamelContext) schedulerContext.get(QuartzEndpoint.CONTEXT_KEY);
+        String endpointUri = (String) context.getJobDetail().getJobDataMap().get(QuartzEndpoint.ENDPOINT_KEY);
+        QuartzEndpoint quartzEndpoint =    (QuartzEndpoint) camelContext.getEndpoint(endpointUri);
+        quartzEndpoint.onJobExecute(context);
+    }
+
+}

Added: camel/branches/camel-1.x/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/StatefulQuartzRouteTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/StatefulQuartzRouteTest.java?rev=738122&view=auto
==============================================================================
--- camel/branches/camel-1.x/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/StatefulQuartzRouteTest.java (added)
+++ camel/branches/camel-1.x/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/StatefulQuartzRouteTest.java Tue Jan 27 16:04:02 2009
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.quartz;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @author martin.gilday
+ *
+ */
+public class StatefulQuartzRouteTest extends ContextTestSupport {
+    protected MockEndpoint resultEndpoint;
+
+    public void testSendAndReceiveMails() throws Exception {
+        resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMessageCount(2);
+        resultEndpoint.message(0).header("triggerName").isEqualTo("myTimerName");
+        resultEndpoint.message(0).header("triggerGroup").isEqualTo("myGroup");
+
+        // lets test the receive worked
+        resultEndpoint.assertIsSatisfied();
+
+        List<Exchange> list = resultEndpoint.getReceivedExchanges();
+        for (Exchange exchange : list) {
+            Message in = exchange.getIn();
+            log.debug("Received: " + in + " with headers: " + in.getHeaders());
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.camel.ContextTestSupport#createRouteBuilder()
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // START SNIPPET: example
+                from("quartz://myGroup/myTimerName?trigger.repeatInterval=2&trigger.repeatCount=1&stateful=true").to("mock:result");
+                // END SNIPPET: example
+            }
+        };
+    }
+
+}