You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/28 06:55:14 UTC
[5/8] camel git commit: patch to support dynamic job execution based
on message headers for sprinbatch component
https://issues.apache.org/jira/browse/CAMEL-9733
patch to support dynamic job execution based on message headers for sprinbatch component
https://issues.apache.org/jira/browse/CAMEL-9733
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d1c6ddcf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d1c6ddcf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d1c6ddcf
Branch: refs/heads/master
Commit: d1c6ddcfa993344bf41028f7e95787257b90a246
Parents: 7275b33
Author: Joseluis Pedrosa <jo...@elephanttalk.com>
Authored: Thu Apr 28 16:11:21 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat May 28 08:48:19 2016 +0200
----------------------------------------------------------------------
.../spring/batch/SpringBatchComponent.java | 1 +
.../spring/batch/SpringBatchEndpoint.java | 6 +-
.../spring/batch/SpringBatchProducer.java | 29 +++++++++-
.../spring/batch/SpringBatchEndpointTest.java | 61 ++++++++++++++++++++
4 files changed, 94 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d1c6ddcf/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchComponent.java b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchComponent.java
index 9f7d10c..bd1fbf9 100644
--- a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchComponent.java
+++ b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchComponent.java
@@ -25,6 +25,7 @@ import org.springframework.batch.core.launch.JobLauncher;
public class SpringBatchComponent extends UriEndpointComponent {
private static final String DEFAULT_JOB_LAUNCHER_REF_NAME = "jobLauncher";
+ public static final String DYNAMIC_JOBNAME = "DYNAMIC_JOBNAME_HEADER";
private JobLauncher jobLauncher;
private JobLauncher defaultResolvedJobLauncher;
http://git-wip-us.apache.org/repos/asf/camel/blob/d1c6ddcf/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchEndpoint.java b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchEndpoint.java
index 569e089..72df88d 100644
--- a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchEndpoint.java
+++ b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchEndpoint.java
@@ -37,7 +37,8 @@ import org.springframework.batch.core.launch.JobLauncher;
@UriEndpoint(scheme = "spring-batch", title = "Spring Batch", syntax = "spring-batch:jobName", producerOnly = true, label = "spring,batch,scheduling")
public class SpringBatchEndpoint extends DefaultEndpoint {
- @UriPath @Metadata(required = "true")
+ @UriPath()
+ @Metadata(required = "true")
private String jobName;
/**
@@ -46,6 +47,7 @@ public class SpringBatchEndpoint extends DefaultEndpoint {
*/
@Deprecated
private String jobLauncherRef;
+
@UriParam
private JobLauncher jobLauncher;
@@ -83,7 +85,7 @@ public class SpringBatchEndpoint extends DefaultEndpoint {
if (jobLauncher == null) {
jobLauncher = resolveJobLauncher();
}
- if (job == null && jobName != null) {
+ if (job == null && jobName != null && jobName.compareTo("dynamic")!=0) {
job = CamelContextHelper.mandatoryLookup(getCamelContext(), jobName, Job.class);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d1c6ddcf/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchProducer.java b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchProducer.java
index 4fc5395..a192193 100644
--- a/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchProducer.java
+++ b/components/camel-spring-batch/src/main/java/org/apache/camel/component/spring/batch/SpringBatchProducer.java
@@ -19,8 +19,10 @@ package org.apache.camel.component.spring.batch;
import java.util.Date;
import java.util.Map;
+import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.CamelContextHelper;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
@@ -44,8 +46,33 @@ public class SpringBatchProducer extends DefaultProducer {
@Override
public void process(Exchange exchange) throws Exception {
+
JobParameters jobParameters = prepareJobParameters(exchange.getIn().getHeaders());
- JobExecution jobExecution = jobLauncher.run(job, jobParameters);
+ String messageJobName = jobParameters.getString(SpringBatchComponent.DYNAMIC_JOBNAME);
+
+ Job job2run = this.job;
+
+ if (messageJobName != null)
+ {
+ Job dynamicJob = CamelContextHelper.mandatoryLookup(getEndpoint().getCamelContext(), messageJobName, Job.class);
+
+ job2run = dynamicJob;
+
+ if (job2run == null)
+ {
+ exchange.setException(new CamelExchangeException("Found header " + SpringBatchComponent.DYNAMIC_JOBNAME +
+ " with value " +messageJobName + " but could not find a Job in camel context", exchange));
+ return;
+ }
+ }
+
+ if (job2run == null) {
+ exchange.setException( new CamelExchangeException("jobName was not specified in the endpoint construction " +
+ " and header "+ SpringBatchComponent.DYNAMIC_JOBNAME + " could not be found", exchange));
+ return;
+ }
+
+ JobExecution jobExecution = jobLauncher.run(job2run, jobParameters);
exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
exchange.getOut().setBody(jobExecution);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d1c6ddcf/components/camel-spring-batch/src/test/java/org/apache/camel/component/spring/batch/SpringBatchEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-batch/src/test/java/org/apache/camel/component/spring/batch/SpringBatchEndpointTest.java b/components/camel-spring-batch/src/test/java/org/apache/camel/component/spring/batch/SpringBatchEndpointTest.java
index cb962ab..00e6c52 100644
--- a/components/camel-spring-batch/src/test/java/org/apache/camel/component/spring/batch/SpringBatchEndpointTest.java
+++ b/components/camel-spring-batch/src/test/java/org/apache/camel/component/spring/batch/SpringBatchEndpointTest.java
@@ -16,9 +16,13 @@
*/
package org.apache.camel.component.spring.batch;
+import java.time.Instant;
import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExchangeException;
import org.apache.camel.EndpointInject;
import org.apache.camel.FailedToCreateRouteException;
import org.apache.camel.builder.RouteBuilder;
@@ -55,32 +59,89 @@ public class SpringBatchEndpointTest extends CamelTestSupport {
@Mock
Job job;
+ @Mock
+ Job dynamicMockjob;
+
// Camel fixtures
@EndpointInject(uri = "mock:test")
MockEndpoint mockEndpoint;
+ @EndpointInject(uri = "mock:error")
+ MockEndpoint errorEndpoint;
+
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start").to("spring-batch:mockJob").to("mock:test");
+ from("direct:dynamic").
+ to("spring-batch:dynamic").
+ errorHandler(deadLetterChannel("mock:error")).
+ to("mock:test");
}
};
}
+
+
@Override
public JndiRegistry createRegistry() throws Exception {
JndiRegistry registry = super.createRegistry();
registry.bind("jobLauncher", jobLauncher);
registry.bind("alternativeJobLauncher", alternativeJobLauncher);
registry.bind("mockJob", job);
+ registry.bind("dynamicMockjob", dynamicMockjob);
return registry;
}
// Tests
+ @Test
+ public void dynamicJobFailsIfHeaderNotPressent() throws Exception {
+
+ mockEndpoint.expectedMessageCount(0);
+ errorEndpoint.expectedMessageCount(1);
+
+ //dynamic job should fail as header is not present and the job is dynamic
+ sendBody("direct:dyanmic", "Start the job, please.");
+ mockEndpoint.assertIsSatisfied();
+ mockEndpoint.assertIsSatisfied();
+ }
+
+ @Test
+ public void dynamicJobWorksIfHeaderWithInvalidJobName() throws Exception {
+
+ mockEndpoint.expectedMessageCount(0);
+ errorEndpoint.expectedMessageCount(1);
+
+ //dynamic job should fail as header is present but the job does not exists
+ header(SpringBatchComponent.DYNAMIC_JOBNAME).append("thisJobDoesNotExsistAtAll" +Date.from(Instant.now()));
+ sendBody("direct:dyanmic", "Start the job, please.");
+
+ mockEndpoint.assertIsSatisfied();
+ mockEndpoint.assertIsSatisfied();
+ }
+
+ @Test
+ public void dynamicJobWorksIfHeaderPressentWithvalidJob() throws Exception {
+
+ mockEndpoint.expectedMessageCount(1);
+ errorEndpoint.expectedMessageCount(0);
+ Thread.sleep(5000);
+ //dynamic job work if header is present and the job exists
+ final Map<String, Object> headers = new HashMap<>();
+ headers.put(SpringBatchComponent.DYNAMIC_JOBNAME, "dynamicMockjob");
+
+ sendBody("direct:dynamic", "Start the job, please.", headers);
+
+ mockEndpoint.assertIsSatisfied();
+ errorEndpoint.assertIsSatisfied();
+ }
+
+
@Test
public void shouldInjectJobToEndpoint() throws IllegalAccessException {
SpringBatchEndpoint batchEndpoint = getMandatoryEndpoint("spring-batch:mockJob", SpringBatchEndpoint.class);