You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2014/01/02 19:11:02 UTC

git commit: BATCHEE-10 StepLauncher and StepBuilder

Updated Branches:
  refs/heads/master a955dcbcf -> b5fe846f7


BATCHEE-10 StepLauncher and StepBuilder


Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/b5fe846f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/b5fe846f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/b5fe846f

Branch: refs/heads/master
Commit: b5fe846f7d56517168738041952cd61a12a8b587
Parents: a955dcb
Author: Romain Manni-Bucau <rm...@apache.org>
Authored: Thu Jan 2 19:10:54 2014 +0100
Committer: Romain Manni-Bucau <rm...@apache.org>
Committed: Thu Jan 2 19:10:54 2014 +0100

----------------------------------------------------------------------
 .../org/apache/batchee/test/StepBuilder.java    | 267 +++++++++++++++++++
 .../org/apache/batchee/test/StepLauncher.java   | 107 ++++++++
 .../apache/batchee/test/StepLauncherTest.java   | 175 ++++++++++++
 .../batchee/test/components/SleepBatchlet.java  |   2 +-
 4 files changed, 550 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/b5fe846f/test/src/main/java/org/apache/batchee/test/StepBuilder.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/batchee/test/StepBuilder.java b/test/src/main/java/org/apache/batchee/test/StepBuilder.java
