You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/12/03 15:00:01 UTC

[2/3] camel git commit: CAMEL-8076 Fixed the issue that CamelJob cannot find the right QuartzEndpoint when recovering the job

CAMEL-8076 Fixed the issue that CamelJob cannot find the right QuartzEndpoint when recovering the job


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ff59faef
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ff59faef
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ff59faef

Branch: refs/heads/master
Commit: ff59faefa559d6ffe22020b51b668b647ef01041
Parents: aa6e20a
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed Dec 3 21:43:21 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed Dec 3 21:57:52 2014 +0800

----------------------------------------------------------------------
 .../camel/component/quartz2/CamelJob.java       |  9 +++--
 ...rtzConsumerTwoAppsClusteredRecoveryTest.java | 35 ++++++++++++++++----
 ...ingQuartzConsumerRecoveryClusteredAppOne.xml |  6 ++--
 3 files changed, 38 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ff59faef/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
index 407b93b..3e74abc 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/CamelJob.java
@@ -23,8 +23,10 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Route;
 import org.quartz.Job;
+import org.quartz.JobDetail;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
+import org.quartz.JobKey;
 import org.quartz.SchedulerContext;
 import org.quartz.SchedulerException;
 import org.quartz.TriggerKey;
@@ -91,10 +93,12 @@ public class CamelJob implements Job {
 
     protected QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, JobExecutionContext quartzContext) throws JobExecutionException {
         TriggerKey triggerKey = quartzContext.getTrigger().getKey();
+        JobDetail jobDetail = quartzContext.getJobDetail(); 
+        JobKey jobKey =  jobDetail.getKey();
         if (LOG.isDebugEnabled()) {
             LOG.debug("Looking up existing QuartzEndpoint with triggerKey={}", triggerKey);
         }
-
+        
         // check all active routes for the quartz endpoint this task matches
         // as we prefer to use the existing endpoint from the routes
         for (Route route : camelContext.getRoutes()) {
@@ -108,7 +112,8 @@ public class CamelJob implements Job {
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Checking route endpoint={} with checkTriggerKey={}", quartzEndpoint, checkTriggerKey);
                 }
-                if (triggerKey.equals(checkTriggerKey)) {
+                if (triggerKey.equals(checkTriggerKey)
+                    || (jobDetail.requestsRecovery() && jobKey.getGroup().equals(checkTriggerKey.getGroup()) && jobKey.getName().equals(checkTriggerKey.getName()))) {
                     return quartzEndpoint;
                 }
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/ff59faef/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java b/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java
index 4fe1ab9..9758097 100644
--- a/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java
+++ b/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/SpringQuartzConsumerTwoAppsClusteredRecoveryTest.java
@@ -19,10 +19,14 @@ package org.apache.camel.component.quartz2;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.TestSupport;
 import org.apache.camel.util.IOHelper;
 import org.junit.Test;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
 import org.springframework.context.support.AbstractXmlApplicationContext;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 
@@ -42,19 +46,20 @@ public class SpringQuartzConsumerTwoAppsClusteredRecoveryTest extends TestSuppor
         // now launch the first clustered app which will acquire the quartz database lock and become the master
         AbstractXmlApplicationContext app = new ClassPathXmlApplicationContext("org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml");
         app.start();
-
-        // as well as the second one which will run in slave mode as it will not be able to acquire the same lock
-        AbstractXmlApplicationContext app2 = new ClassPathXmlApplicationContext("org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppTwo.xml");
-        app2.start();
-
+        
         // now let's simulate a crash of the first app (the quartz instance 'app-one')
         log.warn("The first app is going to crash NOW!");
-        IOHelper.close(app);
 
         log.warn("Crashed...");
         log.warn("Crashed...");
         log.warn("Crashed...");
-
+        
+        Thread.sleep(2000);
+        
+        // as well as the second one which will run in slave mode as it will not be able to acquire the same lock
+        AbstractXmlApplicationContext app2 = new ClassPathXmlApplicationContext("org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppTwo.xml");
+        app2.start();
+        
         // wait long enough until the second app takes it over...
         Thread.sleep(20000);
         // inside the logs one can then clearly see how the route of the second app ('app-two') starts consuming:
@@ -91,5 +96,21 @@ public class SpringQuartzConsumerTwoAppsClusteredRecoveryTest extends TestSuppor
         }
 
     }
+    
+    public static class MyProcessor implements Processor, ApplicationContextAware {
+        ApplicationContext applicationContext;
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            // shutdown the application context;
+            ((AbstractXmlApplicationContext)applicationContext).close();
+        }
+
+        @Override
+        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+            this.applicationContext = applicationContext;
+        }
+        
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ff59faef/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml b/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml
index a01b1c0..91c07b1 100644
--- a/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml
+++ b/components/camel-quartz2/src/test/resources/org/apache/camel/component/quartz2/SpringQuartzConsumerRecoveryClusteredAppOne.xml
@@ -58,6 +58,8 @@
       </props>
     </property>
   </bean>
+  
+  <bean id="myProcessor" class="org.apache.camel.component.quartz2.SpringQuartzConsumerTwoAppsClusteredRecoveryTest$MyProcessor" />
 
   <camelContext id="camelContext" shutdownEager="false" xmlns="http://camel.apache.org/schema/spring">
     <template id="template" />
@@ -67,9 +69,7 @@
         <simple>clustering PINGS!</simple>
       </transform>
       <to uri="log:triggered" />
-      <!--delay>
-         <constant>10000</constant>
-       </delay-->
+      <process ref="myProcessor"/>
       <to uri="mock:result" />
     </route>
   </camelContext>