You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2014/01/02 19:11:02 UTC
git commit: BATCHEE-10 StepLauncher and StepBuilder
Updated Branches:
refs/heads/master a955dcbcf -> b5fe846f7
BATCHEE-10 StepLauncher and StepBuilder
Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/b5fe846f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/b5fe846f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/b5fe846f
Branch: refs/heads/master
Commit: b5fe846f7d56517168738041952cd61a12a8b587
Parents: a955dcb
Author: Romain Manni-Bucau <rm...@apache.org>
Authored: Thu Jan 2 19:10:54 2014 +0100
Committer: Romain Manni-Bucau <rm...@apache.org>
Committed: Thu Jan 2 19:10:54 2014 +0100
----------------------------------------------------------------------
.../org/apache/batchee/test/StepBuilder.java | 267 +++++++++++++++++++
.../org/apache/batchee/test/StepLauncher.java | 107 ++++++++
.../apache/batchee/test/StepLauncherTest.java | 175 ++++++++++++
.../batchee/test/components/SleepBatchlet.java | 2 +-
4 files changed, 550 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/b5fe846f/test/src/main/java/org/apache/batchee/test/StepBuilder.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/batchee/test/StepBuilder.java b/test/src/main/java/org/apache/batchee/test/StepBuilder.java
new file mode 100644
index 0000000..3853780
--- /dev/null
+++ b/test/src/main/java/org/apache/batchee/test/StepBuilder.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.test;
+
+import org.apache.batchee.container.jsl.ExecutionElement;
+import org.apache.batchee.container.jsl.JobModelResolver;
+import org.apache.batchee.container.services.loader.DefaultJobXMLLoaderService;
+import org.apache.batchee.jaxb.Batchlet;
+import org.apache.batchee.jaxb.Chunk;
+import org.apache.batchee.jaxb.ItemProcessor;
+import org.apache.batchee.jaxb.ItemReader;
+import org.apache.batchee.jaxb.ItemWriter;
+import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.jaxb.JSLProperties;
+import org.apache.batchee.jaxb.Property;
+import org.apache.batchee.jaxb.Step;
+
+import java.util.List;
+
+// impl is not a real builder (see immutability etc)
+// but this is more to propose a nice and fluent API than anything else
+public class StepBuilder {
+ public static Step extractFromXml(final String xml, final String name) {
+ final JSLJob job = new JobModelResolver().resolveModel(new DefaultJobXMLLoaderService().loadJSL(xml.replace(".xml", "")));
+ if (name != null) {
+ for (final ExecutionElement step : job.getExecutionElements()) {
+ if (Step.class.isInstance(step) && name.equals(Step.class.cast(step).getId())) {
+ return Step.class.cast(step);
+ }
+ }
+ }
+ if (job.getExecutionElements().size() == 1) {
+ return Step.class.cast(job.getExecutionElements().iterator().next());
+ }
+ throw new IllegalArgumentException("Step '" + name + "' nor found.");
+ }
+
+ public static StepBuilder newStep() {
+ return new StepBuilder();
+ }
+
+ public static BatchletBuilder newBatchlet() {
+ return new StepBuilder().batchlet();
+ }
+
+ public static ChunkBuilder newChunk() {
+ return new StepBuilder().chunk();
+ }
+
+ private StepBuilder() {
+ // no-op
+ }
+
+ private final Step step = new Step();
+
+ public BatchletBuilder batchlet() {
+ final Batchlet batchlet = new Batchlet();
+ batchlet.setProperties(new JSLProperties());
+ step.setBatchlet(batchlet);
+ return new BatchletBuilder(batchlet, this);
+ }
+
+ public ChunkBuilder chunk() {
+ final Chunk chunk = new Chunk();
+ step.setChunk(chunk);
+ return new ChunkBuilder(chunk, this);
+ }
+
+ public StepBuilder property(final String key, final String value) {
+ addProperty(key, value, step.getProperties().getPropertyList());
+ return this;
+ }
+
+ public StepBuilder name(final String name) {
+ step.setId(name);
+ return this;
+ }
+
+ public Step create() {
+ if (step.getId() == null) {
+ step.setId("batchee-test"); // can't be null
+ }
+ return step;
+ }
+
+ public static class BatchletBuilder {
+ private final Batchlet toBuild;
+ private final StepBuilder parent;
+
+ private BatchletBuilder(final Batchlet batchlet, final StepBuilder stepBuilder) {
+ toBuild = batchlet;
+ parent = stepBuilder;
+ }
+
+ public BatchletBuilder ref(final String ref) {
+ toBuild.setRef(ref);
+ return this;
+ }
+
+ public BatchletBuilder property(final String key, final String value) {
+ addProperty(key, value, toBuild.getProperties().getPropertyList());
+ return this;
+ }
+
+ public StepBuilder up() {
+ return parent;
+ }
+
+ public Step create() {
+ return up().create();
+ }
+ }
+
+ public static class WriterBuilder {
+ private final ItemWriter toBuild;
+ private final ChunkBuilder parent;
+
+ private WriterBuilder(final ItemWriter batchlet, final ChunkBuilder stepBuilder) {
+ toBuild = batchlet;
+ parent = stepBuilder;
+ }
+
+ public WriterBuilder ref(final String ref) {
+ toBuild.setRef(ref);
+ return this;
+ }
+
+ public WriterBuilder property(final String key, final String value) {
+ addProperty(key, value, toBuild.getProperties().getPropertyList());
+ return this;
+ }
+
+ public ChunkBuilder up() {
+ return parent;
+ }
+
+ public Step create() {
+ return up().up().create();
+ }
+ }
+
+ public static class ProcessorBuilder {
+ private final ItemProcessor toBuild;
+ private final ChunkBuilder parent;
+
+ private ProcessorBuilder(final ItemProcessor batchlet, final ChunkBuilder stepBuilder) {
+ toBuild = batchlet;
+ parent = stepBuilder;
+ }
+
+ public ProcessorBuilder ref(final String ref) {
+ toBuild.setRef(ref);
+ return this;
+ }
+
+ public ProcessorBuilder property(final String key, final String value) {
+ addProperty(key, value, toBuild.getProperties().getPropertyList());
+ return this;
+ }
+
+ public WriterBuilder writer() {
+ return up().writer();
+ }
+
+ public ChunkBuilder up() {
+ return parent;
+ }
+ }
+
+ public static class ReaderBuilder {
+ private final ItemReader toBuild;
+ private final ChunkBuilder parent;
+
+ private ReaderBuilder(final ItemReader batchlet, final ChunkBuilder stepBuilder) {
+ toBuild = batchlet;
+ parent = stepBuilder;
+ }
+
+ public ReaderBuilder ref(final String ref) {
+ toBuild.setRef(ref);
+ return this;
+ }
+
+ public ReaderBuilder property(final String key, final String value) {
+ addProperty(key, value, toBuild.getProperties().getPropertyList());
+ return this;
+ }
+
+ public ProcessorBuilder processor() {
+ return up().processor();
+ }
+
+ public WriterBuilder writer() {
+ return up().writer();
+ }
+
+ public ChunkBuilder up() {
+ return parent;
+ }
+ }
+
+ public static class ChunkBuilder {
+ private final StepBuilder parent;
+ private final Chunk toBuild;
+
+ private ChunkBuilder(final Chunk chunk, final StepBuilder stepBuilder) {
+ toBuild = chunk;
+ parent = stepBuilder;
+ }
+
+ public ReaderBuilder reader() {
+ final ItemReader reader = new ItemReader();
+ reader.setProperties(new JSLProperties());
+ toBuild.setReader(reader);
+ return new ReaderBuilder(reader, this);
+ }
+
+ public ProcessorBuilder processor() {
+ final ItemProcessor processor = new ItemProcessor();
+ processor.setProperties(new JSLProperties());
+ toBuild.setProcessor(processor);
+ return new ProcessorBuilder(processor, this);
+ }
+
+ public WriterBuilder writer() {
+ final ItemWriter writer = new ItemWriter();
+ writer.setProperties(new JSLProperties());
+ toBuild.setWriter(writer);
+ return new WriterBuilder(writer, this);
+ }
+
+ public ChunkBuilder retryLimit(final int retry) {
+ toBuild.setRetryLimit(Integer.toString(retry));
+ return this;
+ }
+
+ public ChunkBuilder checkpointPolicy(final String policy) {
+ toBuild.setCheckpointPolicy(policy);
+ return this;
+ }
+
+ public StepBuilder up() {
+ return parent;
+ }
+ }
+
+ private static void addProperty(final String key, final String value, final List<Property> propertyList) {
+ final Property e = new Property();
+ e.setName(key);
+ e.setValue(value);
+ propertyList.add(e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/b5fe846f/test/src/main/java/org/apache/batchee/test/StepLauncher.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/batchee/test/StepLauncher.java b/test/src/main/java/org/apache/batchee/test/StepLauncher.java
new file mode 100644
index 0000000..5224254
--- /dev/null
+++ b/test/src/main/java/org/apache/batchee/test/StepLauncher.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.test;
+
+import org.apache.batchee.container.impl.JobContextImpl;
+import org.apache.batchee.container.impl.StepContextImpl;
+import org.apache.batchee.container.impl.controller.batchlet.BatchletStepController;
+import org.apache.batchee.container.impl.controller.chunk.ChunkStepController;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.modelresolver.PropertyResolverFactory;
+import org.apache.batchee.container.navigator.JobNavigator;
+import org.apache.batchee.container.proxy.InjectionReferences;
+import org.apache.batchee.container.proxy.ListenerFactory;
+import org.apache.batchee.container.services.JobStatusManagerService;
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.container.services.persistence.MemoryPersistenceManager;
+import org.apache.batchee.container.util.PartitionDataWrapper;
+import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.jaxb.JSLProperties;
+import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
+import org.apache.batchee.spi.PersistenceManagerService;
+import org.apache.batchee.spi.SecurityService;
+
+import javax.batch.runtime.BatchStatus;
+import javax.batch.runtime.JobInstance;
+import javax.batch.runtime.StepExecution;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+
+public class StepLauncher {
+ private static final Properties EMPTY_PROPERTIES = new Properties();
+ private static final JSLProperties EMPTY_JSL_PROPERTIES = new JSLProperties();
+ private static final JSLJob EMPTY_JOB = new JSLJob();
+
+ private static final Properties TEST_PROPERTIES = new Properties();
+
+ static { // default ATM but could be TestMemoryPersistenceManager
+ TEST_PROPERTIES.put(PersistenceManagerService.class.getName(), MemoryPersistenceManager.class.getName());
+ }
+
+ public static StepExecution execute(final Step step) {
+ return execute(step, EMPTY_PROPERTIES);
+ }
+
+ /**
+ * @param step the step to execute, Note: it can be modified by this method
+ * @param jobParams the job parameters properties
+ * @return the step execution
+ */
+ public static StepExecution execute(final Step step, final Properties jobParams) {
+ // services
+ final ServicesManager manager = new ServicesManager();
+ manager.init(TEST_PROPERTIES);
+
+ final PersistenceManagerService persistenceManagerService = manager.service(PersistenceManagerService.class);
+ final BatchArtifactFactory factory = manager.service(BatchArtifactFactory.class);
+
+ // contextual data
+ final JobInstance jobInstance = persistenceManagerService.createJobInstance(step.getId(), manager.service(SecurityService.class).getLoggedUser(), null);
+ manager.service(JobStatusManagerService.class).createJobStatus(jobInstance.getInstanceId());
+
+ final JobContextImpl jobContext = new JobContextImpl(new JobNavigator(EMPTY_JOB), EMPTY_JSL_PROPERTIES);
+ final StepContextImpl stepContext = new StepContextImpl(step.getId());
+
+ final RuntimeJobExecution runtimeJobExecution = persistenceManagerService.createJobExecution(jobInstance, EMPTY_PROPERTIES, BatchStatus.STARTED);
+ final InjectionReferences injectionRefs = new InjectionReferences(jobContext, stepContext, EMPTY_JSL_PROPERTIES.getPropertyList());
+ final ListenerFactory listenerFactory = new ListenerFactory(factory, EMPTY_JOB, injectionRefs, runtimeJobExecution);
+
+ // execute it!
+ runtimeJobExecution.setListenerFactory(listenerFactory);
+ runtimeJobExecution.prepareForExecution(jobContext, null);
+
+ if (step.getChunk() != null) {
+ step.setChunk(PropertyResolverFactory.createChunkPropertyResolver(false).substituteProperties(step.getChunk(), jobParams));
+ new ChunkStepController(runtimeJobExecution, step, stepContext, jobInstance.getInstanceId(),
+ new ArrayBlockingQueue<PartitionDataWrapper>(1), manager)
+ .execute();
+ } else { // batchlet
+ step.setBatchlet(PropertyResolverFactory.createBatchletPropertyResolver(false).substituteProperties(step.getBatchlet(), jobParams));
+ new BatchletStepController(
+ runtimeJobExecution, step, stepContext, jobInstance.getInstanceId(),
+ new ArrayBlockingQueue<PartitionDataWrapper>(1), manager)
+ .execute();
+ }
+
+ return persistenceManagerService.getStepExecutionByStepExecutionId(stepContext.getStepExecutionId());
+ }
+
+ private StepLauncher() {
+ // no-op
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/b5fe846f/test/src/test/java/org/apache/batchee/test/StepLauncherTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/batchee/test/StepLauncherTest.java b/test/src/test/java/org/apache/batchee/test/StepLauncherTest.java
new file mode 100644
index 0000000..9d96df4
--- /dev/null
+++ b/test/src/test/java/org/apache/batchee/test/StepLauncherTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.test;
+
+import org.testng.annotations.Test;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.Batchlet;
+import javax.batch.api.chunk.AbstractItemReader;
+import javax.batch.api.chunk.AbstractItemWriter;
+import javax.batch.api.chunk.ItemProcessor;
+import javax.batch.runtime.BatchStatus;
+import javax.batch.runtime.StepExecution;
+import javax.batch.runtime.context.JobContext;
+import javax.batch.runtime.context.StepContext;
+import javax.inject.Inject;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Properties;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+public class StepLauncherTest {
+
+ public static final String BATCHLET_REF = SimpleBatchlet.class.getName();
+
+ @Test
+ public void stepFromXml() {
+ final StepExecution execution = StepLauncher.execute(StepBuilder.extractFromXml("sleep.xml", "doSleep"));
+ assertEquals("OK", execution.getExitStatus());
+ assertEquals(BatchStatus.COMPLETED, execution.getBatchStatus());
+ }
+
+ @Test
+ public void simpleBatchlet() {
+ final StepExecution execution = StepLauncher.execute(StepBuilder.newBatchlet().ref(BATCHLET_REF).create());
+ assertEquals("default", execution.getExitStatus());
+ assertEquals(BatchStatus.COMPLETED, execution.getBatchStatus());
+ }
+
+ @Test
+ public void batchletWithConfig() {
+ final StepExecution execution = StepLauncher.execute(
+ StepBuilder.newBatchlet()
+ .ref(BATCHLET_REF)
+ .property("config", "override")
+ .create());
+ assertEquals("override", execution.getExitStatus());
+ assertEquals(BatchStatus.COMPLETED, execution.getBatchStatus());
+ }
+
+ @Test
+ public void batchletWithConfigFromJobParams() {
+ final StepExecution execution = StepLauncher.execute(
+ StepBuilder.newBatchlet()
+ .ref(BATCHLET_REF)
+ .property("config", "#{jobParameters['conf']}")
+ .create(),
+ new Properties() {{
+ setProperty("conf", "param");
+ }});
+ assertEquals("param", execution.getExitStatus());
+ assertEquals(BatchStatus.COMPLETED, execution.getBatchStatus());
+ }
+
+ @Test
+ public void simpleChunk() {
+ final StepExecution execution = StepLauncher.execute(StepBuilder.newChunk()
+ .reader().ref(SimpleReader.class.getName())
+ .processor().ref(SimpleProcessor.class.getName())
+ .writer().ref(SimpleWriter.class.getName())
+ .create());
+ assertEquals(BatchStatus.COMPLETED, execution.getBatchStatus());
+ assertNotNull(SimpleWriter.result);
+ assertEquals(2, SimpleWriter.result.size());
+ assertTrue(SimpleWriter.result.contains("0#"));
+ assertTrue(SimpleWriter.result.contains("1#"));
+ }
+
+ @Test
+ public void configuredReader() {
+ final StepExecution execution = StepLauncher.execute(StepBuilder.newChunk()
+ .reader().ref(SimpleReader.class.getName()).property("total", "1")
+ .writer().ref(SimpleWriter.class.getName())
+ .create());
+ assertEquals(BatchStatus.COMPLETED, execution.getBatchStatus());
+ assertNotNull(SimpleWriter.result);
+ assertEquals(1, SimpleWriter.result.size());
+ assertTrue(SimpleWriter.result.contains("#0"));
+ }
+
+ public static class SimpleReader extends AbstractItemReader {
+ @Inject
+ @BatchProperty
+ private String total;
+
+ private int count = 2;
+
+ @Override
+ public void open(final Serializable checkpoint) throws Exception {
+ if (total != null) {
+ count = Integer.parseInt(total);
+ }
+ }
+
+ @Override
+ public Object readItem() throws Exception {
+ if (count-- > 0) {
+ return "#" + count;
+ }
+ return null;
+ }
+ }
+
+ public static class SimpleProcessor implements ItemProcessor {
+ @Override
+ public Object processItem(final Object o) throws Exception {
+ return new StringBuilder(o.toString()).reverse().toString();
+ }
+ }
+
+ public static class SimpleWriter extends AbstractItemWriter {
+ public static List<Object> result;
+
+ @Override
+ public void writeItems(final List<Object> objects) throws Exception {
+ result = objects;
+ }
+ }
+
+ public static class SimpleBatchlet implements Batchlet {
+ @Inject
+ private JobContext job;
+
+ @Inject
+ private StepContext step;
+
+ @Inject
+ @BatchProperty
+ private String config;
+
+ @Override
+ public String process() throws Exception {
+ if (config != null) {
+ job.setExitStatus(config);
+ step.setExitStatus(config);
+ return config;
+ }
+
+ step.setExitStatus("default");
+ return "default";
+ }
+
+ @Override
+ public void stop() throws Exception {
+ // no-op
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/b5fe846f/test/src/test/java/org/apache/batchee/test/components/SleepBatchlet.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/batchee/test/components/SleepBatchlet.java b/test/src/test/java/org/apache/batchee/test/components/SleepBatchlet.java
index e27da3a..abafb2a 100644
--- a/test/src/test/java/org/apache/batchee/test/components/SleepBatchlet.java
+++ b/test/src/test/java/org/apache/batchee/test/components/SleepBatchlet.java
@@ -25,7 +25,7 @@ public class SleepBatchlet implements Batchlet {
@Inject
@BatchProperty
- private String duration;
+ private String duration = "50";
private volatile boolean stopped = false;