new file mode 100644
index 0000000..3853780
--- /dev/null
+++ b/test/src/main/java/org/apache/batchee/test/StepBuilder.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.test;
+
+import org.apache.batchee.container.jsl.ExecutionElement;
+import org.apache.batchee.container.jsl.JobModelResolver;
+import org.apache.batchee.container.services.loader.DefaultJobXMLLoaderService;
+import org.apache.batchee.jaxb.Batchlet;
+import org.apache.batchee.jaxb.Chunk;
+import org.apache.batchee.jaxb.ItemProcessor;
+import org.apache.batchee.jaxb.ItemReader;
+import org.apache.batchee.jaxb.ItemWriter;
+import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.jaxb.JSLProperties;
+import org.apache.batchee.jaxb.Property;
+import org.apache.batchee.jaxb.Step;
+
+import java.util.List;
+
+// impl is not a real builder (see immutability etc)
+// but this is more to propose a nice and fluent API than anything else
+public class StepBuilder {
+    public static Step extractFromXml(final String xml, final String name) {
+        final JSLJob job = new JobModelResolver().resolveModel(new DefaultJobXMLLoaderService().loadJSL(xml.replace(".xml", "")));
+        if (name != null) {
+            for (final ExecutionElement step : job.getExecutionElements()) {
+                if (Step.class.isInstance(step) && name.equals(Step.class.cast(step).getId())) {
+                    return Step.class.cast(step);
+                }
+            }
+        }
+        if (job.getExecutionElements().size() == 1) {
+            return Step.class.cast(job.getExecutionElements().iterator().next());
+        }
+        throw new IllegalArgumentException("Step '" + name + "' nor found.");
+    }
+
+    public static StepBuilder newStep() {
+        return new StepBuilder();
+    }
+
+    public static BatchletBuilder newBatchlet() {
+        return new StepBuilder().batchlet();
+    }
+
+    public static ChunkBuilder newChunk() {
+        return new StepBuilder().chunk();
+    }
+
+    private StepBuilder() {
+        // no-op
+    }
+
+    private final Step step = new Step();
+
+    public BatchletBuilder batchlet() {
+        final Batchlet batchlet = new Batchlet();
+        batchlet.setProperties(new JSLProperties());
+        step.setBatchlet(batchlet);
+        return new BatchletBuilder(batchlet, this);
+    }
+
+    public ChunkBuilder chunk() {
+        final Chunk chunk = new Chunk();
+        step.setChunk(chunk);
+        return new ChunkBuilder(chunk, this);
+    }
+
+    public StepBuilder property(final String key, final String value) {
+        addProperty(key, value, step.getProperties().getPropertyList());
+        return this;
+    }
+
+    public StepBuilder name(final String name) {
+        step.setId(name);
+        return this;
+    }
+
+    public Step create() {
+        if (step.getId() == null) {
+            step.setId("batchee-test"); // can't be null
+        }
+        return step;
+    }
+
+    public static class BatchletBuilder {
+        private final Batchlet toBuild;
+        private final StepBuilder parent;
+
+        private BatchletBuilder(final Batchlet batchlet, final StepBuilder stepBuilder) {
+            toBuild = batchlet;
+            parent = stepBuilder;
+        }
+
+        public BatchletBuilder ref(final String ref) {
+            toBuild.setRef(ref);
+            return this;
+        }
+
+        public BatchletBuilder property(final String key, final String value) {
+            addProperty(key, value, toBuild.getProperties().getPropertyList());
+            return this;
+        }
+
+        public StepBuilder up() {
+            return parent;
+        }
+
+        public Step create() {
+            return up().create();
+        }
+    }
+
+    public static class WriterBuilder {
+        private final ItemWriter toBuild;
+        private final ChunkBuilder parent;
+
+        private WriterBuilder(final ItemWriter batchlet, final ChunkBuilder stepBuilder) {
+            toBuild = batchlet;
+            parent = stepBuilder;
+        }
+
+        public WriterBuilder ref(final String ref) {
+            toBuild.setRef(ref);
+            return this;
+        }
+
+        public WriterBuilder property(final String key, final String value) {
+            addProperty(key, value, toBuild.getProperties().getPropertyList());
+            return this;
+        }
+
+        public ChunkBuilder up() {
+            return parent;
+        }
+
+        public Step create() {
+            return up().up().create();
+        }
+    }
+
+    public static class ProcessorBuilder {
+        private final ItemProcessor toBuild;
+        private final ChunkBuilder parent;
+
+        private ProcessorBuilder(final ItemProcessor batchlet, final ChunkBuilder stepBuilder) {
+            toBuild = batchlet;
+            parent = stepBuilder;
+        }
+
+        public ProcessorBuilder ref(final String ref) {
+            toBuild.setRef(ref);
+            return this;
+        }
+
+        public ProcessorBuilder property(final String key, final String value) {
+            addProperty(key, value, toBuild.getProperties().getPropertyList());
+            return this;
+        }
+
+        public WriterBuilder writer() {
+            return up().writer();
+        }
+
+        public ChunkBuilder up() {
+            return parent;
+        }
+    }
+
+    public static class ReaderBuilder {
+        private final ItemReader toBuild;
+        private final ChunkBuilder parent;
+
+        private ReaderBuilder(final ItemReader batchlet, final ChunkBuilder stepBuilder) {
+            toBuild = batchlet;
+            parent = stepBuilder;
+        }
+
+        public ReaderBuilder ref(final String ref) {
+            toBuild.setRef(ref);
+            return this;
+        }
+
+        public ReaderBuilder property(final String key, final String value) {
+            addProperty(key, value, toBuild.getProperties().getPropertyList());
+            return this;
+        }
+
+        public ProcessorBuilder processor() {
+            return up().processor();
+        }
+
+        public WriterBuilder writer() {
+            return up().writer();
+        }
+
+        public ChunkBuilder up() {
+            return parent;
+        }
+    }
+
+    public static class ChunkBuilder {
+        private final StepBuilder parent;
+        private final Chunk toBuild;
+
+        private ChunkBuilder(final Chunk chunk, final StepBuilder stepBuilder) {
+            toBuild = chunk;
+            parent = stepBuilder;
+        }
+
+        public ReaderBuilder reader() {
+            final ItemReader reader = new ItemReader();
+            reader.setProperties(new JSLProperties());
+            toBuild.setReader(reader);
+            return new ReaderBuilder(reader, this);
+        }
+
+        public ProcessorBuilder processor() {
+            final ItemProcessor processor = new ItemProcessor();
+            processor.setProperties(new JSLProperties());
+            toBuild.setProcessor(processor);
+            return new ProcessorBuilder(processor, this);
+        }
+
+        public WriterBuilder writer() {
+            final ItemWriter writer = new ItemWriter();
+            writer.setProperties(new JSLProperties());
+            toBuild.setWriter(writer);
+            return new WriterBuilder(writer, this);
+        }
+
+        public ChunkBuilder retryLimit(final int retry) {
+            toBuild.setRetryLimit(Integer.toString(retry));
+            return this;
+        }
+
+        public ChunkBuilder checkpointPolicy(final String policy) {
+            toBuild.setCheckpointPolicy(policy);
+            return this;
+        }
+
+        public StepBuilder up() {
+            return parent;
+        }
+    }
+
+    private static void addProperty(final String key, final String value, final List<Property> propertyList) {
+        final Property e = new Property();
+        e.setName(key);
+        e.setValue(value);
+        propertyList.add(e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/b5fe846f/test/src/main/java/org/apache/batchee/test/StepLauncher.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/batchee/test/StepLauncher.java b/test/src/main/java/org/apache/batchee/test/StepLauncher.java
new file mode 100644
index 0000000..5224254
--- /dev/null
+++ b/test/src/main/java/org/apache/batchee/test/StepLauncher.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.test;
+
+import org.apache.batchee.container.impl.JobContextImpl;
+import org.apache.batchee.container.impl.StepContextImpl;
+import org.apache.batchee.container.impl.controller.batchlet.BatchletStepController;
+import org.apache.batchee.container.impl.controller.chunk.ChunkStepController;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.modelresolver.PropertyResolverFactory;
+import org.apache.batchee.container.navigator.JobNavigator;
+import org.apache.batchee.container.proxy.InjectionReferences;
+import org.apache.batchee.container.proxy.ListenerFactory;
+import org.apache.batchee.container.services.JobStatusManagerService;
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.container.services.persistence.MemoryPersistenceManager;
+import org.apache.batchee.container.util.PartitionDataWrapper;
+import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.jaxb.JSLProperties;
+import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.BatchArtifactFactory;
+import org.apache.batchee.spi.PersistenceManagerService;
+import org.apache.batchee.spi.SecurityService;
+
+import javax.batch.runtime.BatchStatus;
+import javax.batch.runtime.JobInstance;
+import javax.batch.runtime.StepExecution;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * Utility executing a single {@link Step} directly through the step controllers,
+ * with a {@link ServicesManager} initialized from {@link #TEST_PROPERTIES}.
+ */
+public class StepLauncher {
+    private static final Properties EMPTY_PROPERTIES = new Properties();
+    private static final JSLProperties EMPTY_JSL_PROPERTIES = new JSLProperties();
+    private static final JSLJob EMPTY_JOB = new JSLJob();
+
+    private static final Properties TEST_PROPERTIES = new Properties();
+
+    static { // default ATM but could be TestMemoryPersistenceManager
+        TEST_PROPERTIES.put(PersistenceManagerService.class.getName(), MemoryPersistenceManager.class.getName());
+    }
+
+    private StepLauncher() {
+        // no-op
+    }
+
+    /**
+     * Executes the step without any job parameter.
+     *
+     * @param step the step to execute, Note: it can be modified by this method
+     * @return the step execution
+     */
+    public static StepExecution execute(final Step step) {
+        return execute(step, EMPTY_PROPERTIES);
+    }
+
+    /**
+     * Executes the step: its chunk (or its batchlet when it has no chunk) gets its properties
+     * substituted using the job parameters and is then run by the matching step controller.
+     *
+     * @param step the step to execute, Note: it can be modified by this method
+     * @param jobParams the job parameters properties
+     * @return the step execution, read back from the persistence service
+     */
+    public static StepExecution execute(final Step step, final Properties jobParams) {
+        // services
+        final ServicesManager services = new ServicesManager();
+        services.init(TEST_PROPERTIES);
+
+        final PersistenceManagerService persistence = services.service(PersistenceManagerService.class);
+        final BatchArtifactFactory artifactFactory = services.service(BatchArtifactFactory.class);
+
+        // contextual data: a job instance named after the step and owned by the logged user
+        final JobInstance instance = persistence.createJobInstance(
+            step.getId(), services.service(SecurityService.class).getLoggedUser(), null);
+        services.service(JobStatusManagerService.class).createJobStatus(instance.getInstanceId());
+
+        final JobContextImpl jobCtx = new JobContextImpl(new JobNavigator(EMPTY_JOB), EMPTY_JSL_PROPERTIES);
+        final StepContextImpl stepCtx = new StepContextImpl(step.getId());
+
+        final RuntimeJobExecution jobExecution = persistence.createJobExecution(instance, EMPTY_PROPERTIES, BatchStatus.STARTED);
+        final InjectionReferences references = new InjectionReferences(jobCtx, stepCtx, EMPTY_JSL_PROPERTIES.getPropertyList());
+        final ListenerFactory listeners = new ListenerFactory(artifactFactory, EMPTY_JOB, references, jobExecution);
+
+        // execute it!
+        jobExecution.setListenerFactory(listeners);
+        jobExecution.prepareForExecution(jobCtx, null);
+
+        if (step.getChunk() == null) { // batchlet
+            step.setBatchlet(
+                PropertyResolverFactory.createBatchletPropertyResolver(false).substituteProperties(step.getBatchlet(), jobParams));
+            final BatchletStepController batchletController = new BatchletStepController(
+                jobExecution, step, stepCtx, instance.getInstanceId(),
+                new ArrayBlockingQueue<PartitionDataWrapper>(1), services);
+            batchletController.execute();
+        } else { // chunk
+            step.setChunk(
+                PropertyResolverFactory.createChunkPropertyResolver(false).substituteProperties(step.getChunk(), jobParams));
+            final ChunkStepController chunkController = new ChunkStepController(
+                jobExecution, step, stepCtx, instance.getInstanceId(),
+                new ArrayBlockingQueue<PartitionDataWrapper>(1), services);
+            chunkController.execute();
+        }
+
+        return persistence.getStepExecutionByStepExecutionId(stepCtx.getStepExecutionId());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/b5fe846f/test/src/test/java/org/apache/batchee/test/StepLauncherTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/batchee/test/StepLauncherTest.java b/test/src/test/java/org/apache/batchee/test/StepLauncherTest.java
new file mode 100644
index 0000000..9d96df4
--- /dev/null
+++ b/test/src/test/java/org/apache/batchee/test/StepLauncherTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.test;
+
+import org.testng.annotations.Test;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.Batchlet;
+import javax.batch.api.chunk.AbstractItemReader;
+import javax.batch.api.chunk.AbstractItemWriter;
+import javax.batch.api.chunk.ItemProcessor;
+import javax.batch.runtime.BatchStatus;
+import javax.batch.runtime.StepExecution;
+import javax.batch.runtime.context.JobContext;
+import javax.batch.runtime.context.StepContext;
+import javax.inject.Inject;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Properties;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+public class StepLauncherTest {
+
+    public static final String BATCHLET_REF = SimpleBatchlet.class.getName();
+
+    @Test
+    public void stepFromXml() {
+        final StepExecution execution = StepLauncher.execute(StepBuilder.extractFromXml("sleep.xml", "doSleep"));
+        assertEquals("OK", execution.getExitStatus());
+        assertEquals(BatchStatus.COMPLETED, execution.getBatchStatus());
+    }
+
+    @Test
+    public void simpleBatchlet() {
+        final StepExecution execution = StepLauncher.execute(StepBuilder.newBatchlet().ref(BATCHLET_REF).create());
+        assertEquals("default", execution.getExitStatus());
+        assertEquals(BatchStatus.COMPLETED, execution.getBatchStatus());
+    }
+
+    @Test
+    public void batchletWithConfig() {
+        final StepExecution execution = StepLauncher.execute(
+                StepBuilder.newBatchlet()
+                        .ref(BATCHLET_REF)
+                        .property("config", "override")
+                        .create());
+        assertEquals("override", execution.getExitStatus());
+        assertEquals(BatchStatus.COMPLETED, execution.getBatchStatus());
+    }
+
+    @Test
+    public void batchletWithConfigFromJobParams() {
+        final StepExecution execution = StepLauncher.execute(
+                StepBuilder.newBatchlet()
+                        .ref(BATCHLET_REF)
+                        .property("config", "#{jobParameters['conf']}")
+                        .create(),
+                new Properties() {{
+                    setProperty("conf", "param");
+                }});
+        assertEquals("param", execution.getExitStatus());
+        assertEquals(BatchStatus.COMPLETED, execution.getBatchStatus());
+    }
+
+    @Test
+    public void simpleChunk() {
+        final StepExecution execution = StepLauncher.execute(StepBuilder.newChunk()
+                .reader().ref(SimpleReader.class.getName())
+                .processor().ref(SimpleProcessor.class.getName())
+                .writer().ref(SimpleWriter.class.getName())
+                .create());
+        assertEquals(BatchStatus.COMPLETED, execution.getBatchStatus());
+        assertNotNull(SimpleWriter.result);
+        assertEquals(2, SimpleWriter.result.size());
+        assertTrue(SimpleWriter.result.contains("0#"));
+        assertTrue(SimpleWriter.result.contains("1#"));
+    }
+
+    @Test
+    public void configuredReader() {
+        final StepExecution execution = StepLauncher.execute(StepBuilder.newChunk()
+                .reader().ref(SimpleReader.class.getName()).property("total", "1")
+                .writer().ref(SimpleWriter.class.getName())
+                .create());
+        assertEquals(BatchStatus.COMPLETED, execution.getBatchStatus());
+        assertNotNull(SimpleWriter.result);
+        assertEquals(1, SimpleWriter.result.size());
+        assertTrue(SimpleWriter.result.contains("#0"));
+    }
+
+    public static class SimpleReader extends AbstractItemReader {
+        @Inject
+        @BatchProperty
+        private String total;
+
+        private int count = 2;
+
+        @Override
+        public void open(final Serializable checkpoint) throws Exception {
+            if (total != null) {
+                count = Integer.parseInt(total);
+            }
+        }
+
+        @Override
+        public Object readItem() throws Exception {
+            if (count-- > 0) {
+                return "#" + count;
+            }
+            return null;
+        }
+    }
+
+    public static class SimpleProcessor implements ItemProcessor {
+        @Override
+        public Object processItem(final Object o) throws Exception {
+            return new StringBuilder(o.toString()).reverse().toString();
+        }
+    }
+
+    public static class SimpleWriter extends AbstractItemWriter {
+        public static List<Object> result;
+
+        @Override
+        public void writeItems(final List<Object> objects) throws Exception {
+            result = objects;
+        }
+    }
+
+    public static class SimpleBatchlet implements Batchlet {
+        @Inject
+        private JobContext job;
+
+        @Inject
+        private StepContext step;
+
+        @Inject
+        @BatchProperty
+        private String config;
+
+        @Override
+        public String process() throws Exception {
+            if (config != null) {
+                job.setExitStatus(config);
+                step.setExitStatus(config);
+                return config;
+            }
+
+            step.setExitStatus("default");
+            return "default";
+        }
+
+        @Override
+        public void stop() throws Exception {
+            // no-op
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/b5fe846f/test/src/test/java/org/apache/batchee/test/components/SleepBatchlet.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/batchee/test/components/SleepBatchlet.java b/test/src/test/java/org/apache/batchee/test/components/SleepBatchlet.java
index e27da3a..abafb2a 100644
--- a/test/src/test/java/org/apache/batchee/test/components/SleepBatchlet.java
+++ b/test/src/test/java/org/apache/batchee/test/components/SleepBatchlet.java
@@ -25,7 +25,7 @@ public class SleepBatchlet implements Batchlet {
 
     @Inject
     @BatchProperty
-    private String duration;
+    private String duration = "50";
 
     private volatile boolean stopped = false;