You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:39:07 UTC

[30/62] importing batchee from github - a fork from the IBM RI

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
new file mode 100755
index 0000000..1dd8c3b
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
@@ -0,0 +1,274 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller;
+
+import org.apache.batchee.container.Controller;
+import org.apache.batchee.container.ThreadRootController;
+import org.apache.batchee.container.impl.JobContextImpl;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.navigator.ModelNavigator;
+import org.apache.batchee.container.proxy.InjectionReferences;
+import org.apache.batchee.container.proxy.JobListenerProxy;
+import org.apache.batchee.container.proxy.ListenerFactory;
+import org.apache.batchee.container.services.JobStatusManagerService;
+import org.apache.batchee.spi.PersistenceManagerService;
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.container.status.ExecutionStatus;
+import org.apache.batchee.container.status.ExtendedBatchStatus;
+import org.apache.batchee.container.util.PartitionDataWrapper;
+import org.apache.batchee.jaxb.JSLJob;
+
+import javax.batch.runtime.BatchStatus;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Base controller that runs one job execution on the calling thread.
+ * <p>
+ * {@link #originateExecutionOnThread()} marks the job as started, calls the job listeners,
+ * runs the {@link ExecutionTransitioner} loop and finally persists the resulting batch and
+ * exit status. {@link #stop()} may be used to request that the running execution stops.
+ */
+public abstract class JobThreadRootController implements ThreadRootController {
+    private static final Logger LOGGER = Logger.getLogger(JobThreadRootController.class.getName());
+
+    protected final RuntimeJobExecution jobExecution;
+    protected final JobContextImpl jobContext;
+    protected final long rootJobExecutionId;
+    protected final long jobInstanceId;
+    private final ListenerFactory listenerFactory;
+    protected final ModelNavigator<JSLJob> jobNavigator;
+    protected final JobStatusManagerService jobStatusService;
+    protected final PersistenceManagerService persistenceService;
+
+    // Only created once the execution loop is reached; stays null if the job never got that far.
+    private ExecutionTransitioner transitioner;
+    // Only set by the constructor that receives an analyzer queue; null otherwise.
+    private BlockingQueue<PartitionDataWrapper> analyzerQueue;
+
+    /**
+     * Creates a controller for the given execution and registers a new {@link ListenerFactory}
+     * on it.
+     *
+     * @param jobExecution       the execution this controller drives
+     * @param rootJobExecutionId the execution id handed to the {@link ExecutionTransitioner}
+     */
+    public JobThreadRootController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId) {
+        this.jobExecution = jobExecution;
+        this.jobContext = jobExecution.getJobContext();
+        this.rootJobExecutionId = rootJobExecutionId;
+        this.jobInstanceId = jobExecution.getInstanceId();
+        this.jobStatusService = ServicesManager.service(JobStatusManagerService.class);
+        this.persistenceService = ServicesManager.service(PersistenceManagerService.class);
+        this.jobNavigator = jobExecution.getJobNavigator();
+
+        final JSLJob jobModel = jobExecution.getJobNavigator().getRootModelElement();
+        final InjectionReferences injectionRef = new InjectionReferences(jobContext, null, null);
+        listenerFactory = new ListenerFactory(jobModel, injectionRef, jobExecution);
+        jobExecution.setListenerFactory(listenerFactory);
+    }
+
+    /*
+     * By not passing the rootJobExecutionId, we are "orphaning" the subjob execution and making it not findable from the parent.
+     * This is exactly what we want for getStepExecutions()... we don't want it to get extraneous entries for the partitions.
+     *
+     * The execution's own id is used as the root job execution id instead.
+     */
+    public JobThreadRootController(final RuntimeJobExecution jobExecution, final BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+        this(jobExecution, jobExecution.getExecutionId());
+        this.analyzerQueue = analyzerQueue;
+    }
+
+    /**
+     * Runs the job on the current thread unless a stop was already requested, then always
+     * performs the end-of-job processing.
+     *
+     * @return the status produced by the execution loop, or {@code null} when the loop was
+     * skipped because the job was already STOPPING or when an exception was caught before the
+     * loop returned
+     */
+    @Override
+    public ExecutionStatus originateExecutionOnThread() {
+        ExecutionStatus retVal = null;
+        try {
+            // Check if we've already gotten the stop() command.
+            if (!jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
+
+                // Now that we're ready to start invoking artifacts, set the status to 'STARTED'
+                markJobStarted();
+
+                jobListenersBeforeJob();
+
+                // --------------------
+                // The BIG loop transitioning
+                // within the job !!!
+                // --------------------
+                transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, jobNavigator, analyzerQueue);
+                retVal = transitioner.doExecutionLoop();
+                final ExtendedBatchStatus extBatchStatus = retVal.getExtendedBatchStatus();
+                switch (extBatchStatus) {
+                    case JSL_STOP:
+                        jslStop();
+                        break;
+                    case JSL_FAIL:
+                    case EXCEPTION_THROWN:
+                        updateJobBatchStatus(BatchStatus.FAILED);
+                        break;
+                    default:
+                        // Any other outcome leaves the batch status untouched here.
+                        break;
+                }
+            }
+        } catch (final Throwable t) {
+            // We still want to try to call the afterJob() listener and persist the batch and exit
+            // status for the failure in an orderly fashion.  So catch and continue.
+            batchStatusFailedFromException();
+            LOGGER.log(Level.SEVERE, t.getMessage(), t);
+        }
+
+        endOfJob();
+
+        return retVal;
+    }
+
+    /**
+     * Handles a stop requested by the JSL: moves the job to STOPPING and records the
+     * restart-on value of the job context with the job status service.
+     */
+    protected void jslStop() {
+        // Read restartOn before the status change, as the original sequence did.
+        final String restartOn = jobContext.getRestartOn();
+        batchStatusStopping();
+        jobStatusService.updateJobStatusFromJSLStop(jobInstanceId, restartOn);
+    }
+
+    /**
+     * Sets the batch status to STARTED and stores one shared timestamp as both the start time
+     * and the last update time, in memory and through the persistence service.
+     */
+    protected void markJobStarted() {
+        updateJobBatchStatus(BatchStatus.STARTED);
+        final Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+        jobExecution.setLastUpdateTime(timestamp);
+        jobExecution.setStartTime(timestamp);
+        persistenceService.markJobStarted(jobExecution.getExecutionId(), timestamp);
+    }
+
+    /*
+     *  Follow similar pattern for end of step in BaseStepController
+     *
+     *  1. Execute the very last artifacts (jobListener)
+     *  2. transition to final batch status
+     *  3. default ExitStatus if necessary
+     *  4. persist statuses and end time data
+     *
+     *  We don't want to give up on the orderly process of 2,3,4, if we blow up
+     *  in after job, so catch that and keep on going.
+     */
+    protected void endOfJob() {
+
+        // 1. Execute the very last artifacts (jobListener)
+        try {
+            jobListenersAfterJob();
+        } catch (final Throwable t) {
+            // Mark the job failed and carry on with 2-4, but log the cause: it is not
+            // rethrown, so this is the only place where it can be reported.
+            batchStatusFailedFromException();
+            LOGGER.log(Level.SEVERE, t.getMessage(), t);
+        }
+
+        // 2. transition to final batch status
+        transitionToFinalBatchStatus();
+
+        // 3. default ExitStatus if necessary
+        if (jobContext.getExitStatus() == null) {
+            jobContext.setExitStatus(jobContext.getBatchStatus().name());
+        }
+
+        // 4. persist statuses and end time data
+        persistJobBatchAndExitStatus();
+
+    }
+
+    /**
+     * Persists the final batch status, exit status and end time of the job.
+     *
+     * @throws IllegalStateException if the batch status is not COMPLETED, STOPPED or FAILED;
+     *                               note that the two job status service updates have already
+     *                               been issued by then
+     */
+    private void persistJobBatchAndExitStatus() {
+        BatchStatus batchStatus = jobContext.getBatchStatus();
+
+        // Take a current timestamp for last updated no matter what the status.
+        long time = System.currentTimeMillis();
+        Timestamp timestamp = new Timestamp(time);
+        jobExecution.setLastUpdateTime(timestamp);
+
+        // Perhaps these should be coordinated in a tran but probably better still would be
+        // rethinking the table design to let the database provide us consistently with a single update.
+        jobStatusService.updateJobBatchStatus(jobInstanceId, batchStatus);
+        jobStatusService.updateJobExecutionStatus(jobExecution.getInstanceId(), jobContext.getBatchStatus(), jobContext.getExitStatus());
+
+        if (batchStatus.equals(BatchStatus.COMPLETED) || batchStatus.equals(BatchStatus.STOPPED) ||
+            batchStatus.equals(BatchStatus.FAILED)) {
+
+            jobExecution.setEndTime(timestamp);
+            persistenceService.updateWithFinalExecutionStatusesAndTimestamps(jobExecution.getExecutionId(),
+                batchStatus, jobContext.getExitStatus(), timestamp);
+        } else {
+            throw new IllegalStateException("Not expected to encounter batchStatus of " + batchStatus + " at this point.  Aborting.");
+        }
+    }
+
+    /**
+     * Maps the current batch status of the job to its final value: STARTED becomes COMPLETED,
+     * STOPPING becomes STOPPED and FAILED stays FAILED.
+     *
+     * @throws IllegalStateException if the job is in any other state, e.g. already COMPLETED
+     *                               or STOPPED, which should not be possible at this point
+     */
+    private void transitionToFinalBatchStatus() {
+        BatchStatus currentBatchStatus = jobContext.getBatchStatus();
+        if (currentBatchStatus.equals(BatchStatus.STARTED)) {
+            updateJobBatchStatus(BatchStatus.COMPLETED);
+        } else if (currentBatchStatus.equals(BatchStatus.STOPPING)) {
+            updateJobBatchStatus(BatchStatus.STOPPED);
+        } else if (currentBatchStatus.equals(BatchStatus.FAILED)) {
+            updateJobBatchStatus(BatchStatus.FAILED);  // Should have already been done but maybe better for possible code refactoring to have it here.
+        } else {
+            // This controller handles a job, not a step, so report it as such.
+            throw new IllegalStateException("Job batch status should not be in a " + currentBatchStatus.name() + " state");
+        }
+    }
+
+    /**
+     * Sets the batch status on the job context only; nothing is persisted here.
+     *
+     * @param batchStatus the new batch status
+     */
+    protected void updateJobBatchStatus(BatchStatus batchStatus) {
+        jobContext.setBatchStatus(batchStatus);
+    }
+
+    /*
+     * The thought here is that while we don't persist all the transitions in batch status (given
+     * we plan to persist at the very end), we do persist STOPPING right away, since if we end up
+     * "stuck in STOPPING" we at least will have a record in the database.
+     */
+    protected void batchStatusStopping() {
+        updateJobBatchStatus(BatchStatus.STOPPING);
+
+        final Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+        jobExecution.setLastUpdateTime(timestamp);
+        persistenceService.updateBatchStatusOnly(jobExecution.getExecutionId(), BatchStatus.STOPPING, timestamp);
+    }
+
+    /**
+     * Requests a stop. Nothing happens unless the job is STARTING or STARTED; in that case the
+     * job is moved to STOPPING and the stop is forwarded to the element currently reported as
+     * stoppable by the transitioner, if there is one.
+     */
+    @Override
+    public void stop() {
+        if (jobContext.getBatchStatus().equals(BatchStatus.STARTING) || jobContext.getBatchStatus().equals(BatchStatus.STARTED)) {
+            batchStatusStopping();
+
+            if (transitioner != null) {
+                final Controller stoppableElementController = transitioner.getCurrentStoppableElementController();
+                if (stoppableElementController != null) {
+                    stoppableElementController.stop();
+                }
+            } // else reusage of thread, this controller was not initialized
+        }
+    }
+
+    // Call beforeJob() on all the job listeners
+    protected void jobListenersBeforeJob() {
+        final List<JobListenerProxy> jobListeners = listenerFactory.getJobListeners();
+        for (final JobListenerProxy listenerProxy : jobListeners) {
+            listenerProxy.beforeJob();
+        }
+    }
+
+    // Call afterJob() on all the job listeners
+    private void jobListenersAfterJob() {
+        final List<JobListenerProxy> jobListeners = listenerFactory.getJobListeners();
+        for (final JobListenerProxy listenerProxy : jobListeners) {
+            listenerProxy.afterJob();
+        }
+    }
+
+    /**
+     * Sets the batch status of the job to FAILED after an exception was caught.
+     */
+    protected void batchStatusFailedFromException() {
+        updateJobBatchStatus(BatchStatus.FAILED);
+    }
+
+    /**
+     * @return the step execution ids reported by the transitioner, or an empty list when the
+     * execution loop was never reached
+     */
+    @Override
+    public List<Long> getLastRunStepExecutions() {
+        // The transitioner is only created right before the execution loop. A job that was
+        // already STOPPING, or whose beforeJob listener threw, never gets one, so there are
+        // no step executions to report rather than a NullPointerException to throw.
+        if (transitioner == null) {
+            return new java.util.ArrayList<Long>();
+        }
+        return transitioner.getStepExecIds();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
new file mode 100755
index 0000000..920ca99
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller;
+
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.util.PartitionsBuilderConfig;
+
+/**
+ * Thread root controller used for a partition.
+ * <p>
+ * It adds no behavior of its own at the moment: the only thing it does is take the analyzer
+ * queue from the {@link PartitionsBuilderConfig} and hand it to the parent controller.
+ */
+public class PartitionThreadRootController extends JobThreadRootController {
+    /**
+     * @param jobExecution the execution this controller drives
+     * @param config       the source of the analyzer queue passed to the parent
+     */
+    public PartitionThreadRootController(final RuntimeJobExecution jobExecution, final PartitionsBuilderConfig config) {
+        super(jobExecution, config.getAnalyzerQueue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepBuilder.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepBuilder.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepBuilder.java
new file mode 100755
index 0000000..81e63dd
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepBuilder.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.impl.controller;
+
+import org.apache.batchee.container.jsl.CloneUtility;
+import org.apache.batchee.jaxb.Flow;
+import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.jaxb.ObjectFactory;
+import org.apache.batchee.jaxb.Partition;
+import org.apache.batchee.jaxb.PartitionPlan;
+import org.apache.batchee.jaxb.Split;
+import org.apache.batchee.jaxb.Step;
+
+import javax.batch.runtime.context.JobContext;
+
+/**
+ * Static helpers that generate the single-element "subjobs" submitted for the flows of a
+ * split and for the partitions of a partitioned step.
+ */
+public class PartitionedStepBuilder {
+    // Placed in front of, and between, the parts of every generated subjob id.
+    // NOTE(review): this was described as "permissible in NCName", but an NCName excludes ':'
+    // by definition - confirm how the generated ids are consumed before relying on that.
+    public static final String JOB_ID_SEPARATOR = ":";
+
+    /**
+     * Builds a generated job that contains only the given flow. This is used to build
+     * subjobs from splits.
+     *
+     * @param parentJobExecutionId id of the parent, used as the first part of the subjob id
+     * @param jobContext           parent job context whose properties are copied to the subjob
+     * @param split                the split the flow belongs to
+     * @param flow                 the flow to run; it is added as-is, not cloned
+     * @return the generated subjob
+     */
+    public static JSLJob buildFlowInSplitSubJob(Long parentJobExecutionId, JobContext jobContext, Split split, Flow flow) {
+        final ObjectFactory factory = new ObjectFactory();
+        final JSLJob flowJob = factory.createJSLJob();
+
+        // Generated id of the subjob.
+        flowJob.setId(generateSubJobId(parentJobExecutionId, split.getId(), flow.getId()));
+
+        // The flow threads see every property of the parent JobContext.
+        flowJob.setProperties(CloneUtility.javaPropsTojslProperties(jobContext.getProperties()));
+
+        // No deep copy is needed: each flow is already independent of all others, unlike a
+        // partition where one step instance can run with different properties on several threads.
+        flowJob.getExecutionElements().add(flow);
+
+        return flowJob;
+    }
+
+    /**
+     * Builds a generated job that contains only a copy of the given step. This is used for
+     * partitioned steps.
+     *
+     * @param parentJobInstanceId id of the parent, used as the first part of the subjob id
+     * @param jobContext          parent job context whose properties are copied to the subjob
+     * @param step                the partitioned step to copy
+     * @param partitionInstance   the instance number of the partition, last part of the subjob id
+     * @return the generated subjob
+     */
+    public static JSLJob buildPartitionSubJob(Long parentJobInstanceId, JobContext jobContext, Step step, int partitionInstance) {
+        final ObjectFactory factory = new ObjectFactory();
+        final JSLJob partitionJob = factory.createJSLJob();
+
+        // Generated id of the subjob.
+        partitionJob.setId(generateSubJobId(parentJobInstanceId, step.getId(), partitionInstance));
+
+        // The partition threads see every property of the parent JobContext.
+        partitionJob.setProperties(CloneUtility.javaPropsTojslProperties(jobContext.getProperties()));
+
+        // The subjob gets exactly one step, carrying the id of the original.
+        final Step stepCopy = factory.createStep();
+        stepCopy.setId(step.getId());
+
+        // Copy the fields of the step, cloning the batchlet or chunk definition when present.
+        stepCopy.setAllowStartIfComplete(step.getAllowStartIfComplete());
+
+        if (step.getBatchlet() != null) {
+            stepCopy.setBatchlet(CloneUtility.cloneBatchlet(step.getBatchlet()));
+        }
+
+        if (step.getChunk() != null) {
+            stepCopy.setChunk(CloneUtility.cloneChunk(step.getChunk()));
+        }
+
+        // The next attribute and the control elements are deliberately left out:
+        // transitioning should ONLY take place on the main thread.
+
+        // Only a collector is carried over from the partition element. The copy gets a fresh
+        // plan whose partitions value is null, and it shares the collector of the original.
+        final Partition sourcePartition = step.getPartition();
+        if (sourcePartition != null && sourcePartition.getCollector() != null) {
+            final Partition collectorOnlyPartition = factory.createPartition();
+
+            final PartitionPlan emptyPlan = factory.createPartitionPlan();
+            emptyPlan.setPartitions(null);
+            collectorOnlyPartition.setPlan(emptyPlan);
+
+            collectorOnlyPartition.setCollector(sourcePartition.getCollector());
+            stepCopy.setPartition(collectorOnlyPartition);
+        }
+
+        stepCopy.setStartLimit(step.getStartLimit());
+        stepCopy.setProperties(CloneUtility.cloneJSLProperties(step.getProperties()));
+
+        // Don't try to only clone based on type (e.g. ChunkListener vs. StepListener).
+        // We don't know the type at the model level, and a given artifact could implement more
+        // than one listener interface (e.g. ChunkListener AND StepListener).
+        stepCopy.setListeners(CloneUtility.cloneListeners(step.getListeners()));
+
+        partitionJob.getExecutionElements().add(stepCopy);
+
+        return partitionJob;
+    }
+
+    /**
+     * Generates the id of the subjob that runs one flow of a split.
+     *
+     * @param parentId the id of the parent job passed in by the caller
+     * @param splitId  the id of the split in which the flow is nested
+     * @param flowId   the id of the flow
+     * @return a String of the form {@code :<parentId>:<splitId>:<flowId>}
+     */
+    private static String generateSubJobId(Long parentId, String splitId, String flowId) {
+        return new StringBuilder()
+            .append(JOB_ID_SEPARATOR).append(parentId.toString())
+            .append(JOB_ID_SEPARATOR).append(splitId)
+            .append(JOB_ID_SEPARATOR).append(flowId)
+            .toString();
+    }
+
+    /**
+     * Generates the id of the subjob that runs one partition of a step.
+     *
+     * @param parentId          the id of the parent job passed in by the caller
+     * @param stepId            the id of the partitioned step
+     * @param partitionInstance the instance number of the partition
+     * @return a String of the form {@code :<parentId>:<stepId>:<partitionInstance>}
+     */
+    private static String generateSubJobId(Long parentId, String stepId, int partitionInstance) {
+        return new StringBuilder()
+            .append(JOB_ID_SEPARATOR).append(parentId.toString())
+            .append(JOB_ID_SEPARATOR).append(stepId)
+            .append(JOB_ID_SEPARATOR).append(partitionInstance)
+            .toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
new file mode 100755
index 0000000..9797233
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
@@ -0,0 +1,449 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller;
+
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.impl.StepContextImpl;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.jsl.CloneUtility;
+import org.apache.batchee.container.proxy.InjectionReferences;
+import org.apache.batchee.container.proxy.PartitionAnalyzerProxy;
+import org.apache.batchee.container.proxy.PartitionMapperProxy;
+import org.apache.batchee.container.proxy.PartitionReducerProxy;
+import org.apache.batchee.container.proxy.ProxyFactory;
+import org.apache.batchee.container.proxy.StepListenerProxy;
+import org.apache.batchee.container.util.BatchPartitionPlan;
+import org.apache.batchee.container.util.BatchPartitionWorkUnit;
+import org.apache.batchee.container.util.BatchWorkUnit;
+import org.apache.batchee.container.util.PartitionDataWrapper;
+import org.apache.batchee.container.util.PartitionDataWrapper.PartitionEventType;
+import org.apache.batchee.container.util.PartitionsBuilderConfig;
+import org.apache.batchee.jaxb.Analyzer;
+import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.jaxb.JSLProperties;
+import org.apache.batchee.jaxb.PartitionMapper;
+import org.apache.batchee.jaxb.PartitionReducer;
+import org.apache.batchee.jaxb.Property;
+import org.apache.batchee.jaxb.Step;
+
+import javax.batch.api.partition.PartitionPlan;
+import javax.batch.api.partition.PartitionReducer.PartitionStatus;
+import javax.batch.operations.JobExecutionAlreadyCompleteException;
+import javax.batch.operations.JobExecutionNotMostRecentException;
+import javax.batch.operations.JobRestartException;
+import javax.batch.operations.JobStartException;
+import javax.batch.runtime.BatchStatus;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class PartitionedStepController extends BaseStepController {
+    private static final int DEFAULT_PARTITION_INSTANCES = 1; // initial value of 'partitions' until a plan is generated
+    private static final int DEFAULT_THREADS = 0; //0 means default to number of instances
+
+    private PartitionPlan plan = null; // assigned from generatePartitionPlan() at the start of invokeCoreStep()
+
+    private int partitions = DEFAULT_PARTITION_INSTANCES; // overwritten with plan.getPartitions() by generatePartitionPlan()
+    private int threads = DEFAULT_THREADS; // overwritten with plan.getThreads() by generatePartitionPlan()
+
+    private Properties[] partitionProperties = null; // overwritten with plan.getPartitionProperties() by generatePartitionPlan()
+
+    private volatile List<BatchPartitionWorkUnit> parallelBatchWorkUnits; // may still be null when stop() is called; stop() stops each entry
+
+    private PartitionReducerProxy partitionReducerProxy = null;
+
+    // On invocation this will be re-primed to reflect already-completed partitions from a previous execution.
+    int numPreviouslyCompleted = 0;
+
+    private PartitionAnalyzerProxy analyzerProxy = null;
+
+    final List<JSLJob> subJobs = new ArrayList<JSLJob>(); // also used as the monitor that stop() synchronizes on
+    protected List<StepListenerProxy> stepListeners = null;
+
+    List<BatchPartitionWorkUnit> completedWork = new ArrayList<BatchPartitionWorkUnit>();
+
+    BlockingQueue<BatchPartitionWorkUnit> completedWorkQueue = null;
+
+    /**
+     * Creates the controller; all arguments are passed unchanged to {@link BaseStepController}.
+     *
+     * @param jobExecutionImpl   the job execution the step belongs to
+     * @param step               the step model
+     * @param stepContext        the context of the step
+     * @param rootJobExecutionId the root job execution id forwarded to the parent
+     */
+    protected PartitionedStepController(final RuntimeJobExecution jobExecutionImpl, final Step step,
+                                        final StepContextImpl stepContext, final long rootJobExecutionId) {
+        super(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+    }
+
+    /**
+     * Moves the step to STOPPING and asks the batch kernel to stop the job execution of every
+     * partition work unit known at this point.
+     *
+     * @throws IllegalStateException if stopping one of the sub jobs fails; the work units
+     *                               that follow it are then not visited
+     */
+    @Override
+    public void stop() {
+        updateBatchStatus(BatchStatus.STOPPING);
+
+        synchronized (subJobs) {
+            // The stop request may arrive before any sub step has been started.
+            if (parallelBatchWorkUnits == null) {
+                return;
+            }
+
+            for (final BatchWorkUnit workUnit : parallelBatchWorkUnits) {
+                try {
+                    BATCH_KERNEL.stopJob(workUnit.getJobExecutionImpl().getExecutionId());
+                } catch (final Exception cause) {
+                    // TODO - Is this what we want to know.
+                    // Blow up if it happens to force the issue.
+                    throw new IllegalStateException(cause);
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds the partition plan of this step, either by calling the partition mapper or from
+     * the static plan element of the JSL, and copies its partitions, threads and partition
+     * properties into the matching fields of this controller.
+     *
+     * @return the generated plan, never {@code null}
+     * @throws IllegalArgumentException if the step defines neither a mapper nor a plan, or if
+     *                                  the static plan holds an unparsable or out-of-range value
+     */
+    private PartitionPlan generatePartitionPlan() {
+        // Number of partitions persisted by a previous run, or null if there is none.
+        final Integer previousNumPartitions = stepStatus.getNumPartitions();
+        final PartitionMapper partitionMapper = step.getPartition().getMapper();
+
+        final PartitionPlan generatedPlan;
+        if (partitionMapper != null) { //from partition mapper
+            generatedPlan = buildPlanFromMapper(partitionMapper, previousNumPartitions);
+        } else if (step.getPartition().getPlan() != null) { //from static partition element in jsl
+            generatedPlan = buildPlanFromStaticJsl(step.getPartition().getPlan());
+        } else {
+            // Without this check the method would end in a bare NullPointerException below.
+            throw new IllegalArgumentException("Neither a partition mapper nor a partition plan is defined in stepId: " + step.getId());
+        }
+
+        // Set the other instance variables for convenience.
+        this.partitions = generatedPlan.getPartitions();
+        this.threads = generatedPlan.getThreads();
+        this.partitionProperties = generatedPlan.getPartitionProperties();
+
+        return generatedPlan;
+    }
+
+    /**
+     * Calls the partition mapper and turns its answer into the plan to execute.
+     *
+     * @param partitionMapper       the mapper element of the step
+     * @param previousNumPartitions partition count of a previous run, or {@code null}
+     * @return the plan derived from the mapper's plan
+     */
+    private PartitionPlan buildPlanFromMapper(final PartitionMapper partitionMapper, final Integer previousNumPartitions) {
+        final List<Property> propertyList = partitionMapper.getProperties() == null ? null
+            : partitionMapper.getProperties().getPropertyList();
+
+        // Set all the contexts associated with this controller.
+        // Some of them may be null
+        final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propertyList);
+        final PartitionMapperProxy partitionMapperProxy = ProxyFactory.createPartitionMapperProxy(partitionMapper.getRef(), injectionRef, stepContext, jobExecutionImpl);
+
+        final PartitionPlan mapperPlan = partitionMapperProxy.mapPartitions();
+
+        //Set up the new partition plan
+        final PartitionPlan mappedPlan = new BatchPartitionPlan();
+        mappedPlan.setPartitionsOverride(mapperPlan.getPartitionsOverride());
+
+        //When true is specified, the partition count from the current run
+        //is used and all results from past partitions are discarded.
+        if (mapperPlan.getPartitionsOverride() || previousNumPartitions == null) {
+            mappedPlan.setPartitions(mapperPlan.getPartitions());
+        } else {
+            mappedPlan.setPartitions(previousNumPartitions);
+        }
+
+        // A thread count of 0 means "as many threads as partitions".
+        if (mapperPlan.getThreads() == 0) {
+            mappedPlan.setThreads(mappedPlan.getPartitions());
+        } else {
+            mappedPlan.setThreads(mapperPlan.getThreads());
+        }
+
+        mappedPlan.setPartitionProperties(mapperPlan.getPartitionProperties());
+        return mappedPlan;
+    }
+
+    /**
+     * Reads partitions, threads and per-partition properties from the static plan element.
+     *
+     * @param jslPlan the plan element of the step, not {@code null}
+     * @return the plan described by the element
+     * @throws IllegalArgumentException       if partitions or threads cannot be parsed, if
+     *                                        partitions is below 1 or if threads is negative
+     * @throws BatchContainerRuntimeException if a properties list targets a partition index
+     *                                        outside of the declared partitions
+     */
+    private PartitionPlan buildPlanFromStaticJsl(final org.apache.batchee.jaxb.PartitionPlan jslPlan) {
+        final String partitionsAttr = jslPlan.getPartitions();
+
+        int numPartitions = Integer.MIN_VALUE;
+        Properties[] partitionProps = null;
+
+        if (partitionsAttr != null) {
+            try {
+                numPartitions = Integer.parseInt(partitionsAttr);
+            } catch (final NumberFormatException e) {
+                throw new IllegalArgumentException("Could not parse partition instances value in stepId: " + step.getId()
+                    + ", with instances=" + partitionsAttr, e);
+            }
+            // Validate before allocating: a negative value would otherwise surface as a
+            // NegativeArraySizeException instead of the descriptive error below.
+            if (numPartitions < 1) {
+                throw new IllegalArgumentException("Partition instances value must be 1 or greater in stepId: " + step.getId()
+                    + ", with instances=" + partitionsAttr);
+            }
+            partitionProps = new Properties[numPartitions];
+        }
+
+        final String threadsAttr = jslPlan.getThreads();
+        int numThreads;
+        if (threadsAttr != null) {
+            try {
+                numThreads = Integer.parseInt(threadsAttr);
+            } catch (final NumberFormatException e) {
+                throw new IllegalArgumentException("Could not parse partition threads value in stepId: " + step.getId()
+                    + ", with threads=" + threadsAttr, e);
+            }
+            // A thread count of 0 means "as many threads as partitions".
+            if (numThreads == 0) {
+                numThreads = numPartitions;
+            }
+            if (numThreads < 0) {
+                throw new IllegalArgumentException("Threads value must be 0 or greater in stepId: " + step.getId()
+                    + ", with threads=" + threadsAttr);
+            }
+        } else { //default to number of partitions if threads isn't set
+            numThreads = numPartitions;
+        }
+
+        final List<JSLProperties> jslProperties = jslPlan.getProperties();
+        if (jslProperties != null) {
+            for (final JSLProperties props : jslProperties) {
+                final int targetPartition = Integer.parseInt(props.getPartition());
+
+                try {
+                    partitionProps[targetPartition] = CloneUtility.jslPropertiesToJavaProperties(props);
+                } catch (final ArrayIndexOutOfBoundsException e) {
+                    throw new BatchContainerRuntimeException("There are only " + numPartitions + " partition instances, but there are "
+                        + jslProperties.size()
+                        + " partition properties lists defined. Remember that partition indexing is 0 based like Java arrays.", e);
+                }
+            }
+        }
+
+        final PartitionPlan staticPlan = new BatchPartitionPlan();
+        staticPlan.setPartitions(numPartitions);
+        staticPlan.setThreads(numThreads);
+        staticPlan.setPartitionProperties(partitionProps);
+        staticPlan.setPartitionsOverride(false); //FIXME what is the default for a static plan??
+        return staticPlan;
+    }
+
+
+    /**
+     * Drives the partitioned step from the main thread: generates the partition plan, records the
+     * partition count on the step status, creates the queues used to hear back from the partitions,
+     * then builds, runs and finally checks the per-partition work units.
+     */
+    @Override
+    protected void invokeCoreStep() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
+        plan = generatePartitionPlan();
+
+        // Remember the partition count so that a restart can reuse the same plan.
+        stepStatus.setNumPartitions(plan.getPartitions());
+
+        // With "partitions override" set, the partition count of the current run is used and all
+        // results from past partitions are discarded. Cleaning up or backing out work of the previous
+        // run is the application's responsibility, so the PartitionReducer (when there is one) gets
+        // its rollbackPartitionedStep hook before any partition begins processing.
+        if (plan.getPartitionsOverride() && partitionReducerProxy != null) {
+            partitionReducerProxy.rollbackPartitionedStep();
+        }
+
+        // The queue carrying collector data from the partition threads is only created when an analyzer exists.
+        if (analyzerProxy != null) {
+            analyzerStatusQueue = new LinkedBlockingQueue<PartitionDataWrapper>();
+        }
+        completedWorkQueue = new LinkedBlockingQueue<BatchPartitionWorkUnit>();
+
+        buildSubJobBatchWorkUnits();   // one sub job per partition
+        executeAndWaitForCompletion(); // kick off the threads and wait for them
+        checkCompletedWork();          // deal with the results
+    }
+
+    private void buildSubJobBatchWorkUnits() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
+        synchronized (subJobs) {
+            //check if we've already issued a stop
+            if (jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)) {
+                return;
+            }
+
+            for (int instance = 0; instance < partitions; instance++) {
+                subJobs.add(PartitionedStepBuilder.buildPartitionSubJob(jobExecutionImpl.getInstanceId(), jobExecutionImpl.getJobContext(), step, instance));
+            }
+
+            PartitionsBuilderConfig config = new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, completedWorkQueue, jobExecutionImpl.getExecutionId());
+            // Then build all the subjobs but do not start them yet
+            if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
+                parallelBatchWorkUnits = BATCH_KERNEL.buildOnRestartParallelPartitions(config);
+            } else {
+                parallelBatchWorkUnits = BATCH_KERNEL.buildNewParallelPartitions(config);
+            }
+
+            // NOTE:  At this point I might not have as many work units as I had partitions, since some may have already completed.
+        }
+    }
+
+    /**
+     * Starts the partition work units, never keeping more than {@code threads} of them in flight,
+     * and blocks until every work unit of this execution has reported completion.
+     * <p/>
+     * While waiting, collector data and partition statuses arriving on the analyzer queue are handed
+     * to the analyzer (when one is configured). Each finished work unit is stored in
+     * {@code completedWork} for later inspection. Returns immediately when the job is STOPPING.
+     *
+     * @throws JobRestartException declared for callers; not thrown directly by this method body
+     * @throws BatchContainerRuntimeException if the waiting thread is interrupted
+     * @throws IllegalStateException if the analyzer queue delivers an unknown event type
+     */
+    private void executeAndWaitForCompletion() throws JobRestartException {
+
+        if (jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)) {
+            return;
+        }
+
+        final int numTotalForThisExecution = parallelBatchWorkUnits.size();
+        this.numPreviouslyCompleted = partitions - numTotalForThisExecution;
+
+        // Same rule as the one used to build the work units: partitions are restarted only when the
+        // step ran before and the plan does not override the previous partitions.
+        final boolean restartPartitions = stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride();
+
+        int numCurrentCompleted = 0;
+        int numCurrentSubmitted = 0;
+
+        // Start up to the max number we are allowed from the threads attribute.
+        while (numCurrentSubmitted < this.threads && numCurrentSubmitted < numTotalForThisExecution) {
+            final BatchWorkUnit workUnit = parallelBatchWorkUnits.get(numCurrentSubmitted++);
+            if (restartPartitions) {
+                BATCH_KERNEL.restartGeneratedJob(workUnit);
+            } else {
+                BATCH_KERNEL.startGeneratedJob(workUnit);
+            }
+        }
+
+        // Checking the condition up front (instead of looping unconditionally) avoids blocking
+        // forever on the queues when there is no work unit to wait for in this execution.
+        while (numCurrentCompleted < numTotalForThisExecution) {
+            try {
+                if (analyzerProxy != null) {
+                    final PartitionDataWrapper dataWrapper = analyzerStatusQueue.take();
+                    if (PartitionEventType.ANALYZE_COLLECTOR_DATA.equals(dataWrapper.getEventType())) {
+                        analyzerProxy.analyzeCollectorData(dataWrapper.getCollectorData());
+                        continue; // collector data does not mean a partition finished: nothing to submit yet
+                    } else if (PartitionEventType.ANALYZE_STATUS.equals(dataWrapper.getEventType())) {
+                        analyzerProxy.analyzeStatus(dataWrapper.getBatchstatus(), dataWrapper.getExitStatus());
+                        completedWork.add(completedWorkQueue.take());  // Shouldn't be a long wait.
+                    } else {
+                        throw new IllegalStateException("Invalid partition state");
+                    }
+                } else {
+                    // Block until at least one thread has finished so more work can be submitted;
+                    // hold on to the finished work to look at later.
+                    completedWork.add(completedWorkQueue.take());
+                }
+            } catch (final InterruptedException e) {
+                // Restore the interrupt flag so code further up the stack can still observe it.
+                Thread.currentThread().interrupt();
+                throw new BatchContainerRuntimeException(e);
+            }
+
+            numCurrentCompleted++;
+
+            // A slot was freed: submit the next pending work unit, if any is left.
+            if (numCurrentCompleted < numTotalForThisExecution && numCurrentSubmitted < numTotalForThisExecution) {
+                final BatchWorkUnit nextWorkUnit = parallelBatchWorkUnits.get(numCurrentSubmitted++);
+                if (restartPartitions) {
+                    BATCH_KERNEL.restartGeneratedJob(nextWorkUnit);
+                } else {
+                    BATCH_KERNEL.startGeneratedJob(nextWorkUnit);
+                }
+            }
+        }
+    }
+
+    private void checkCompletedWork() {
+        /**
+         * check the batch status of each subJob after it's done to see if we need to issue a rollback
+         * start rollback if any have stopped or failed
+         */
+        boolean rollback = false;
+
+        for (final BatchWorkUnit subJob : completedWork) {
+            BatchStatus batchStatus = subJob.getJobExecutionImpl().getJobContext().getBatchStatus();
+            if (batchStatus.equals(BatchStatus.FAILED)) {
+                rollback = true;
+
+                //Keep track of the failing status and throw an exception to propagate after the rest of the partitions are complete
+                stepContext.setBatchStatus(BatchStatus.FAILED);
+            }
+        }
+
+        //If rollback is false we never issued a rollback so we can issue a logicalTXSynchronizationBeforeCompletion
+        //NOTE: this will get issued even in a subjob fails or stops if no logicalTXSynchronizationRollback method is provied
+        //We are assuming that not providing a rollback was intentional
+        if (rollback) {
+            if (this.partitionReducerProxy != null) {
+                this.partitionReducerProxy.rollbackPartitionedStep();
+            }
+            throw new BatchContainerRuntimeException("One or more partitions failed");
+        } else {
+            if (this.partitionReducerProxy != null) {
+                this.partitionReducerProxy.beforePartitionedStepCompletion();
+            }
+        }
+    }
+
+    /**
+     * Creates the artifacts used on the main thread of a partitioned step: the step listeners and,
+     * when declared on the partition, the analyzer and reducer proxies. Each proxy is injected with
+     * the properties declared on its own element (or {@code null} when it declares none).
+     */
+    @Override
+    protected void setupStepArtifacts() {
+        final InjectionReferences listenerRefs = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, null);
+        stepListeners = jobExecutionImpl.getListenerFactory().getStepListeners(step, listenerRefs, stepContext, jobExecutionImpl);
+
+        final Analyzer analyzer = step.getPartition().getAnalyzer();
+        if (analyzer != null) {
+            List<Property> analyzerProps = null;
+            if (analyzer.getProperties() != null) {
+                analyzerProps = analyzer.getProperties().getPropertyList();
+            }
+            final InjectionReferences analyzerRefs = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, analyzerProps);
+            analyzerProxy = ProxyFactory.createPartitionAnalyzerProxy(analyzer.getRef(), analyzerRefs, stepContext, jobExecutionImpl);
+        }
+
+        final PartitionReducer reducer = step.getPartition().getReducer();
+        if (reducer != null) {
+            List<Property> reducerProps = null;
+            if (reducer.getProperties() != null) {
+                reducerProps = reducer.getProperties().getPropertyList();
+            }
+            final InjectionReferences reducerRefs = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, reducerProps);
+            partitionReducerProxy = ProxyFactory.createPartitionReducerProxy(reducer.getRef(), reducerRefs, stepContext, jobExecutionImpl);
+        }
+    }
+
+    /**
+     * Calls {@code beforeStep()} on every step listener, then {@code beginPartitionedStep()} on the
+     * reducer when one is configured.
+     */
+    @Override
+    protected void invokePreStepArtifacts() {
+        if (stepListeners != null) {
+            for (final StepListenerProxy listener : stepListeners) {
+                listener.beforeStep();
+            }
+        }
+
+        // The reducer is invoked before all parallel steps start (and must run before the mapper as well).
+        if (partitionReducerProxy == null) {
+            return;
+        }
+        partitionReducerProxy.beginPartitionedStep();
+    }
+
+    @Override
+    protected void invokePostStepArtifacts() {
+        // Invoke the reducer after all parallel steps are done
+        if (this.partitionReducerProxy != null) {
+
+            if ((BatchStatus.COMPLETED).equals(stepContext.getBatchStatus())) {
+                this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionStatus.COMMIT);
+            } else {
+                this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionStatus.ROLLBACK);
+            }
+
+        }
+
+        // Called in spec'd order, e.g. Sec. 11.7
+        if (stepListeners != null) {
+            for (StepListenerProxy listenerProxy : stepListeners) {
+                // Call afterStep on all the step listeners
+                listenerProxy.afterStep();
+            }
+        }
+    }
+
+    /**
+     * Intentionally a no-op: this controller already runs on the main thread, so it never has a
+     * status of its own to send. Only the partition threads have something to report back.
+     */
+    @Override
+    protected void sendStatusFromPartitionToAnalyzerIfPresent() {
+        // nothing to do on the main thread
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
new file mode 100755
index 0000000..d085622
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller;
+
+import org.apache.batchee.container.Controller;
+import org.apache.batchee.container.impl.StepContextImpl;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.proxy.InjectionReferences;
+import org.apache.batchee.container.proxy.PartitionCollectorProxy;
+import org.apache.batchee.container.proxy.ProxyFactory;
+import org.apache.batchee.container.proxy.StepListenerProxy;
+import org.apache.batchee.container.util.PartitionDataWrapper;
+import org.apache.batchee.container.util.PartitionDataWrapper.PartitionEventType;
+import org.apache.batchee.jaxb.Collector;
+import org.apache.batchee.jaxb.Property;
+import org.apache.batchee.jaxb.Step;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Base controller for a step executed on a single thread.
+ * <p/>
+ * For a partitioned step this controller is used only on the partition threads,
+ * NOT on the top-level main thread the step executes upon.
+ * <p/>
+ * For a non-partitioned step this controller is used as well (and there is no
+ * separate main thread with its own controller).
+ */
+public abstract class SingleThreadedStepController extends BaseStepController implements Controller {
+    // Collector is only used from partition threads, never from the main thread.
+    protected PartitionCollectorProxy collectorProxy;
+
+    // Step listeners resolved by setupStepArtifacts().
+    List<StepListenerProxy> stepListeners;
+
+    protected SingleThreadedStepController(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
+        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+    }
+
+    /**
+     * Resolves the step listeners and, when the step is partitioned and declares a collector,
+     * creates the collector proxy.
+     */
+    protected void setupStepArtifacts() {
+        final InjectionReferences listenerRefs = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, null);
+        stepListeners = jobExecutionImpl.getListenerFactory().getStepListeners(step, listenerRefs, stepContext, jobExecutionImpl);
+
+        // Collectors only exist for partitioned steps.
+        if (step.getPartition() == null) {
+            return;
+        }
+        final Collector collector = step.getPartition().getCollector();
+        if (collector == null) {
+            return;
+        }
+
+        List<Property> collectorProps = null;
+        if (collector.getProperties() != null) {
+            collectorProps = collector.getProperties().getPropertyList();
+        }
+
+        // Inject the job and step contexts plus the collector's own properties into the
+        // collector artifact; the property list may be null.
+        final InjectionReferences collectorRefs = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, collectorProps);
+        collectorProxy = ProxyFactory.createPartitionCollectorProxy(collector.getRef(), collectorRefs, stepContext, jobExecutionImpl);
+    }
+
+    /**
+     * Calls {@code beforeStep()} on the step listeners, except when running as a partition.
+     */
+    @Override
+    protected void invokePreStepArtifacts() {
+        // In the partitioned case we are on a partition thread and beforeStep() has already
+        // been called on the main thread, as the spec says: do not call it again.
+        if (stepListeners == null || jobExecutionImpl.getPartitionInstance() != null) {
+            return;
+        }
+        for (final StepListenerProxy listener : stepListeners) {
+            listener.beforeStep();
+        }
+    }
+
+    /**
+     * Calls {@code afterStep()} on the step listeners, except when running as a partition.
+     */
+    @Override
+    protected void invokePostStepArtifacts() {
+        // In the partitioned case we are on a partition thread: afterStep() is not called from
+        // here, mirroring what invokePreStepArtifacts() does for beforeStep().
+        if (stepListeners == null || jobExecutionImpl.getPartitionInstance() != null) {
+            return;
+        }
+        for (final StepListenerProxy listener : stepListeners) {
+            listener.afterStep();
+        }
+    }
+
+    /**
+     * Asks the collector, when there is one, for its partition data and forwards it to the analyzer queue.
+     */
+    protected void invokeCollectorIfPresent() {
+        if (collectorProxy == null) {
+            return;
+        }
+        final Serializable collected = collectorProxy.collectPartitionData();
+        sendCollectorDataToAnalyzerIfPresent(collected);
+    }
+
+    /**
+     * Queues the given collector data as an ANALYZE_COLLECTOR_DATA event.
+     * A collector without an analyzer is useless, but the missing queue is checked anyway
+     * so that we neither hang nor blow up.
+     *
+     * @param data the data returned by the collector
+     */
+    protected void sendCollectorDataToAnalyzerIfPresent(Serializable data) {
+        if (analyzerStatusQueue == null) {
+            return;
+        }
+        final PartitionDataWrapper event = new PartitionDataWrapper();
+        event.setCollectorData(data);
+        event.setEventType(PartitionEventType.ANALYZE_COLLECTOR_DATA);
+        analyzerStatusQueue.add(event);
+    }
+
+    /**
+     * Queues the batch and exit status of this step as an ANALYZE_STATUS event,
+     * provided an analyzer queue was given to this controller.
+     */
+    @Override
+    protected void sendStatusFromPartitionToAnalyzerIfPresent() {
+        if (analyzerStatusQueue == null) {
+            return;
+        }
+        final PartitionDataWrapper event = new PartitionDataWrapper();
+        event.setBatchStatus(stepStatus.getBatchStatus());
+        event.setExitStatus(stepStatus.getExitStatus());
+        event.setEventType(PartitionEventType.ANALYZE_STATUS);
+        analyzerStatusQueue.add(event);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
new file mode 100755
index 0000000..b882de6
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
@@ -0,0 +1,236 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller;
+
+import org.apache.batchee.container.ExecutionElementController;
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.impl.JobContextImpl;
+import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.container.status.ExecutionStatus;
+import org.apache.batchee.container.status.ExtendedBatchStatus;
+import org.apache.batchee.container.status.SplitExecutionStatus;
+import org.apache.batchee.container.util.BatchFlowInSplitWorkUnit;
+import org.apache.batchee.container.util.BatchParallelWorkUnit;
+import org.apache.batchee.container.util.FlowInSplitBuilderConfig;
+import org.apache.batchee.jaxb.Flow;
+import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.jaxb.Split;
+
+import javax.batch.operations.JobExecutionAlreadyCompleteException;
+import javax.batch.operations.JobExecutionNotMostRecentException;
+import javax.batch.operations.JobRestartException;
+import javax.batch.operations.JobStartException;
+import javax.batch.operations.NoSuchJobExecutionException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Logger;
+
+/**
+ * Executes a split: every flow of the split is turned into a sub job, all sub jobs are run in
+ * parallel through the batch kernel, and their terminating statuses are aggregated into a
+ * single {@link SplitExecutionStatus}.
+ */
+public class SplitController implements ExecutionElementController {
+    private final static Logger logger = Logger.getLogger(SplitController.class.getName());
+
+    private final RuntimeJobExecution jobExecution;
+
+    // Assigned by buildSubJobBatchWorkUnits(); stays null until execute() has been called.
+    private volatile List<BatchFlowInSplitWorkUnit> parallelBatchWorkUnits;
+
+    private final BatchKernelService batchKernel;
+    private final JobContextImpl jobContext;
+    // Finished flows are delivered here by the work units.
+    private final BlockingQueue<BatchFlowInSplitWorkUnit> completedWorkQueue = new LinkedBlockingQueue<BatchFlowInSplitWorkUnit>();
+    private final long rootJobExecutionId;
+
+    // Also used as the monitor guarding work unit creation against a concurrent stop().
+    private final List<JSLJob> subJobs = new ArrayList<JSLJob>();
+
+    protected Split split;
+
+    public SplitController(RuntimeJobExecution jobExecution, Split split, long rootJobExecutionId) {
+        this.jobExecution = jobExecution;
+        this.jobContext = jobExecution.getJobContext();
+        this.rootJobExecutionId = rootJobExecutionId;
+        this.split = split;
+
+        batchKernel = ServicesManager.service(BatchKernelService.class);
+    }
+
+    /**
+     * Asks the batch kernel to stop every sub job built so far.
+     *
+     * @throws IllegalStateException if stopping any sub job fails
+     */
+    @Override
+    public void stop() {
+
+        // It's possible we may try to stop a split before any
+        // sub steps have been started.
+        synchronized (subJobs) {
+
+            if (parallelBatchWorkUnits != null) {
+                for (BatchParallelWorkUnit subJob : parallelBatchWorkUnits) {
+                    try {
+                        batchKernel.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+                    } catch (final Exception e) {
+                        // TODO - Is this what we want to know.
+                        // Blow up if it happens to force the issue.
+                        throw new IllegalStateException(e);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds one work unit per flow of the split, runs them and waits for all of them.
+     *
+     * @return the aggregated status of the split
+     */
+    @Override
+    public SplitExecutionStatus execute() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
+        // Build all sub jobs from the flows of the split
+        buildSubJobBatchWorkUnits();
+
+        // kick off the threads
+        executeWorkUnits();
+
+        // Deal with the results.
+        return waitForCompletionAndAggregateStatus();
+    }
+
+    /**
+     * Note we restart all flows.  There is no concept of "the flow completed".   It is only steps
+     * within the flows that may have already completed and so may not have needed to be rerun.
+     */
+    private void buildSubJobBatchWorkUnits() {
+
+        List<Flow> flows = this.split.getFlows();
+
+        parallelBatchWorkUnits = new ArrayList<BatchFlowInSplitWorkUnit>();
+
+        // Build all sub jobs from flows in split
+        synchronized (subJobs) {
+            for (Flow flow : flows) {
+                subJobs.add(PartitionedStepBuilder.buildFlowInSplitSubJob(jobExecution.getExecutionId(), jobContext, this.split, flow));
+            }
+            for (JSLJob job : subJobs) {
+                // No known instance of the sub job: build a new work unit; exactly one: build a restart one.
+                int count = batchKernel.getJobInstanceCount(job.getId());
+                FlowInSplitBuilderConfig config = new FlowInSplitBuilderConfig(job, completedWorkQueue, rootJobExecutionId);
+                if (count == 0) {
+                    parallelBatchWorkUnits.add(batchKernel.buildNewFlowInSplitWorkUnit(config));
+                } else if (count == 1) {
+                    parallelBatchWorkUnits.add(batchKernel.buildOnRestartFlowInSplitWorkUnit(config));
+                } else {
+                    throw new IllegalStateException("There is an inconsistency somewhere in the internal subjob creation");
+                }
+            }
+        }
+    }
+
+    /**
+     * Hands every work unit to the batch kernel, starting or restarting it depending on the
+     * number of job instances the kernel reports for the sub job name.
+     */
+    private void executeWorkUnits() {
+        // Then start or restart all subjobs in parallel
+        for (BatchParallelWorkUnit work : parallelBatchWorkUnits) {
+            int count = batchKernel.getJobInstanceCount(work.getJobExecutionImpl().getJobInstance().getJobName());
+
+            // NOTE(review): this assertion contradicts the "count > 1" branch below; with assertions
+            // enabled the restart branch can never be reached. Left untouched until the intended
+            // instance-count contract of the kernel is confirmed.
+            assert (count <= 1);
+
+            if (count == 1) {
+                batchKernel.startGeneratedJob(work);
+            } else if (count > 1) {
+                batchKernel.restartGeneratedJob(work);
+            } else {
+                throw new IllegalStateException("There is an inconsistency somewhere in the internal subjob creation");
+            }
+        }
+    }
+
+    /**
+     * Waits for one completion per sub job and folds the flow statuses into the split status.
+     * The terminating status of the flows is accumulated across iterations; the split is
+     * reported as NORMAL_COMPLETION only when no flow terminated the job.
+     *
+     * @return the aggregated split status
+     * @throws BatchContainerRuntimeException if the waiting thread is interrupted
+     */
+    private SplitExecutionStatus waitForCompletionAndAggregateStatus() {
+        final SplitExecutionStatus splitStatus = new SplitExecutionStatus();
+        ExtendedBatchStatus aggregateTerminatingStatus = null;
+
+        for (final JSLJob ignored : subJobs) {
+            final BatchFlowInSplitWorkUnit batchWork;
+            try {
+                batchWork = completedWorkQueue.take(); //wait for each thread to finish and then look at its status
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so code further up the stack can still observe it.
+                Thread.currentThread().interrupt();
+                throw new BatchContainerRuntimeException(e);
+            }
+
+            final RuntimeFlowInSplitExecution flowExecution = batchWork.getJobExecutionImpl();
+            final ExecutionStatus flowStatus = flowExecution.getFlowStatus();
+            aggregateTerminatingStatus = aggregateTerminatingStatusFromSingleFlow(aggregateTerminatingStatus, flowStatus, splitStatus);
+        }
+
+        // If this is still 'null' that means all flows completed normally without terminating the job.
+        if (aggregateTerminatingStatus == null) {
+            aggregateTerminatingStatus = ExtendedBatchStatus.NORMAL_COMPLETION;
+        }
+        splitStatus.setExtendedBatchStatus(aggregateTerminatingStatus);
+        return splitStatus;
+    }
+
+
+    /**
+     * Merges the status of one finished flow into the status aggregated so far.
+     * <p/>
+     * A <fail> and an uncaught exception are peers.  They each take precedence over a <stop>, which takes
+     * precedence over an <end>. Among peers the last one seen gets to set the exit status.
+     *
+     * @param aggregateStatus terminating status aggregated from earlier flows, {@code null} if none terminated the job
+     * @param flowStatus      status of the flow that just finished
+     * @param splitStatus     flagged when more than one flow could have terminated the job
+     * @return the flow's status when it takes precedence, otherwise {@code aggregateStatus} unchanged
+     */
+    private ExtendedBatchStatus aggregateTerminatingStatusFromSingleFlow(final ExtendedBatchStatus aggregateStatus, final ExecutionStatus flowStatus, final SplitExecutionStatus splitStatus) {
+        final String exitStatus = flowStatus.getExitStatus();
+        final String restartOn = flowStatus.getRestartOn();
+        final ExtendedBatchStatus flowBatchStatus = flowStatus.getExtendedBatchStatus();
+
+        if (flowBatchStatus.equals(ExtendedBatchStatus.JSL_END) || flowBatchStatus.equals(ExtendedBatchStatus.JSL_STOP) ||
+            flowBatchStatus.equals(ExtendedBatchStatus.JSL_FAIL) || flowBatchStatus.equals(ExtendedBatchStatus.EXCEPTION_THROWN)) {
+            if (aggregateStatus == null) {
+                // First terminating flow seen: it simply wins.
+                setInJobContext(flowBatchStatus, exitStatus, restartOn);
+                return flowBatchStatus;
+            } else {
+                splitStatus.setCouldMoreThanOneFlowHaveTerminatedJob(true);
+                if (aggregateStatus.equals(ExtendedBatchStatus.JSL_END)) {
+                    logger.warning("Current flow's batch and exit status will take precedence over and override earlier one from <end> transition element. " +
+                        "Overriding, setting exit status if non-null and preparing to end job.");
+                    setInJobContext(flowBatchStatus, exitStatus, restartOn);
+                    return flowBatchStatus;
+                } else if (aggregateStatus.equals(ExtendedBatchStatus.JSL_STOP)) {
+                    // Everything but an <end> overrides a <stop>
+                    if (!(flowBatchStatus.equals(ExtendedBatchStatus.JSL_END))) {
+                        logger.warning("Current flow's batch and exit status will take precedence over and override earlier one from <stop> transition element. " +
+                            "Overriding, setting exit status if non-null and preparing to end job.");
+                        setInJobContext(flowBatchStatus, exitStatus, restartOn);
+                        return flowBatchStatus;
+                    }
+                } else if (aggregateStatus.equals(ExtendedBatchStatus.JSL_FAIL) || aggregateStatus.equals(ExtendedBatchStatus.EXCEPTION_THROWN)) {
+                    if (flowBatchStatus.equals(ExtendedBatchStatus.JSL_FAIL) || flowBatchStatus.equals(ExtendedBatchStatus.EXCEPTION_THROWN)) {
+                        logger.warning("Current flow's batch and exit status will take precedence over and override earlier one from <fail> transition element or exception thrown. " +
+                            "Overriding, setting exit status if non-null and preparing to end job.");
+                        setInJobContext(flowBatchStatus, exitStatus, restartOn);
+                        return flowBatchStatus;
+                    }
+                }
+            }
+        }
+
+        // The flow did not terminate the job, or the earlier status takes precedence: keep what we had.
+        return aggregateStatus;
+    }
+
+    /**
+     * Copies the exit status (when non-null) and, for a JSL_STOP only, the restart-on value
+     * (when non-null) of a terminating flow into the job context.
+     */
+    private void setInJobContext(ExtendedBatchStatus flowBatchStatus, String exitStatus, String restartOn) {
+        if (exitStatus != null) {
+            jobContext.setExitStatus(exitStatus);
+        }
+        if (ExtendedBatchStatus.JSL_STOP.equals(flowBatchStatus)) {
+            if (restartOn != null) {
+                jobContext.setRestartOn(restartOn);
+            }
+        }
+    }
+
+    /**
+     * Collects the last-run step execution ids reported by the controller of every work unit.
+     *
+     * @return the collected ids; empty when no work unit has been built yet
+     */
+    @Override
+    public List<Long> getLastRunStepExecutions() {
+        final List<Long> stepExecIdList = new ArrayList<Long>();
+        final List<BatchFlowInSplitWorkUnit> workUnits = parallelBatchWorkUnits;
+        if (workUnits == null) { // execute() has not built anything yet
+            return stepExecIdList;
+        }
+        for (final BatchFlowInSplitWorkUnit workUnit : workUnits) {
+            final List<Long> stepExecIds = workUnit.getController().getLastRunStepExecutions();
+            stepExecIdList.addAll(stepExecIds);
+        }
+        return stepExecIdList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
new file mode 100755
index 0000000..a0f0cbb
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller.batchlet;
+
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.container.impl.controller.SingleThreadedStepController;
+import org.apache.batchee.container.impl.StepContextImpl;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.proxy.BatchletProxy;
+import org.apache.batchee.container.proxy.InjectionReferences;
+import org.apache.batchee.container.proxy.ProxyFactory;
+import org.apache.batchee.container.util.PartitionDataWrapper;
+import org.apache.batchee.jaxb.Batchlet;
+import org.apache.batchee.jaxb.Property;
+import org.apache.batchee.jaxb.Step;
+
+import javax.batch.runtime.BatchStatus;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Step controller that executes a step backed by a single batchlet artifact.
+ */
+public class BatchletStepController extends SingleThreadedStepController {
+    // Assigned in invokeBatchlet(); remains null until then, which stop() checks for.
+    private BatchletProxy batchletProxy;
+
+    public BatchletStepController(final RuntimeJobExecution jobExecutionImpl, final Step step,
+                                  final StepContextImpl stepContext, final long rootJobExecutionId,
+                                  final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
+        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+    }
+
+    /**
+     * Creates the proxy for the given batchlet and, unless a stop has already been
+     * issued, runs process() and stores its return value on the step context.
+     */
+    private void invokeBatchlet(final Batchlet batchlet) throws BatchContainerServiceException {
+        final String ref = batchlet.getRef();
+
+        final List<Property> properties;
+        if (batchlet.getProperties() != null) {
+            properties = batchlet.getProperties().getPropertyList();
+        } else {
+            properties = null;
+        }
+
+        final InjectionReferences references = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, properties);
+        batchletProxy = ProxyFactory.createBatchletProxy(ref, references, stepContext, jobExecutionImpl);
+
+        if (wasStopIssued()) {
+            return;
+        }
+        stepContext.setBatchletProcessRetVal(batchletProxy.process());
+    }
+
+    /**
+     * Reports whether the job context is in STOPPING state; when it is, the step
+     * context is moved to STOPPING as well. Synchronized on this controller, the
+     * same monitor stop() uses, because the status might only be set at job level.
+     */
+    protected synchronized boolean wasStopIssued() {
+        final boolean jobStopping = jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING);
+        if (jobStopping) {
+            stepContext.setBatchStatus(BatchStatus.STOPPING);
+        }
+        return jobStopping;
+    }
+
+    /**
+     * Runs the step's batchlet and always invokes the collector afterwards, even
+     * when the batchlet invocation throws.
+     */
+    @Override
+    protected void invokeCoreStep() throws BatchContainerServiceException {
+        // TODO If this step is partitioned create partition artifacts
+        try {
+            invokeBatchlet(step.getBatchlet());
+        } finally {
+            invokeCollectorIfPresent();
+        }
+    }
+
+    /**
+     * Marks the step as STOPPING and forwards the stop to the batchlet proxy, but
+     * only while the step is STARTING or STARTED.
+     */
+    @Override
+    public synchronized void stop() {
+        final boolean running = BatchStatus.STARTING.equals(stepContext.getBatchStatus())
+            || BatchStatus.STARTED.equals(stepContext.getBatchStatus());
+        if (!running) {
+            // TODO do we need to throw an error if the batchlet is already stopping/stopped
+            // and a stop gets issued twice?
+            return;
+        }
+
+        stepContext.setBatchStatus(BatchStatus.STOPPING);
+
+        // stop() may be issued before process(), in which case no proxy exists yet
+        if (batchletProxy != null) {
+            batchletProxy.stop();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
new file mode 100755
index 0000000..34f2fae
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.impl.controller.chunk;
+
+
+import org.apache.batchee.container.impl.StepContextImpl;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.proxy.CheckpointAlgorithmProxy;
+import org.apache.batchee.container.proxy.InjectionReferences;
+import org.apache.batchee.container.proxy.ProxyFactory;
+import org.apache.batchee.jaxb.Chunk;
+import org.apache.batchee.jaxb.Step;
+
+/**
+ * Static factory that selects the checkpoint algorithm proxy for a chunk step.
+ */
+public final class CheckpointAlgorithmFactory {
+    private CheckpointAlgorithmFactory() {
+        // no-op
+    }
+
+    /**
+     * Returns a proxy around the checkpoint algorithm configured on the step's chunk.
+     * A checkpoint policy equal to "custom" (ignoring case) resolves the referenced
+     * algorithm through the ProxyFactory; every other value wraps an
+     * ItemCheckpointAlgorithm.
+     */
+    public static CheckpointAlgorithmProxy getCheckpointAlgorithmProxy(final Step step, final InjectionReferences injectionReferences, final StepContextImpl stepContext, final RuntimeJobExecution jobExecution) {
+        final Chunk chunk = step.getChunk();
+        if (!"custom".equalsIgnoreCase(chunk.getCheckpointPolicy())) {
+            // anything other than "custom" (normally "item")
+            return new CheckpointAlgorithmProxy(new ItemCheckpointAlgorithm());
+        }
+
+        final String algorithmRef = chunk.getCheckpointAlgorithm().getRef();
+        return ProxyFactory.createCheckpointAlgorithmProxy(algorithmRef, injectionReferences, stepContext, jobExecution);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointData.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointData.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointData.java
new file mode 100755
index 0000000..702e0e9
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointData.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.impl.controller.chunk;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+
+/**
+ * Serializable holder for one checkpoint record: the owning job instance id, the
+ * step name, the checkpoint type and the raw restart token bytes.
+ * Field names are part of the serialized form and must not be renamed.
+ */
+public class CheckpointData implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private long _jobInstanceId;
+    private CheckpointType type;
+    private String stepName;
+    private byte[] restartToken;
+
+    /**
+     * Creates a record whose restart token is initialised to the bytes of "NOTSET".
+     *
+     * @param jobInstanceId id of the job instance the checkpoint belongs to
+     * @param stepname      name of the step, must not be null
+     * @param ckType        checkpoint type, must not be null
+     * @throws RuntimeException if stepname or ckType is null
+     */
+    public CheckpointData(final long jobInstanceId, final String stepname, final CheckpointType ckType) {
+        if (stepname == null || ckType == null) {
+            // report the argument that was passed in; the field has not been assigned yet
+            throw new RuntimeException("Invalid parameters to CheckpointData jobInstanceId: " + jobInstanceId +
+                " BDS: " + ckType + " stepName: " + stepname);
+        }
+
+        _jobInstanceId = jobInstanceId;
+        type = ckType;
+        stepName = stepname;
+        try {
+            restartToken = "NOTSET".getBytes("UTF8");
+        } catch (final UnsupportedEncodingException e) {
+            throw new RuntimeException("Doesn't support UTF-8", e);
+        }
+    }
+
+    public long getjobInstanceId() {
+        return _jobInstanceId;
+    }
+
+    public void setjobInstanceId(final long id) {
+        _jobInstanceId = id;
+    }
+
+    public CheckpointType getType() {
+        return type;
+    }
+
+    public void setType(final CheckpointType ckType) {
+        type = ckType;
+    }
+
+    public String getStepName() {
+        return stepName;
+    }
+
+    public void setStepName(final String name) {
+        stepName = name;
+    }
+
+    /**
+     * @return the internal restart token array itself (no copy is made)
+     */
+    public byte[] getRestartToken() {
+        return restartToken;
+    }
+
+    public void setRestartToken(byte[] token) {
+        restartToken = token;
+    }
+
+    /**
+     * Renders the record for diagnostics. The setters accept null, so a null type
+     * or restart token is rendered as text instead of raising a NullPointerException.
+     */
+    @Override
+    public String toString() {
+        String restartString;
+        if (restartToken == null) {
+            restartString = "<null>";
+        } else {
+            try {
+                restartString = new String(restartToken, "UTF8");
+            } catch (final UnsupportedEncodingException e) {
+                restartString = "<bytes not UTF-8>";
+            }
+        }
+        final String typeName = (type == null) ? "null" : type.name();
+        return " jobInstanceId: " + _jobInstanceId + " stepId: " + stepName + " bdsName: " + typeName +
+            " restartToken: [UTF8-bytes: " + restartString + "]";
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointDataKey.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointDataKey.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointDataKey.java
new file mode 100755
index 0000000..59ff588
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointDataKey.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.impl.controller.chunk;
+
+/**
+ * Immutable lookup key for a checkpoint record, made of the job instance id, the
+ * step name and the checkpoint type.
+ */
+public class CheckpointDataKey {
+    private final long jobInstanceId;
+    private final CheckpointType type;
+    private final String stepName;
+
+    /**
+     * @param jobId    id of the job instance
+     * @param stepName name of the step (not validated, may be null)
+     * @param bdsName  checkpoint type (not validated, may be null)
+     */
+    public CheckpointDataKey(final long jobId, final String stepName, final CheckpointType bdsName) {
+        this.jobInstanceId = jobId;
+        this.stepName = stepName;
+        this.type = bdsName;
+    }
+
+    public long getJobInstanceId() {
+        return jobInstanceId;
+    }
+
+    public CheckpointType getType() {
+        return type;
+    }
+
+    public String getStepName() {
+        return stepName;
+    }
+
+    /**
+     * @return "jobInstanceId,stepName,TYPE"
+     * @throws NullPointerException if the key was built with a null type
+     */
+    public String getCommaSeparatedKey() {
+        return jobInstanceId + "," + stepName + "," + type.name();
+    }
+
+    @Override
+    public String toString() {
+        return getCommaSeparatedKey();
+    }
+
+    /**
+     * Two keys are equal when id, type and step name all match. The constructor
+     * does not reject nulls, so null components are compared without dereferencing.
+     */
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        final CheckpointDataKey that = (CheckpointDataKey) o;
+        if (jobInstanceId != that.jobInstanceId) {
+            return false;
+        }
+        // enum constants are singletons, so identity comparison is equivalent and null-safe
+        if (type != that.type) {
+            return false;
+        }
+        return stepName == null ? that.stepName == null : stepName.equals(that.stepName);
+    }
+
+    /**
+     * Same value as before for fully populated keys; a null component contributes 0.
+     */
+    @Override
+    public int hashCode() {
+        int result = (int) (jobInstanceId ^ (jobInstanceId >>> 32));
+        result = 31 * result + (type == null ? 0 : type.hashCode());
+        result = 31 * result + (stepName == null ? 0 : stepName.hashCode());
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
new file mode 100755
index 0000000..76cae58
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.impl.controller.chunk;
+
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.container.proxy.ItemReaderProxy;
+import org.apache.batchee.container.proxy.ItemWriterProxy;
+import org.apache.batchee.spi.PersistenceManagerService;
+import org.apache.batchee.container.services.ServicesManager;
+
+import javax.batch.api.chunk.CheckpointAlgorithm;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * Drives checkpointing for a chunk step: consults the checkpoint algorithm and
+ * persists the reader and writer checkpoint info through the persistence service.
+ */
+public class CheckpointManager {
+    private final PersistenceManagerService persistenceManagerService;
+    private final ItemReaderProxy readerProxy;
+    private final ItemWriterProxy writerProxy;
+    private final CheckpointAlgorithm checkpointAlgorithm;
+    private final String stepId;
+    private final long jobInstanceID;
+
+
+    public CheckpointManager(final ItemReaderProxy reader, final ItemWriterProxy writer,
+                             final CheckpointAlgorithm chkptAlg,
+                             final long jobInstanceID, final String stepId) {
+        this.readerProxy = reader;
+        this.writerProxy = writer;
+        this.checkpointAlgorithm = chkptAlg;
+        this.stepId = stepId;
+        this.jobInstanceID = jobInstanceID;
+
+        this.persistenceManagerService = ServicesManager.service(PersistenceManagerService.class);
+    }
+
+    /**
+     * @return the algorithm's isReadyToCheckpoint() answer
+     * @throws BatchContainerRuntimeException if the algorithm throws
+     */
+    public boolean applyCheckPointPolicy() {
+        try {
+            return checkpointAlgorithm.isReadyToCheckpoint();
+        } catch (final Exception e) {
+            throw new BatchContainerRuntimeException("Checkpoint algorithm failed", e);
+        }
+    }
+
+    /**
+     * Serialises and stores the reader checkpoint info, then the writer checkpoint info.
+     *
+     * @throws BatchContainerServiceException wrapping any failure while obtaining,
+     *                                        serialising or persisting the data
+     */
+    public void checkpoint() {
+        try {
+            // reader first, then writer
+            persist(CheckpointType.READER, readerProxy.checkpointInfo());
+            persist(CheckpointType.WRITER, writerProxy.checkpointInfo());
+        } catch (final Exception ex) {
+            // is this what I should be throwing here?
+            throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + stepId + "]", ex);
+        }
+    }
+
+    /**
+     * Serialises one checkpoint object and hands it to the persistence service under
+     * the key (jobInstanceID, stepId, type).
+     */
+    private void persist(final CheckpointType type, final Object checkpointInfo) throws Exception {
+        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        final ObjectOutputStream out = new ObjectOutputStream(buffer);
+        try {
+            out.writeObject(checkpointInfo);
+        } finally {
+            // close (and flush) the stream even when serialisation fails
+            out.close();
+        }
+
+        final CheckpointData data = new CheckpointData(jobInstanceID, stepId, type);
+        data.setRestartToken(buffer.toByteArray());
+        persistenceManagerService.setCheckpointData(new CheckpointDataKey(jobInstanceID, stepId, type), data);
+    }
+
+    /**
+     * @return the algorithm's checkpointTimeout() value
+     * @throws BatchContainerRuntimeException if the algorithm throws
+     */
+    public int checkpointTimeout() {
+        try {
+            return this.checkpointAlgorithm.checkpointTimeout();
+        } catch (final Exception e) {
+            throw new BatchContainerRuntimeException("Checkpoint algorithm checkpointTimeout() failed", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointType.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointType.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointType.java
new file mode 100644
index 0000000..7f6c46f
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointType.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller.chunk;
+
+/**
+ * Distinguishes the two kinds of checkpoint record kept for a chunk step.
+ */
+public enum CheckpointType {
+    /** Checkpoint information obtained from the item reader. */
+    READER,
+    /** Checkpoint information obtained from the item writer. */
+    WRITER
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkHelper.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkHelper.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkHelper.java
new file mode 100755
index 0000000..756d108
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkHelper.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.impl.controller.chunk;
+
+import org.apache.batchee.jaxb.Chunk;
+
+/**
+ * Static helpers that read chunk attributes from the JSL model, apply defaults
+ * and write the resolved value back onto the chunk where a default exists.
+ */
+public class ChunkHelper {
+
+    /**
+     * Resolves the chunk's item count, falling back to 10 when the attribute is
+     * null or empty, and stores the resolved value back on the chunk.
+     *
+     * @throws NumberFormatException if the attribute is not a parsable integer
+     */
+    public static int getItemCount(Chunk chunk) {
+        final String raw = chunk.getItemCount();
+        final int count = (raw == null || raw.isEmpty()) ? 10 : Integer.parseInt(raw);
+
+        chunk.setItemCount(Integer.toString(count));
+        return count;
+    }
+
+    /**
+     * Resolves the chunk's time limit, falling back to 0 (no time limit) when the
+     * attribute is null or empty, and stores the resolved value back on the chunk.
+     *
+     * @throws NumberFormatException if the attribute is not a parsable integer
+     */
+    public static int getTimeLimit(Chunk chunk) {
+        final String raw = chunk.getTimeLimit();
+        final int limit = (raw == null || raw.isEmpty()) ? 0 : Integer.parseInt(raw);
+
+        chunk.setTimeLimit(Integer.toString(limit));
+        return limit;
+    }
+
+    /**
+     * Resolves the checkpoint policy, defaulting to "item" when the attribute is
+     * null or empty, and stores the resolved value back on the chunk.
+     *
+     * @throws IllegalArgumentException if the attribute is neither "item" nor "custom"
+     */
+    public static String getCheckpointPolicy(Chunk chunk) {
+        String policy = chunk.getCheckpointPolicy();
+
+        if (policy == null || policy.isEmpty()) {
+            policy = "item";
+        } else if (!"item".equals(policy) && !"custom".equals(policy)) {
+            throw new IllegalArgumentException("The only supported attributed values for 'checkpoint-policy' are 'item' and 'custom'.");
+        }
+
+        chunk.setCheckpointPolicy(policy);
+        return policy;
+    }
+
+    /**
+     * @return the chunk's skip limit parsed as an int (no default is applied here)
+     * @throws NumberFormatException if the attribute is null or not a parsable integer
+     */
+    public static int getSkipLimit(Chunk chunk) {
+        return Integer.parseInt(chunk.getSkipLimit());
+    }
+
+    /**
+     * @return the chunk's retry limit parsed as an int (no default is applied here)
+     * @throws NumberFormatException if the attribute is null or not a parsable integer
+     */
+    public static int getRetryLimit(Chunk chunk) {
+        return Integer.parseInt(chunk.getRetryLimit());
+    }
+
+}