You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/28 06:55:14 UTC

[5/8] camel git commit: patch to support dynamic job execution based on message headers for sprinbatch component https://issues.apache.org/jira/browse/CAMEL-9733

patch to support dynamic job execution based on message headers for sprinbatch component
https://issues.apache.org/jira/browse/CAMEL-9733


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d1c6ddcf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d1c6ddcf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d1c6ddcf

Branch: refs/heads/master
Commit: d1c6ddcfa993344bf41028f7e95787257b90a246
Parents: 7275b33
Author: Joseluis Pedrosa <jo...@elephanttalk.com>
Authored: Thu Apr 28 16:11:21 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat May 28 08:48:19 2016 +0200

----------------------------------------------------------------------
 .../spring/batch/SpringBatchComponent.java      |  1 +
 .../spring/batch/SpringBatchEndpoint.java       |  6 +-
 .../spring/batch/SpringBatchProducer.java       | 29 +++++++++-
 .../spring/batch/SpringBatchEndpointTest.java   | 61 ++++++++++++++++++++
 4 files changed, 94 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d1c6ddcf/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchComponent.java b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchComponent.java
index 9f7d10c..bd1fbf9 100644
--- a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchComponent.java
+++ b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchComponent.java
@@ -25,6 +25,7 @@ import org.springframework.batch.core.launch.JobLauncher;
 public class SpringBatchComponent extends UriEndpointComponent {
 
     private static final String DEFAULT_JOB_LAUNCHER_REF_NAME = "jobLauncher";
+    public static final String DYNAMIC_JOBNAME = "DYNAMIC_JOBNAME_HEADER";
 
     private JobLauncher jobLauncher;
     private JobLauncher defaultResolvedJobLauncher;

http://git-wip-us.apache.org/repos/asf/camel/blob/d1c6ddcf/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchEndpoint.java b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchEndpoint.java
index 569e089..72df88d 100644
--- a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchEndpoint.java
+++ b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchEndpoint.java
@@ -37,7 +37,8 @@ import org.springframework.batch.core.launch.JobLauncher;
 @UriEndpoint(scheme = "spring-batch", title = "Spring Batch", syntax = "spring-batch:jobName", producerOnly = true, label = "spring,batch,scheduling")
 public class SpringBatchEndpoint extends DefaultEndpoint {
 
-    @UriPath @Metadata(required = "true")
+    @UriPath()
+    @Metadata(required = "true")
     private String jobName;
 
     /**
@@ -46,6 +47,7 @@ public class SpringBatchEndpoint extends DefaultEndpoint {
      */
     @Deprecated
     private String jobLauncherRef;
+
     @UriParam
     private JobLauncher jobLauncher;
 
@@ -83,7 +85,7 @@ public class SpringBatchEndpoint extends DefaultEndpoint {
         if (jobLauncher == null) {
             jobLauncher = resolveJobLauncher();
         }
-        if (job == null && jobName != null) {
+        if (job == null && jobName != null && jobName.compareTo("dynamic")!=0) {
             job = CamelContextHelper.mandatoryLookup(getCamelContext(), jobName, Job.class);
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d1c6ddcf/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchProducer.java b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchProducer.java
index 4fc5395..a192193 100644
--- a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchProducer.java
+++ b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchProducer.java
@@ -19,8 +19,10 @@ package org.apache.camel.component.spring.batch;
 import java.util.Date;
 import java.util.Map;
 
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.CamelContextHelper;
 import org.springframework.batch.core.Job;
 import org.springframework.batch.core.JobExecution;
 import org.springframework.batch.core.JobParameters;
@@ -44,8 +46,33 @@ public class SpringBatchProducer extends DefaultProducer {
 
     @Override
     public void process(Exchange exchange) throws Exception {
+
         JobParameters jobParameters = prepareJobParameters(exchange.getIn().getHeaders());
-        JobExecution jobExecution = jobLauncher.run(job, jobParameters);
+        String messageJobName = jobParameters.getString(SpringBatchComponent.DYNAMIC_JOBNAME);
+
+        Job job2run = this.job;
+
+        if (messageJobName != null)
+        {
+            Job dynamicJob = CamelContextHelper.mandatoryLookup(getEndpoint().getCamelContext(), messageJobName, Job.class);
+
+            job2run = dynamicJob;
+
+            if (job2run == null)
+            {
+                exchange.setException(new CamelExchangeException("Found header " + SpringBatchComponent.DYNAMIC_JOBNAME +
+                        " with value " +messageJobName + " but could not find a Job in camel context", exchange));
+                return;
+            }
+        }
+
+        if (job2run == null) {
+            exchange.setException( new CamelExchangeException("jobName was not specified in the endpoint construction " +
+                    " and header "+ SpringBatchComponent.DYNAMIC_JOBNAME + " could not be found", exchange));
+            return;
+        }
+
+        JobExecution jobExecution = jobLauncher.run(job2run, jobParameters);
         exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
         exchange.getOut().setBody(jobExecution);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d1c6ddcf/components/camel-spring-batch/src/test/java/org/apache/camel/component/spring/batch/SpringBatchEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-batch/src/test/java/org/apache/camel/component/spring/batch/SpringBatchEndpointTest.java b/components/camel-spring-batch/src/test/java/org/apache/camel/component/spring/batch/SpringBatchEndpointTest.java
index cb962ab..00e6c52 100644
--- a/components/camel-spring-batch/src/test/java/org/apache/camel/component/spring/batch/SpringBatchEndpointTest.java
+++ b/components/camel-spring-batch/src/test/java/org/apache/camel/component/spring/batch/SpringBatchEndpointTest.java
@@ -16,9 +16,13 @@
  */
 package org.apache.camel.component.spring.batch;
 
+import java.time.Instant;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.FailedToCreateRouteException;
 import org.apache.camel.builder.RouteBuilder;
@@ -55,32 +59,89 @@ public class SpringBatchEndpointTest extends CamelTestSupport {
     @Mock
     Job job;
 
+    @Mock
+    Job dynamicMockjob;
+
     // Camel fixtures
 
     @EndpointInject(uri = "mock:test")
     MockEndpoint mockEndpoint;
 
+    @EndpointInject(uri = "mock:error")
+    MockEndpoint errorEndpoint;
+
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
                 from("direct:start").to("spring-batch:mockJob").to("mock:test");
+                from("direct:dynamic").
+                        to("spring-batch:dynamic").
+                        errorHandler(deadLetterChannel("mock:error")).
+                        to("mock:test");
             }
         };
     }
 
+
+
     @Override
     public JndiRegistry createRegistry() throws Exception {
         JndiRegistry registry = super.createRegistry();
         registry.bind("jobLauncher", jobLauncher);
         registry.bind("alternativeJobLauncher", alternativeJobLauncher);
         registry.bind("mockJob", job);
+        registry.bind("dynamicMockjob", dynamicMockjob);
         return registry;
     }
 
     // Tests
 
+   @Test
+    public void dynamicJobFailsIfHeaderNotPressent() throws Exception {
+
+       mockEndpoint.expectedMessageCount(0);
+       errorEndpoint.expectedMessageCount(1);
+
+       //dynamic job should fail as header is not present and the job is dynamic
+       sendBody("direct:dyanmic", "Start the job, please.");
+       mockEndpoint.assertIsSatisfied();
+       mockEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void dynamicJobWorksIfHeaderWithInvalidJobName() throws Exception {
+
+        mockEndpoint.expectedMessageCount(0);
+        errorEndpoint.expectedMessageCount(1);
+
+        //dynamic job should fail as header is present but the job does not exists
+        header(SpringBatchComponent.DYNAMIC_JOBNAME).append("thisJobDoesNotExsistAtAll" +Date.from(Instant.now()));
+        sendBody("direct:dyanmic", "Start the job, please.");
+
+        mockEndpoint.assertIsSatisfied();
+        mockEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void dynamicJobWorksIfHeaderPressentWithvalidJob() throws Exception {
+
+        mockEndpoint.expectedMessageCount(1);
+        errorEndpoint.expectedMessageCount(0);
+        Thread.sleep(5000);
+        //dynamic job work if header is present and the job exists
+        final Map<String, Object> headers = new HashMap<>();
+        headers.put(SpringBatchComponent.DYNAMIC_JOBNAME, "dynamicMockjob");
+
+        sendBody("direct:dynamic", "Start the job, please.", headers);
+
+        mockEndpoint.assertIsSatisfied();
+        errorEndpoint.assertIsSatisfied();
+    }
+
+
     @Test
     public void shouldInjectJobToEndpoint() throws IllegalAccessException {
         SpringBatchEndpoint batchEndpoint = getMandatoryEndpoint("spring-batch:mockJob", SpringBatchEndpoint.class);