Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:39:06 UTC

[29/62] importing batchee from github - a fork from the IBM RI

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
new file mode 100755
index 0000000..7c8eec7
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
@@ -0,0 +1,888 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller.chunk;
+
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.container.impl.MetricImpl;
+import org.apache.batchee.container.impl.StepContextImpl;
+import org.apache.batchee.container.impl.controller.SingleThreadedStepController;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.proxy.CheckpointAlgorithmProxy;
+import org.apache.batchee.container.proxy.ChunkListenerProxy;
+import org.apache.batchee.container.proxy.InjectionReferences;
+import org.apache.batchee.container.proxy.ItemProcessListenerProxy;
+import org.apache.batchee.container.proxy.ItemProcessorProxy;
+import org.apache.batchee.container.proxy.ItemReadListenerProxy;
+import org.apache.batchee.container.proxy.ItemReaderProxy;
+import org.apache.batchee.container.proxy.ItemWriteListenerProxy;
+import org.apache.batchee.container.proxy.ItemWriterProxy;
+import org.apache.batchee.container.proxy.ProxyFactory;
+import org.apache.batchee.container.proxy.RetryProcessListenerProxy;
+import org.apache.batchee.container.proxy.RetryReadListenerProxy;
+import org.apache.batchee.container.proxy.RetryWriteListenerProxy;
+import org.apache.batchee.container.proxy.SkipProcessListenerProxy;
+import org.apache.batchee.container.proxy.SkipReadListenerProxy;
+import org.apache.batchee.container.proxy.SkipWriteListenerProxy;
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.container.util.PartitionDataWrapper;
+import org.apache.batchee.container.util.TCCLObjectInputStream;
+import org.apache.batchee.jaxb.Chunk;
+import org.apache.batchee.jaxb.ItemProcessor;
+import org.apache.batchee.jaxb.ItemReader;
+import org.apache.batchee.jaxb.ItemWriter;
+import org.apache.batchee.jaxb.Property;
+import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.PersistenceManagerService;
+
+import javax.batch.api.chunk.CheckpointAlgorithm;
+import javax.batch.runtime.BatchStatus;
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ChunkStepController extends SingleThreadedStepController {
+
+    private final static String sourceClass = ChunkStepController.class.getName();
+    private final static Logger logger = Logger.getLogger(sourceClass);
+
+    private final PersistenceManagerService persistenceManagerService = ServicesManager.service(PersistenceManagerService.class);
+
+    private Chunk chunk = null;
+    private ItemReaderProxy readerProxy = null;
+    private ItemProcessorProxy processorProxy = null;
+    private ItemWriterProxy writerProxy = null;
+    private CheckpointAlgorithmProxy checkpointProxy = null;
+    private CheckpointAlgorithm chkptAlg = null;
+    private CheckpointManager checkpointManager;
+    private SkipHandler skipHandler = null;
+    private CheckpointDataKey readerChkptDK = null;
+    private CheckpointDataKey writerChkptDK = null;
+    private List<ChunkListenerProxy> chunkListeners = null;
+    private List<ItemReadListenerProxy> itemReadListeners = null;
+    private List<ItemProcessListenerProxy> itemProcessListeners = null;
+    private List<ItemWriteListenerProxy> itemWriteListeners = null;
+    private RetryHandler retryHandler;
+
+    private boolean rollbackRetry = false;
+
+    public ChunkStepController(final RuntimeJobExecution jobExecutionImpl, final Step step, final StepContextImpl stepContext,
+                               final long rootJobExecutionId, final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
+        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+    }
+
+    /**
+     * Utility Class to hold statuses at each level of Read-Process-Write loop
+     */
+    private class ItemStatus {
+
+        public boolean isSkipped() {
+            return skipped;
+        }
+
+        public void setSkipped(boolean skipped) {
+            this.skipped = skipped;
+        }
+
+        public boolean isFiltered() {
+            return filtered;
+        }
+
+        public void setFiltered(boolean filtered) {
+            this.filtered = filtered;
+        }
+
+        public boolean isCheckPointed() {
+            return checkPointed;
+        }
+
+        public void setCheckPointed(boolean checkPointed) {
+            this.checkPointed = checkPointed;
+        }
+
+        public boolean isFinished() {
+            return finished;
+        }
+
+        public void setFinished(boolean finished) {
+            this.finished = finished;
+        }
+
+        public void setRetry(boolean ignored) {
+            // no-op
+        }
+
+        public boolean isRollback() {
+            return rollback;
+        }
+
+        public void setRollback(boolean rollback) {
+            this.rollback = rollback;
+        }
+
+        private boolean skipped = false;
+        private boolean filtered = false;
+        private boolean finished = false;
+        private boolean checkPointed = false;
+        private boolean rollback = false;
+
+    }
+
+    /**
+     * We read and process one item at a time but write in chunks (groups of
+     * items). So this method loops until we either reach the end of the
+     * reader (no more items to read), the writer buffer is full, or a
+     * checkpoint is triggered.
+     *
+     * @param chunkSize write buffer size
+     * @param theStatus flags whether the read-process loop reached the last record or a
+     *                  checkpoint is required
+     * @return an array list of objects to write
+     */
+    private List<Object> readAndProcess(int chunkSize, ItemStatus theStatus) {
+        List<Object> chunkToWrite = new ArrayList<Object>();
+        Object itemRead;
+        Object itemProcessed;
+        int readProcessedCount = 0;
+
+        while (true) {
+            ItemStatus status = new ItemStatus();
+            itemRead = readItem(status);
+
+            if (status.isRollback()) {
+                theStatus.setRollback(true);
+                // inc rollbackCount
+                stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+                break;
+            }
+
+            if (!status.isSkipped() && !status.isFinished()) {
+                itemProcessed = processItem(itemRead, status);
+
+                if (status.isRollback()) {
+                    theStatus.setRollback(true);
+                    // inc rollbackCount
+                    stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+                    break;
+                }
+
+                if (!status.isSkipped() && !status.isFiltered()) {
+                    chunkToWrite.add(itemProcessed);
+                    readProcessedCount++;
+                }
+            }
+
+            theStatus.setFinished(status.isFinished());
+            theStatus.setCheckPointed(checkpointManager.applyCheckPointPolicy());
+
+            // This will force the current item to finish processing on a stop
+            // request
+            if (stepContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
+                theStatus.setFinished(true);
+            }
+
+            // write buffer size reached
+            if ((readProcessedCount == chunkSize) && !("custom".equals(checkpointProxy.getCheckpointType()))) {
+                break;
+            }
+
+            // checkpoint reached
+            if (theStatus.isCheckPointed()) {
+                break;
+            }
+
+            // last record in readerProxy reached
+            if (theStatus.isFinished()) {
+                break;
+            }
+
+        }
+        return chunkToWrite;
+    }
+
+    /**
+     * Reads an item from the reader
+     *
+     * @param status flags the current read status
+     * @return the item read
+     */
+    private Object readItem(ItemStatus status) {
+        Object itemRead = null;
+
+        try {
+            // call read listeners before and after the actual read
+            for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+                readListenerProxy.beforeRead();
+            }
+
+            itemRead = readerProxy.readItem();
+
+            for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+                readListenerProxy.afterRead(itemRead);
+            }
+
+            // itemRead == null means we reached the end of
+            // the readerProxy "resultset"
+            status.setFinished(itemRead == null);
+            if (!status.isFinished()) {
+                stepContext.getMetric(MetricImpl.MetricType.READ_COUNT).incValue();
+            }
+        } catch (Exception e) {
+            stepContext.setException(e);
+            for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+                readListenerProxy.onReadError(e);
+            }
+            if (!rollbackRetry) {
+                if (retryReadException(e)) {
+                    for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+                        readListenerProxy.onReadError(e);
+                    }
+                    // if not a rollback exception, just retry the current item
+                    if (!retryHandler.isRollbackException(e)) {
+                        itemRead = readItem(status);
+                    } else {
+                        status.setRollback(true);
+                        rollbackRetry = true;
+                        // inc rollbackCount
+                        stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+                    }
+                } else if (skipReadException(e)) {
+                    status.setSkipped(true);
+                    stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();
+
+                } else {
+                    throw new BatchContainerRuntimeException(e);
+                }
+            } else {
+                // coming from a rollback retry
+                if (skipReadException(e)) {
+                    status.setSkipped(true);
+                    stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();
+
+                } else if (retryReadException(e)) {
+                    if (!retryHandler.isRollbackException(e)) {
+                        itemRead = readItem(status);
+                    } else {
+                        status.setRollback(true);
+                        // inc rollbackCount
+                        stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+                    }
+                } else {
+                    throw new BatchContainerRuntimeException(e);
+                }
+            }
+
+        } catch (final Throwable e) {
+            throw new BatchContainerRuntimeException(e);
+        }
+
+        return itemRead;
+    }
+
+    /**
+     * Process an item previously read by the reader
+     *
+     * @param itemRead the item read
+     * @param status   flags the current process status
+     * @return the processed item
+     */
+    private Object processItem(final Object itemRead, final ItemStatus status) {
+        Object processedItem = null;
+
+        // if no processor defined for this chunk
+        if (processorProxy == null) {
+            return itemRead;
+        }
+
+        try {
+
+            // call process listeners before and after the actual process call
+            for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+                processListenerProxy.beforeProcess(itemRead);
+            }
+
+            processedItem = processorProxy.processItem(itemRead);
+
+            if (processedItem == null) {
+                // inc filterCount
+                stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();
+                status.setFiltered(true);
+            }
+
+            for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+                processListenerProxy.afterProcess(itemRead, processedItem);
+            }
+        } catch (final Exception e) {
+            for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+                processListenerProxy.onProcessError(processedItem, e);
+            }
+            if (!rollbackRetry) {
+                if (retryProcessException(e, itemRead)) {
+                    if (!retryHandler.isRollbackException(e)) {
+                        // call process listeners before and after the actual
+                        // process call
+                        for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+                            processListenerProxy.beforeProcess(itemRead);
+                        }
+                        processedItem = processItem(itemRead, status);
+                        if (processedItem == null) {
+                            // inc filterCount
+                            stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();
+                            status.setFiltered(true);
+                        }
+
+                        for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+                            processListenerProxy.afterProcess(itemRead, processedItem);
+                        }
+                    } else {
+                        status.setRollback(true);
+                        rollbackRetry = true;
+                        // inc rollbackCount
+                        stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+                    }
+                } else if (skipProcessException(e, itemRead)) {
+                    status.setSkipped(true);
+                    stepContext.getMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT).incValue();
+                } else {
+                    throw new BatchContainerRuntimeException(e);
+                }
+            } else {
+                if (skipProcessException(e, itemRead)) {
+                    status.setSkipped(true);
+                    stepContext.getMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT).incValue();
+                } else if (retryProcessException(e, itemRead)) {
+                    if (!retryHandler.isRollbackException(e)) {
+                        // call process listeners before and after the actual
+                        // process call
+                        for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+                            processListenerProxy.beforeProcess(itemRead);
+                        }
+                        processedItem = processItem(itemRead, status);
+                        if (processedItem == null) {
+                            // inc filterCount
+                            stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();
+                            status.setFiltered(true);
+                        }
+
+                        for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+                            processListenerProxy.afterProcess(itemRead, processedItem);
+                        }
+                    } else {
+                        status.setRollback(true);
+                        rollbackRetry = true;
+                        // inc rollbackCount
+                        stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+                    }
+                } else {
+                    throw new BatchContainerRuntimeException(e);
+                }
+            }
+
+        } catch (final Throwable e) {
+            throw new BatchContainerRuntimeException(e);
+        }
+
+        return processedItem;
+    }
+
+    /**
+     * Writes items
+     *
+     * @param theChunk the array list with all items processed ready to be written
+     */
+    private void writeChunk(List<Object> theChunk, ItemStatus status) {
+        if (!theChunk.isEmpty()) {
+            try {
+
+                // call write listeners before and after the actual write
+                for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
+                    writeListenerProxy.beforeWrite(theChunk);
+                }
+
+                writerProxy.writeItems(theChunk);
+
+                for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
+                    writeListenerProxy.afterWrite(theChunk);
+                }
+                stepContext.getMetric(MetricImpl.MetricType.WRITE_COUNT).incValueBy(theChunk.size());
+            } catch (Exception e) {
+                this.stepContext.setException(e);
+                for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
+                    writeListenerProxy.onWriteError(theChunk, e);
+                }
+                if (!rollbackRetry) {
+                    if (retryWriteException(e, theChunk)) {
+                        if (!retryHandler.isRollbackException(e)) {
+                            writeChunk(theChunk, status);
+                        } else {
+                            rollbackRetry = true;
+                            status.setRollback(true);
+                            // inc rollbackCount
+                            stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+                        }
+                    } else if (skipWriteException(e, theChunk)) {
+                        stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);
+                    } else {
+                        throw new BatchContainerRuntimeException(e);
+                    }
+
+                } else {
+                    if (skipWriteException(e, theChunk)) {
+                        stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);
+                    } else if (retryWriteException(e, theChunk)) {
+                        if (!retryHandler.isRollbackException(e)) {
+                            status.setRetry(true);
+                            writeChunk(theChunk, status);
+                        } else {
+                            rollbackRetry = true;
+                            status.setRollback(true);
+                            // inc rollbackCount
+                            stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+                        }
+                    } else {
+                        throw new BatchContainerRuntimeException(e);
+                    }
+                }
+
+            } catch (Throwable e) {
+                throw new BatchContainerRuntimeException(e);
+            }
+        }
+    }
+
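+    /*
+     * Core chunk loop. Each iteration runs in its own transaction: beforeChunk
+     * listeners, readAndProcess(), writeChunk(), then either a checkpoint
+     * (checkpointManager.checkpoint(), afterChunk listeners, persistUserData(),
+     * commit) or - when a rollback was requested - a transaction rollback
+     * followed by repositioning the reader and writer at the last checkpoint
+     * and reprocessing one item at a time.
+     */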
+    private void invokeChunk() {
+        int itemCount = ChunkHelper.getItemCount(chunk);
+        int timeInterval = ChunkHelper.getTimeLimit(chunk);
+        boolean checkPointed = true;
+        boolean rollback = false;
+        Throwable caughtThrowable = null;
+
+        // begin new transaction at first iteration or after a checkpoint commit
+
+        try {
+            transactionManager.begin();
+            this.openReaderAndWriter();
+            transactionManager.commit();
+
+            while (true) {
+
+                if (checkPointed || rollback) {
+                    if ("custom".equals(checkpointProxy.getCheckpointType())) {
+                        int newtimeOut = this.checkpointManager.checkpointTimeout();
+                        transactionManager.setTransactionTimeout(newtimeOut);
+                    }
+                    transactionManager.begin();
+                    for (ChunkListenerProxy chunkProxy : chunkListeners) {
+                        chunkProxy.beforeChunk();
+                    }
+
+                    if (rollback) {
+                        positionReaderAtCheckpoint();
+                        positionWriterAtCheckpoint();
+                        checkpointManager = new CheckpointManager(readerProxy, writerProxy,
+                            getCheckpointAlgorithm(itemCount, timeInterval), jobExecutionImpl
+                            .getJobInstance().getInstanceId(), step.getId());
+                    }
+                }
+
+                ItemStatus status = new ItemStatus();
+
+                if (rollback) {
+                    rollback = false;
+                }
+
+                final List<Object> chunkToWrite = readAndProcess(itemCount, status);
+
+                if (status.isRollback()) {
+                    itemCount = 1;
+                    rollback = true;
+
+                    readerProxy.close();
+                    writerProxy.close();
+
+                    transactionManager.rollback();
+
+                    continue;
+                }
+
+                writeChunk(chunkToWrite, status);
+
+                if (status.isRollback()) {
+                    itemCount = 1;
+                    rollback = true;
+
+                    readerProxy.close();
+                    writerProxy.close();
+
+                    transactionManager.rollback();
+
+                    continue;
+                }
+                checkPointed = status.isCheckPointed();
+
+                // we could finish the chunk in 3 conditions: buffer is full,
+                // checkpoint, not more input
+                if (status.isCheckPointed() || status.isFinished()) {
+                    // TODO: missing before checkpoint listeners
+                    // 1.- check if spec list proper steps for before checkpoint
+                    // 2.- ask Andy about retry
+                    // 3.- when do we stop?
+
+                    checkpointManager.checkpoint();
+
+                    for (ChunkListenerProxy chunkProxy : chunkListeners) {
+                        chunkProxy.afterChunk();
+                    }
+
+                    this.persistUserData();
+
+                    this.chkptAlg.beginCheckpoint();
+
+                    transactionManager.commit();
+
+                    this.chkptAlg.endCheckpoint();
+
+                    invokeCollectorIfPresent();
+
+                    // exit loop when last record is written
+                    if (status.isFinished()) {
+                        transactionManager.begin();
+
+                        readerProxy.close();
+                        writerProxy.close();
+
+                        transactionManager.commit();
+                        // increment commitCount
+                        stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();
+                        break;
+                    } else {
+                        // increment commitCount
+                        stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();
+                    }
+
+                }
+
+            }
+        } catch (final Exception e) {
+            caughtThrowable = e;
+            logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", e);
+            // Only try to call onError() if we have an Exception, but not an Error.
+            for (ChunkListenerProxy chunkProxy : chunkListeners) {
+                try {
+                    chunkProxy.onError(e);
+                } catch (final Exception e1) {
+                    logger.log(Level.SEVERE, e1.getMessage(), e1);
+                }
+            }
+        } catch (final Throwable t) {
+            caughtThrowable = t;
+            logger.log(Level.SEVERE, t.getMessage(), t);
+        } finally {
+            if (caughtThrowable != null) {
+                transactionManager.setRollbackOnly();
+                readerProxy.close();
+                writerProxy.close();
+                transactionManager.rollback();
+                throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", caughtThrowable);
+            }
+        }
+    }
+
+    protected void invokeCoreStep() throws BatchContainerServiceException {
+
+        this.chunk = step.getChunk();
+
+        initializeChunkArtifacts();
+
+        invokeChunk();
+    }
+
+    private CheckpointAlgorithm getCheckpointAlgorithm(final int itemCount, final int timeInterval) {
+        final CheckpointAlgorithm alg;
+        if ("item".equals(checkpointProxy.getCheckpointType())) {
+            alg = new ItemCheckpointAlgorithm();
+            ((ItemCheckpointAlgorithm) alg).setThresholds(itemCount, timeInterval);
+        } else { // custom chkpt alg
+            alg = checkpointProxy;
+        }
+
+        return alg;
+    }
+
+    /*
+     * Initialize itemreader, itemwriter, and item processor checkpoint
+     */
+    private void initializeChunkArtifacts() {
+        final int itemCount = ChunkHelper.getItemCount(chunk);
+        final int timeInterval = ChunkHelper.getTimeLimit(chunk);
+
+        {
+            final ItemReader itemReader = chunk.getReader();
+            final List<Property> itemReaderProps = itemReader.getProperties() == null ? null : itemReader.getProperties().getPropertyList();
+            final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemReaderProps);
+            readerProxy = ProxyFactory.createItemReaderProxy(itemReader.getRef(), injectionRef, stepContext, jobExecutionImpl);
+        }
+
+        {
+            final ItemProcessor itemProcessor = chunk.getProcessor();
+            if (itemProcessor != null) {
+                final List<Property> itemProcessorProps = itemProcessor.getProperties() == null ? null : itemProcessor.getProperties().getPropertyList();
+                final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemProcessorProps);
+                processorProxy = ProxyFactory.createItemProcessorProxy(itemProcessor.getRef(), injectionRef, stepContext, jobExecutionImpl);
+            }
+        }
+
+        {
+            final ItemWriter itemWriter = chunk.getWriter();
+            final List<Property> itemWriterProps = itemWriter.getProperties() == null ? null : itemWriter.getProperties().getPropertyList();
+            final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemWriterProps);
+            writerProxy = ProxyFactory.createItemWriterProxy(itemWriter.getRef(), injectionRef, stepContext, jobExecutionImpl);
+        }
+
+        {
+            final List<Property> propList;
+            if (chunk.getCheckpointAlgorithm() != null) {
+                propList = (chunk.getCheckpointAlgorithm().getProperties() == null) ? null : chunk.getCheckpointAlgorithm().getProperties().getPropertyList();
+            } else {
+                propList = null;
+            }
+
+            final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
+            checkpointProxy = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(step, injectionRef, stepContext, jobExecutionImpl);
+        }
+
+        {
+            final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, null);
+
+            this.chunkListeners = jobExecutionImpl.getListenerFactory().getChunkListeners(step, injectionRef, stepContext, jobExecutionImpl);
+            this.itemReadListeners = jobExecutionImpl.getListenerFactory().getItemReadListeners(step, injectionRef, stepContext, jobExecutionImpl);
+            this.itemProcessListeners = jobExecutionImpl.getListenerFactory().getItemProcessListeners(step, injectionRef, stepContext, jobExecutionImpl);
+            this.itemWriteListeners = jobExecutionImpl.getListenerFactory().getItemWriteListeners(step, injectionRef, stepContext, jobExecutionImpl);
+            final List<SkipProcessListenerProxy> skipProcessListeners = jobExecutionImpl.getListenerFactory().getSkipProcessListeners(step, injectionRef, stepContext, jobExecutionImpl);
+            final List<SkipReadListenerProxy> skipReadListeners = jobExecutionImpl.getListenerFactory().getSkipReadListeners(step, injectionRef, stepContext, jobExecutionImpl);
+            final List<SkipWriteListenerProxy> skipWriteListeners = jobExecutionImpl.getListenerFactory().getSkipWriteListeners(step, injectionRef, stepContext, jobExecutionImpl);
+            final List<RetryProcessListenerProxy> retryProcessListeners = jobExecutionImpl.getListenerFactory().getRetryProcessListeners(step, injectionRef, stepContext, jobExecutionImpl);
+            final List<RetryReadListenerProxy> retryReadListeners = jobExecutionImpl.getListenerFactory().getRetryReadListeners(step, injectionRef, stepContext, jobExecutionImpl);
+            final List<RetryWriteListenerProxy> retryWriteListeners = jobExecutionImpl.getListenerFactory().getRetryWriteListeners(step, injectionRef, stepContext, jobExecutionImpl);
+
+            if ("item".equals(checkpointProxy.getCheckpointType())) {
+                chkptAlg = new ItemCheckpointAlgorithm();
+                ItemCheckpointAlgorithm.class.cast(chkptAlg).setThresholds(itemCount, timeInterval);
+            } else { // custom chkpt alg
+                chkptAlg = checkpointProxy;
+            }
+
+            checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId());
+
+            skipHandler = new SkipHandler(chunk);
+            skipHandler.addSkipProcessListener(skipProcessListeners);
+            skipHandler.addSkipReadListener(skipReadListeners);
+            skipHandler.addSkipWriteListener(skipWriteListeners);
+
+            retryHandler = new RetryHandler(chunk);
+
+            retryHandler.addRetryProcessListener(retryProcessListeners);
+            retryHandler.addRetryReadListener(retryReadListeners);
+            retryHandler.addRetryWriteListener(retryWriteListeners);
+        }
+    }
+
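+    /*
+     * Opens the reader and writer, restoring their positions from any
+     * checkpoint data found in the persistence service (restart case); when no
+     * checkpoint data exists, both are opened with a null checkpoint.
+     */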
+    private void openReaderAndWriter() {
+        readerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), CheckpointType.READER);
+        CheckpointData readerChkptData = persistenceManagerService.getCheckpointData(readerChkptDK);
+        try {
+
+            // check for data in backing store
+            if (readerChkptData != null) {
+                final byte[] readertoken = readerChkptData.getRestartToken();
+                final ByteArrayInputStream readerChkptBA = new ByteArrayInputStream(readertoken);
+                TCCLObjectInputStream readerOIS;
+                try {
+                    readerOIS = new TCCLObjectInputStream(readerChkptBA);
+                    readerProxy.open((Serializable) readerOIS.readObject());
+                    readerOIS.close();
+                } catch (final Exception ex) {
+                    // is this what I should be throwing here?
+                    throw new BatchContainerServiceException("Cannot restore the checkpoint data for [" + step.getId() + "]", ex);
+                }
+            } else {
+                // no chkpt data exists in the backing store
+                readerChkptData = null;
+                readerProxy.open(null);
+            }
+        } catch (final ClassCastException e) {
+            throw new IllegalStateException("Expected CheckpointData but found " + readerChkptData);
+        }
+
+        writerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), CheckpointType.WRITER);
+        CheckpointData writerChkptData = persistenceManagerService.getCheckpointData(writerChkptDK);
+        try {
+            // check for data in backing store
+            if (writerChkptData != null) {
+                final byte[] writertoken = writerChkptData.getRestartToken();
+                final ByteArrayInputStream writerChkptBA = new ByteArrayInputStream(writertoken);
+                TCCLObjectInputStream writerOIS;
+                try {
+                    writerOIS = new TCCLObjectInputStream(writerChkptBA);
+                    writerProxy.open((Serializable) writerOIS.readObject());
+                    writerOIS.close();
+                } catch (final Exception ex) {
+                    // is this what I should be throwing here?
+                    throw new BatchContainerServiceException("Cannot restore the checkpoint data for [" + step.getId() + "]", ex);
+                }
+            } else {
+                // no chkpt data exists in the backing store
+                writerChkptData = null;
+                writerProxy.open(null);
+            }
+        } catch (final ClassCastException e) {
+            throw new IllegalStateException("Expected CheckpointData but found " + writerChkptData);
+        }
+
+        // set up metrics
+        // stepContext.addMetric(MetricImpl.Counter.valueOf("READ_COUNT"), 0);
+        // stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_COUNT"), 0);
+        // stepContext.addMetric(MetricImpl.Counter.valueOf("READ_SKIP_COUNT"), 0);
+        // stepContext.addMetric(MetricImpl.Counter.valueOf("PROCESS_SKIP_COUNT"), 0);
+        // stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_SKIP_COUNT"), 0);
+    }
+
+    @Override
+    public void stop() {
+        stepContext.setBatchStatus(BatchStatus.STOPPING);
+
+        // we don't need to call stop on the chunk implementation here since a
+        // chunk always returns control to
+        // the batch container after every item.
+
+    }
+
+    private boolean skipReadException(final Exception e) {
+        try {
+            skipHandler.handleExceptionRead(e);
+        } catch (final BatchContainerRuntimeException bcre) {
+            return false;
+        }
+        return true;
+    }
+
+    private boolean retryReadException(final Exception e) {
+        try {
+            retryHandler.handleExceptionRead(e);
+        } catch (final BatchContainerRuntimeException bcre) {
+            return false;
+        }
+        return true;
+
+    }
+
+    private boolean skipProcessException(final Exception e, final Object record) {
+        try {
+            skipHandler.handleExceptionWithRecordProcess(e, record);
+        } catch (BatchContainerRuntimeException bcre) {
+            return false;
+        }
+        return true;
+
+    }
+
+    private boolean retryProcessException(final Exception e, final Object record) {
+        try {
+            retryHandler.handleExceptionProcess(e, record);
+        } catch (BatchContainerRuntimeException bcre) {
+            return false;
+        }
+        return true;
+    }
+
+    private boolean skipWriteException(final Exception e, final List<Object> chunkToWrite) {
+        try {
+            skipHandler.handleExceptionWithRecordListWrite(e, chunkToWrite);
+        } catch (BatchContainerRuntimeException bcre) {
+            return false;
+        }
+        return true;
+    }
+
+    private boolean retryWriteException(final Exception e, final List<Object> chunkToWrite) {
+        try {
+            retryHandler.handleExceptionWrite(e, chunkToWrite);
+        } catch (BatchContainerRuntimeException bcre) {
+            return false;
+        }
+        return true;
+    }
+
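+    /*
+     * Re-opens the reader at the last committed checkpoint after a rollback
+     * (the reader proxy was closed before the transaction was rolled back).
+     */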
+    private void positionReaderAtCheckpoint() {
+        readerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), CheckpointType.READER);
+
+        CheckpointData readerData = persistenceManagerService.getCheckpointData(readerChkptDK);
+        try {
+            // check for data in backing store
+            if (readerData != null) {
+                byte[] readertoken = readerData.getRestartToken();
+                ByteArrayInputStream readerChkptBA = new ByteArrayInputStream(readertoken);
+                TCCLObjectInputStream readerOIS;
+                try {
+                    readerOIS = new TCCLObjectInputStream(readerChkptBA);
+                    readerProxy.open((Serializable) readerOIS.readObject());
+                    readerOIS.close();
+                } catch (Exception ex) {
+                    // is this what I should be throwing here?
+                    throw new BatchContainerServiceException("Cannot restore the checkpoint data for [" + step.getId() + "]", ex);
+                }
+            } else {
+                // no chkpt data exists in the backing store
+                readerData = null;
+                readerProxy.open(null);
+            }
+        } catch (final ClassCastException e) {
+            throw new IllegalStateException("Expected CheckpointData but found " + readerData);
+        }
+    }
+
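+    /*
+     * Re-opens the writer at the last committed checkpoint after a rollback
+     * (the writer proxy was closed before the transaction was rolled back).
+     */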
+    private void positionWriterAtCheckpoint() {
+        writerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), CheckpointType.WRITER);
+
+        CheckpointData writerData = persistenceManagerService.getCheckpointData(writerChkptDK);
+        try {
+            // check for data in backing store
+            if (writerData != null) {
+                byte[] writertoken = writerData.getRestartToken();
+                ByteArrayInputStream writerChkptBA = new ByteArrayInputStream(writertoken);
+                TCCLObjectInputStream writerOIS;
+                try {
+                    writerOIS = new TCCLObjectInputStream(writerChkptBA);
+                    writerProxy.open((Serializable) writerOIS.readObject());
+                    writerOIS.close();
+                } catch (Exception ex) {
+                    // is this what I should be throwing here?
+                    throw new BatchContainerServiceException("Cannot restore the checkpoint data for [" + step.getId() + "]", ex);
+                }
+            } else {
+                // no chkpt data exists in the backing store
+                writerData = null;
+                writerProxy.open(null);
+            }
+        } catch (ClassCastException e) {
+            throw new IllegalStateException("Expected CheckpointData but found " + writerData);
+        }
+    }
+}
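
When the job XML declares a custom checkpoint algorithm, the controller above skips the write-buffer-size check (the "custom" branch in readAndProcess) and lets the algorithm alone decide chunk boundaries, while checkpointTimeout() is fed into the transaction timeout and beginCheckpoint()/endCheckpoint() bracket the checkpoint commit. A minimal sketch of such an algorithm against the javax.batch.api.chunk.CheckpointAlgorithm interface; the class name and the item-counting policy are illustrative only, not part of this commit:

    // Hypothetical custom algorithm, not part of this commit.
    import javax.batch.api.chunk.CheckpointAlgorithm;

    public class EveryThousandItemsAlgorithm implements CheckpointAlgorithm {
        private int itemsSinceLastCheckpoint = 0;

        @Override
        public int checkpointTimeout() throws Exception {
            return 0; // 0 keeps the transaction manager's default timeout
        }

        @Override
        public void beginCheckpoint() throws Exception {
            itemsSinceLastCheckpoint = 0; // a new checkpoint interval starts
        }

        @Override
        public boolean isReadyToCheckpoint() throws Exception {
            return ++itemsSinceLastCheckpoint >= 1000;
        }

        @Override
        public void endCheckpoint() throws Exception {
            // nothing to clean up after the checkpoint commits
        }
    }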

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ItemCheckpointAlgorithm.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ItemCheckpointAlgorithm.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ItemCheckpointAlgorithm.java
new file mode 100755
index 0000000..f95aef3
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ItemCheckpointAlgorithm.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.impl.controller.chunk;
+
+import javax.batch.api.chunk.CheckpointAlgorithm;
+
+public final class ItemCheckpointAlgorithm implements CheckpointAlgorithm {
+    private long requests = 0;
+    private long checkpointBeginTime = 0;
+
+    private int time;
+    private int item;
+    private long currentTime;
+
+    public ItemCheckpointAlgorithm() {
+        checkpointBeginTime = System.currentTimeMillis();
+        currentTime = checkpointBeginTime;
+    }
+
+    @Override
+    public void endCheckpoint() throws Exception {
+        // no-op
+    }
+
+    public boolean isReadyToCheckpointItem() throws Exception {
+        requests++;
+
+        final boolean itemready = (requests >= item);
+        if (itemready) {
+            requests = 0;
+        }
+        return itemready;
+    }
+
+    public boolean isReadyToCheckpointTime() throws Exception {
+        boolean timeready = false;
+        currentTime = System.currentTimeMillis();
+        final long curdiff = currentTime - checkpointBeginTime;
+        final int diff = (int) (curdiff / 1000);
+
+        if (diff >= time) {
+            timeready = true;
+
+            checkpointBeginTime = System.currentTimeMillis();
+
+        }
+
+        return timeready;
+    }
+
+    @Override
+    public boolean isReadyToCheckpoint() throws Exception {
+        boolean ready = false;
+
+        if (time == 0) { // no time limit, just check if item count has been reached
+            if (isReadyToCheckpointItem()) {
+                ready = true;
+            }
+        } else if (isReadyToCheckpointItem() || isReadyToCheckpointTime()) {
+            ready = true;
+        }
+
+        return ready;
+    }
+
+    public void setThresholds(int itemthreshold, int timethreshold) {
+        item = itemthreshold;
+        time = timethreshold;
+    }
+
+    @Override
+    public void beginCheckpoint() throws Exception {
+        checkpointBeginTime = currentTime;
+    }
+
+    @Override
+    public int checkpointTimeout() throws Exception {
+        return 0;
+    }
+}
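
A quick way to see the thresholds in action: the hypothetical driver below (not part of this commit, compiled against the class above) simulates 25 read/process cycles with an item threshold of 10 and no time limit, which yields two checkpoints.

    // Hypothetical driver class, not part of this commit.
    import org.apache.batchee.container.impl.controller.chunk.ItemCheckpointAlgorithm;

    public class ItemCheckpointDemo {
        public static void main(final String[] args) throws Exception {
            final ItemCheckpointAlgorithm algorithm = new ItemCheckpointAlgorithm();
            algorithm.setThresholds(10, 0); // time == 0 disables the time check, only the item count matters

            int checkpoints = 0;
            algorithm.beginCheckpoint();
            for (int item = 1; item <= 25; item++) { // simulate 25 read/process cycles
                if (algorithm.isReadyToCheckpoint()) { // true on the 10th and 20th call
                    checkpoints++;
                    algorithm.endCheckpoint();
                    algorithm.beginCheckpoint(); // start the next checkpoint interval
                }
            }
            System.out.println(checkpoints); // prints 2
        }
    }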

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/PersistentDataWrapper.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/PersistentDataWrapper.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/PersistentDataWrapper.java
new file mode 100755
index 0000000..33611f7
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/PersistentDataWrapper.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller.chunk;
+
+import java.io.Serializable;
+
+public class PersistentDataWrapper implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private byte[] persistentDataBytes;
+
+    public PersistentDataWrapper(byte[] persistentData) {
+        this.persistentDataBytes = persistentData;
+    }
+
+    public byte[] getPersistentDataBytes() {
+        return persistentDataBytes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/RetryHandler.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/RetryHandler.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/RetryHandler.java
new file mode 100755
index 0000000..c0deda8
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/RetryHandler.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller.chunk;
+
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.proxy.RetryProcessListenerProxy;
+import org.apache.batchee.container.proxy.RetryReadListenerProxy;
+import org.apache.batchee.container.proxy.RetryWriteListenerProxy;
+import org.apache.batchee.jaxb.Chunk;
+import org.apache.batchee.jaxb.ExceptionClassFilter;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class RetryHandler {
+    private List<RetryProcessListenerProxy> _retryProcessListeners = null;
+    private List<RetryReadListenerProxy> _retryReadListeners = null;
+    private List<RetryWriteListenerProxy> _retryWriteListeners = null;
+
+    private Set<String> _retryNoRBIncludeExceptions = null;
+    private Set<String> _retryNoRBExcludeExceptions = null;
+    private Set<String> _retryIncludeExceptions = null;
+    private Set<String> _retryExcludeExceptions = null;
+    private int _retryLimit = Integer.MIN_VALUE;
+    private long _retryCount = 0;
+    private Exception _retryException = null;
+
+    public RetryHandler(final Chunk chunk) {
+        try {
+            if (chunk.getRetryLimit() != null) {
+                _retryLimit = Integer.parseInt(chunk.getRetryLimit());
+                if (_retryLimit < 0) {
+                    throw new IllegalArgumentException("The retry-limit attribute on a chunk cannot be a negative value");
+                }
+
+            }
+        } catch (final NumberFormatException nfe) {
+            throw new RuntimeException("NumberFormatException reading retry-limit", nfe);
+        }
+
+        // Read the include/exclude exceptions.
+        _retryIncludeExceptions = new HashSet<String>();
+        _retryExcludeExceptions = new HashSet<String>();
+        _retryNoRBIncludeExceptions = new HashSet<String>();
+        _retryNoRBExcludeExceptions = new HashSet<String>();
+
+        if (chunk.getRetryableExceptionClasses() != null) {
+            if (chunk.getRetryableExceptionClasses().getIncludeList() != null) {
+                final List<ExceptionClassFilter.Include> includes = chunk.getRetryableExceptionClasses().getIncludeList();
+                for (final ExceptionClassFilter.Include include : includes) {
+                    _retryIncludeExceptions.add(include.getClazz().trim());
+                }
+            }
+            if (chunk.getRetryableExceptionClasses().getExcludeList() != null) {
+                final List<ExceptionClassFilter.Exclude> excludes = chunk.getRetryableExceptionClasses().getExcludeList();
+                for (final ExceptionClassFilter.Exclude exclude : excludes) {
+                    _retryExcludeExceptions.add(exclude.getClazz().trim());
+                }
+            }
+        }
+
+        if (chunk.getNoRollbackExceptionClasses() != null) {
+            if (chunk.getNoRollbackExceptionClasses().getIncludeList() != null) {
+                final List<ExceptionClassFilter.Include> includes = chunk.getNoRollbackExceptionClasses().getIncludeList();
+                for (final ExceptionClassFilter.Include include : includes) {
+                    _retryNoRBIncludeExceptions.add(include.getClazz().trim());
+                }
+            }
+            if (chunk.getNoRollbackExceptionClasses().getExcludeList() != null) {
+                final List<ExceptionClassFilter.Exclude> excludes = chunk.getNoRollbackExceptionClasses().getExcludeList();
+                for (final ExceptionClassFilter.Exclude exclude : excludes) {
+                    _retryNoRBExcludeExceptions.add(exclude.getClazz().trim());
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Add the user-defined RetryProcessListener.
+     */
+    public void addRetryProcessListener(List<RetryProcessListenerProxy> retryProcessListeners) {
+        _retryProcessListeners = retryProcessListeners;
+    }
+
+    /**
+     * Add the user-defined RetryReadListener.
+     */
+    public void addRetryReadListener(List<RetryReadListenerProxy> retryReadListeners) {
+        _retryReadListeners = retryReadListeners;
+    }
+
+    /**
+     * Add the user-defined RetryWriteListener.
+     */
+    public void addRetryWriteListener(List<RetryWriteListenerProxy> retryWriteListeners) {
+        _retryWriteListeners = retryWriteListeners;
+    }
+
+    public boolean isRollbackException(Exception e) {
+        return !isNoRollbackException(e);
+    }
+
+    /**
+     * Handle exception from a read failure.
+     */
+    public void handleExceptionRead(final Exception e) {
+        if (isRetryLimitReached() || !isRetryable(e)) {
+            throw new BatchContainerRuntimeException(e);
+        }
+
+        _retryException = e;
+        // Retry it.  Log it.  Call the RetryListener.
+        ++_retryCount;
+
+        if (_retryReadListeners != null) {
+            for (final RetryReadListenerProxy retryReadListenerProxy : _retryReadListeners) {
+                retryReadListenerProxy.onRetryReadException(e);
+            }
+        }
+    }
+
+    /**
+     * Handle exception from a process failure.
+     */
+    public void handleExceptionProcess(final Exception e, final Object w) {
+        if (isRetryLimitReached() || !isRetryable(e)) {
+            throw new BatchContainerRuntimeException(e);
+        }
+
+        _retryException = e;
+        // Retry it.  Log it.  Call the RetryListener.
+        ++_retryCount;
+
+        if (_retryProcessListeners != null) {
+            for (final RetryProcessListenerProxy retryProcessListenerProxy : _retryProcessListeners) {
+                retryProcessListenerProxy.onRetryProcessException(w, e);
+            }
+        }
+    }
+
+    /**
+     * Handle exception from a write failure.
+     */
+    public void handleExceptionWrite(final Exception e, final List<Object> w) {
+        if (isRetryLimitReached() || !isRetryable(e)) {
+            throw new BatchContainerRuntimeException(e);
+        }
+
+        // Retry it.  Log it.  Call the RetryListener.
+        _retryException = e;
+        ++_retryCount;
+
+        if (_retryWriteListeners != null) {
+            for (final RetryWriteListenerProxy retryWriteListenerProxy : _retryWriteListeners) {
+                retryWriteListenerProxy.onRetryWriteException(w, e);
+            }
+        }
+    }
+
+
+    /**
+     * Check the retryable exception lists to determine whether
+     * the given Exception is retryable.
+     */
+    private boolean isRetryable(final Exception e) {
+        return containsException(_retryIncludeExceptions, e) && !containsException(_retryExcludeExceptions, e);
+    }
+
+    private boolean isNoRollbackException(final Exception e) {
+        return containsException(_retryNoRBIncludeExceptions, e) && !containsException(_retryNoRBExcludeExceptions, e);
+    }
+
+    /**
+     * Check whether given exception is in the specified exception list
+     */
+    private boolean containsException(final Set<String> retryList, final Exception e) {
+        boolean retVal = false;
+
+        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        for (final String exClassName : retryList) {
+            try {
+                retVal = classLoader.loadClass(exClassName).isInstance(e);
+                if (retVal) {
+                    break;
+                }
+            } catch (final ClassNotFoundException cnf) {
+                // no-op
+            }
+        }
+
+        return retVal;
+    }
+
+    /**
+     * Check if the retry limit has been reached.
+     * <p/>
+     * Note: if no retry-limit is configured in the job XML (xJCL), this method
+     * always returns FALSE, i.e. retries are unlimited.
+     */
+    private boolean isRetryLimitReached() {
+        // Unlimited retries if it is never defined
+        return _retryLimit != Integer.MIN_VALUE && (_retryCount >= _retryLimit);
+
+    }
+
+    public Exception getException() {
+        return _retryException;
+    }
+
+    public String toString() {
+        return "RetryHandler{" + super.toString() + "}count:limit=" + _retryCount + ":" + _retryLimit;
+    }
+}
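
The retryable/no-rollback decision boils down to the include/exclude matching in containsException(). The hypothetical demo below mirrors that logic with a simplified helper; the class, its matches() method and the exception choices are illustrative only (the real handler resolves class names through the thread context classloader rather than Class.forName):

    // Hypothetical demo class, not part of this commit.
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class RetryFilterDemo {
        // Simplified stand-in for RetryHandler.containsException(): true when the
        // exception is an instance of any class named in the list.
        private static boolean matches(final Set<String> classNames, final Exception e) {
            for (final String className : classNames) {
                try {
                    if (Class.forName(className).isInstance(e)) {
                        return true;
                    }
                } catch (final ClassNotFoundException cnf) {
                    // unknown class names are silently ignored, as in the handler
                }
            }
            return false;
        }

        public static void main(final String[] args) {
            final Set<String> include = new HashSet<String>(Arrays.asList("java.io.IOException"));
            final Set<String> exclude = new HashSet<String>(Arrays.asList("java.io.FileNotFoundException"));

            final Exception timeout = new java.net.SocketTimeoutException();
            final Exception missing = new java.io.FileNotFoundException();

            // SocketTimeoutException is an IOException and is not excluded -> retryable
            System.out.println(matches(include, timeout) && !matches(exclude, timeout)); // true
            // FileNotFoundException is included but also excluded -> not retryable
            System.out.println(matches(include, missing) && !matches(exclude, missing)); // false
        }
    }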

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/SkipHandler.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/SkipHandler.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/SkipHandler.java
new file mode 100755
index 0000000..02092da
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/SkipHandler.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.impl.controller.chunk;
+
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.proxy.SkipProcessListenerProxy;
+import org.apache.batchee.container.proxy.SkipReadListenerProxy;
+import org.apache.batchee.container.proxy.SkipWriteListenerProxy;
+import org.apache.batchee.jaxb.Chunk;
+import org.apache.batchee.jaxb.ExceptionClassFilter;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Logic for handling skipped records.
+ */
+public class SkipHandler {
+
+    private List<SkipProcessListenerProxy> _skipProcessListener = null;
+    private List<SkipReadListenerProxy> _skipReadListener = null;
+    private List<SkipWriteListenerProxy> _skipWriteListener = null;
+
+    private Set<String> _skipIncludeExceptions = null;
+    private Set<String> _skipExcludeExceptions = null;
+    private int _skipLimit = Integer.MIN_VALUE;
+    private long _skipCount = 0;
+
+    public SkipHandler(final Chunk chunk) {
+        try {
+            if (chunk.getSkipLimit() != null) {
+                _skipLimit = Integer.parseInt(chunk.getSkipLimit());
+                if (_skipLimit < 0) {
+                    throw new IllegalArgumentException("The skip-limit attribute on a chunk cannot be a negative value");
+                }
+            }
+        } catch (final NumberFormatException nfe) {
+            throw new RuntimeException("NumberFormatException reading skip-limit", nfe);
+        }
+
+
+        // Read the include/exclude exceptions.
+
+        _skipIncludeExceptions = new HashSet<String>();
+        _skipExcludeExceptions = new HashSet<String>();
+
+        if (chunk.getSkippableExceptionClasses() != null && chunk.getSkippableExceptionClasses().getIncludeList() != null) {
+            final List<ExceptionClassFilter.Include> includes = chunk.getSkippableExceptionClasses().getIncludeList();
+            for (final ExceptionClassFilter.Include include : includes) {
+                _skipIncludeExceptions.add(include.getClazz().trim());
+            }
+        }
+
+        if (chunk.getSkippableExceptionClasses() != null && chunk.getSkippableExceptionClasses().getExcludeList() != null) {
+            final List<ExceptionClassFilter.Exclude> excludes = chunk.getSkippableExceptionClasses().getExcludeList();
+            for (final ExceptionClassFilter.Exclude exclude : excludes) {
+                _skipExcludeExceptions.add(exclude.getClazz().trim());
+            }
+        }
+    }
+
+    /**
+     * Add the user-defined SkipReadListeners.
+     */
+    public void addSkipReadListener(List<SkipReadListenerProxy> skipReadListener) {
+        _skipReadListener = skipReadListener;
+    }
+
+    /**
+     * Add the user-defined SkipWriteListeners.
+     */
+    public void addSkipWriteListener(List<SkipWriteListenerProxy> skipWriteListener) {
+        _skipWriteListener = skipWriteListener;
+    }
+
+    /**
+     * Add the user-defined SkipProcessListeners.
+     */
+    public void addSkipProcessListener(List<SkipProcessListenerProxy> skipProcessListener) {
+        _skipProcessListener = skipProcessListener;
+    }
+
+
+    /**
+     * Handle exception from a read failure.
+     */
+    public void handleExceptionRead(Exception e) {
+        if (isSkipLimitReached() || !isSkippable(e)) {
+            throw new BatchContainerRuntimeException(e);
+        }
+
+        // Skip it.  Log it.  Call the SkipListener.
+        ++_skipCount;
+
+        if (_skipReadListener != null) {
+            for (final SkipReadListenerProxy skipReadListenerProxy : _skipReadListener) {
+                skipReadListenerProxy.onSkipReadItem(e);
+            }
+        }
+    }
+
+    /**
+     * Handle exception from a process failure.
+     */
+    public void handleExceptionWithRecordProcess(final Exception e, final Object w) {
+        if (isSkipLimitReached() || !isSkippable(e)) {
+            throw new BatchContainerRuntimeException(e);
+        }
+
+        // Skip it.  Log it.  Call the SkipProcessListener.
+        ++_skipCount;
+
+        if (_skipProcessListener != null) {
+            for (SkipProcessListenerProxy skipProcessListenerProxy : _skipProcessListener) {
+                skipProcessListenerProxy.onSkipProcessItem(w, e);
+            }
+        }
+    }
+
+    /**
+     * Handle exception from a write failure.
+     */
+    public void handleExceptionWithRecordListWrite(final Exception e, final List<Object> items) {
+        if (isSkipLimitReached() || !isSkippable(e)) {
+            throw new BatchContainerRuntimeException(e);
+        }
+
+        // Skip it.  Log it.  Call the SkipListener.
+        ++_skipCount;
+
+        if (_skipWriteListener != null) {
+            for (SkipWriteListenerProxy skipWriteListenerProxy : _skipWriteListener) {
+                skipWriteListenerProxy.onSkipWriteItem(items, e);
+            }
+        }
+    }
+
+
+    /**
+     * Check the skippable exception include/exclude lists to determine whether
+     * the given Exception is skippable.
+     */
+    private boolean isSkippable(final Exception e) {
+        return containsSkippable(_skipIncludeExceptions, e) && !containsSkippable(_skipExcludeExceptions, e);
+    }
+
+    /**
+     * Check whether the given exception is an instance of one of the classes in the skippable exception list.
+     */
+    private boolean containsSkippable(final Set<String> skipList, final Exception e) {
+        final ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+        for (final String exClassName : skipList) {
+            try {
+                if (tccl.loadClass(exClassName).isInstance(e)) {
+                    return true;
+                }
+            } catch (final ClassNotFoundException cnf) {
+                // no-op
+            }
+        }
+
+        return false;
+    }
+
+
+    /**
+     * Check if the skip limit has been reached.
+     * <p/>
+     * Note: if skip handling isn't enabled (i.e. no skip-limit is configured in the xJCL),
+     * then this method always returns FALSE, i.e. skips are unlimited.
+     */
+    private boolean isSkipLimitReached() {
+        // Unlimited skips if it is never defined
+        return _skipLimit != Integer.MIN_VALUE && (_skipCount >= _skipLimit);
+
+    }
+
+    public String toString() {
+        return "SkipHandler{" + super.toString() + "}count:limit=" + _skipCount + ":" + _skipLimit;
+    }
+
+
+}
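
The chunk step registers any application-provided skip listeners with this handler, and
handleExceptionRead(..) above calls each of them once a read failure has been accepted as
a skip. A minimal listener implementation could look like the sketch below (the class
name is illustrative; such a listener would be referenced from the chunk's listeners
element in the job XML):

    import javax.batch.api.chunk.listener.SkipReadListener;
    import java.util.logging.Logger;

    public class LoggingSkipReadListener implements SkipReadListener {
        private static final Logger LOG = Logger.getLogger(LoggingSkipReadListener.class.getName());

        @Override
        public void onSkipReadItem(final Exception ex) throws Exception {
            // Invoked only when the exception matched the skippable-exception-classes
            // include/exclude rules and the skip-limit was not yet reached.
            LOG.warning("Skipping read failure: " + ex);
        }
    }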

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java
new file mode 100755
index 0000000..5b89a88
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.jobinstance;
+
+import org.apache.batchee.container.impl.JobContextImpl;
+import org.apache.batchee.container.impl.JobInstanceImpl;
+import org.apache.batchee.container.jsl.JobModelResolver;
+import org.apache.batchee.container.modelresolver.PropertyResolver;
+import org.apache.batchee.container.modelresolver.PropertyResolverFactory;
+import org.apache.batchee.container.navigator.ModelNavigator;
+import org.apache.batchee.container.navigator.NavigatorFactory;
+import org.apache.batchee.container.services.InternalJobExecution;
+import org.apache.batchee.container.services.JobStatusManagerService;
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.container.status.JobStatus;
+import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.jaxb.JSLProperties;
+import org.apache.batchee.spi.PersistenceManagerService;
+import org.apache.batchee.spi.SecurityService;
+
+import javax.batch.operations.JobExecutionAlreadyCompleteException;
+import javax.batch.operations.JobExecutionNotMostRecentException;
+import javax.batch.operations.JobRestartException;
+import javax.batch.operations.JobStartException;
+import javax.batch.operations.NoSuchJobExecutionException;
+import javax.batch.runtime.BatchStatus;
+import javax.batch.runtime.JobInstance;
+import java.util.Properties;
+
+public class JobExecutionHelper {
+    private static final JobStatusManagerService JOB_STATUS_MANAGER_SERVICE = ServicesManager.service(JobStatusManagerService.class);
+    private static final PersistenceManagerService PERSISTENCE_MANAGER_SERVICE = ServicesManager.service(PersistenceManagerService.class);
+    private static final SecurityService SECURITY_SERVICE = ServicesManager.service(SecurityService.class);
+
+    private static ModelNavigator<JSLJob> getResolvedJobNavigator(final String jobXml, final Properties jobParameters, final boolean parallelExecution) {
+        final JSLJob jobModel = new JobModelResolver().resolveModel(jobXml);
+        final PropertyResolver<JSLJob> propResolver = PropertyResolverFactory.createJobPropertyResolver(parallelExecution);
+        propResolver.substituteProperties(jobModel, jobParameters);
+        return NavigatorFactory.createJobNavigator(jobModel);
+    }
+
+    private static ModelNavigator<JSLJob> getResolvedJobNavigator(final JSLJob jobModel, final Properties jobParameters, final boolean parallelExecution) {
+        final PropertyResolver<JSLJob> propResolver = PropertyResolverFactory.createJobPropertyResolver(parallelExecution);
+        propResolver.substituteProperties(jobModel, jobParameters);
+        return NavigatorFactory.createJobNavigator(jobModel);
+    }
+
+    private static JobContextImpl getJobContext(ModelNavigator<JSLJob> jobNavigator) {
+        final JSLProperties jslProperties;
+        if (jobNavigator.getRootModelElement() != null) {
+            jslProperties = jobNavigator.getRootModelElement().getProperties();
+        } else {
+            jslProperties = new JSLProperties();
+        }
+        return new JobContextImpl(jobNavigator, jslProperties);
+    }
+
+    private static JobInstance getNewJobInstance(final String name, final String jobXml) {
+        return PERSISTENCE_MANAGER_SERVICE.createJobInstance(name, SECURITY_SERVICE.getLoggedUser(), jobXml);
+    }
+
+    private static JobInstance getNewSubJobInstance(final String name) {
+        return PERSISTENCE_MANAGER_SERVICE.createSubJobInstance(name, SECURITY_SERVICE.getLoggedUser());
+    }
+
+    private static JobStatus createNewJobStatus(final JobInstance jobInstance) {
+        final long instanceId = jobInstance.getInstanceId();
+        final JobStatus jobStatus = JOB_STATUS_MANAGER_SERVICE.createJobStatus(instanceId);
+        jobStatus.setJobInstance(jobInstance);
+        return jobStatus;
+    }
+
+    private static void validateRestartableFalseJobsDoNotRestart(final JSLJob jobModel)
+        throws JobRestartException {
+        if (jobModel.getRestartable() != null && jobModel.getRestartable().equalsIgnoreCase("false")) {
+            throw new JobRestartException("Job Restartable attribute is false, Job cannot be restarted.");
+        }
+    }
+
+    public static RuntimeJobExecution startJob(final String jobXML, final Properties jobParameters) throws JobStartException {
+        final JSLJob jobModel = new JobModelResolver().resolveModel(jobXML);
+        final ModelNavigator<JSLJob> jobNavigator = getResolvedJobNavigator(jobModel, jobParameters, false);
+        final JobContextImpl jobContext = getJobContext(jobNavigator);
+        final JobInstance jobInstance = getNewJobInstance(jobNavigator.getRootModelElement().getId(), jobXML);
+        final RuntimeJobExecution executionHelper = PERSISTENCE_MANAGER_SERVICE.createJobExecution(jobInstance, jobParameters, jobContext.getBatchStatus());
+
+        executionHelper.prepareForExecution(jobContext);
+
+        final JobStatus jobStatus = createNewJobStatus(jobInstance);
+        JOB_STATUS_MANAGER_SERVICE.updateJobStatus(jobStatus);
+
+        return executionHelper;
+    }
+
+    public static RuntimeFlowInSplitExecution startFlowInSplit(final JSLJob jobModel) throws JobStartException {
+        final ModelNavigator<JSLJob> jobNavigator = getResolvedJobNavigator(jobModel, null, true);
+        final JobContextImpl jobContext = getJobContext(jobNavigator);
+        final JobInstance jobInstance = getNewSubJobInstance(jobNavigator.getRootModelElement().getId());
+        final RuntimeFlowInSplitExecution executionHelper = PERSISTENCE_MANAGER_SERVICE.createFlowInSplitExecution(jobInstance, jobContext.getBatchStatus());
+
+        executionHelper.prepareForExecution(jobContext);
+
+        final JobStatus jobStatus = createNewJobStatus(jobInstance);
+        JOB_STATUS_MANAGER_SERVICE.updateJobStatus(jobStatus);
+
+        return executionHelper;
+    }
+
+    public static RuntimeJobExecution startPartition(JSLJob jobModel, Properties jobParameters) throws JobStartException {
+        final ModelNavigator<JSLJob> jobNavigator = getResolvedJobNavigator(jobModel, jobParameters, true);
+        final JobContextImpl jobContext = getJobContext(jobNavigator);
+
+        final JobInstance jobInstance = getNewSubJobInstance(jobNavigator.getRootModelElement().getId());
+
+        final RuntimeJobExecution executionHelper = PERSISTENCE_MANAGER_SERVICE.createJobExecution(jobInstance, jobParameters, jobContext.getBatchStatus());
+
+        executionHelper.prepareForExecution(jobContext);
+
+        final JobStatus jobStatus = createNewJobStatus(jobInstance);
+        JOB_STATUS_MANAGER_SERVICE.updateJobStatus(jobStatus);
+
+        return executionHelper;
+    }
+
+    private static void validateJobInstanceNotCompleteOrAbandoned(final JobStatus jobStatus) throws JobRestartException, JobExecutionAlreadyCompleteException {
+        if (jobStatus.getBatchStatus() == null) {
+            throw new IllegalStateException("On restart, we didn't find an earlier batch status.");
+        }
+
+        if (jobStatus.getBatchStatus().equals(BatchStatus.COMPLETED)) {
+            throw new JobExecutionAlreadyCompleteException("Already completed job instance = " + jobStatus.getJobInstanceId());
+        } else if (jobStatus.getBatchStatus().equals(BatchStatus.ABANDONED)) {
+            throw new JobRestartException("Abandoned job instance = " + jobStatus.getJobInstanceId());
+        }
+    }
+
+    private static void validateJobExecutionIsMostRecent(final long jobInstanceId, final long executionId) throws JobExecutionNotMostRecentException {
+        final long mostRecentExecutionId = PERSISTENCE_MANAGER_SERVICE.getMostRecentExecutionId(jobInstanceId);
+        if (mostRecentExecutionId != executionId) {
+            throw new JobExecutionNotMostRecentException("ExecutionId: " + executionId + " is not the most recent execution.");
+        }
+    }
+
+    public static RuntimeJobExecution restartPartition(final long execId, final JSLJob gennedJobModel, final Properties partitionProps) throws JobRestartException,
+        JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
+        return restartExecution(execId, gennedJobModel, partitionProps, true, false);
+    }
+
+    public static RuntimeFlowInSplitExecution restartFlowInSplit(final long execId, final JSLJob gennedJobModel) throws JobRestartException,
+        JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
+        return (RuntimeFlowInSplitExecution) restartExecution(execId, gennedJobModel, null, true, true);
+    }
+
+    public static RuntimeJobExecution restartJob(final long executionId, final Properties restartJobParameters) throws JobRestartException,
+        JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
+        return restartExecution(executionId, null, restartJobParameters, false, false);
+    }
+
+    private static RuntimeJobExecution restartExecution(final long executionId, final JSLJob gennedJobModel, final Properties restartJobParameters, final boolean parallelExecution, final boolean flowInSplit) throws JobRestartException,
+        JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
+
+        final long jobInstanceId = PERSISTENCE_MANAGER_SERVICE.getJobInstanceIdByExecutionId(executionId);
+        final JobStatus jobStatus = JOB_STATUS_MANAGER_SERVICE.getJobStatus(jobInstanceId);
+
+        validateJobExecutionIsMostRecent(jobInstanceId, executionId);
+
+        validateJobInstanceNotCompleteOrAbandoned(jobStatus);
+
+        final JobInstanceImpl jobInstance = jobStatus.getJobInstance();
+
+        final ModelNavigator<JSLJob> jobNavigator;
+        // If this is a generated (parallel) job, use the regenerated JSL model.
+        if (gennedJobModel == null) {
+            jobNavigator = getResolvedJobNavigator(jobInstance.getJobXML(), restartJobParameters, parallelExecution);
+        } else {
+            jobNavigator = getResolvedJobNavigator(gennedJobModel, restartJobParameters, parallelExecution);
+        }
+        // JSLJob jobModel = ModelResolverFactory.createJobResolver().resolveModel(jobInstance.getJobXML());
+        validateRestartableFalseJobsDoNotRestart(jobNavigator.getRootModelElement());
+
+        final JobContextImpl jobContext = getJobContext(jobNavigator);
+
+        final RuntimeJobExecution executionHelper;
+        if (flowInSplit) {
+            executionHelper = PERSISTENCE_MANAGER_SERVICE.createFlowInSplitExecution(jobInstance, jobContext.getBatchStatus());
+        } else {
+            executionHelper = PERSISTENCE_MANAGER_SERVICE.createJobExecution(jobInstance, restartJobParameters, jobContext.getBatchStatus());
+        }
+        executionHelper.prepareForExecution(jobContext, jobStatus.getRestartOn());
+        JOB_STATUS_MANAGER_SERVICE.updateJobStatusWithNewExecution(jobInstance.getInstanceId(), executionHelper.getExecutionId());
+
+        return executionHelper;
+    }
+
+    public static InternalJobExecution getPersistedJobOperatorJobExecution(final long jobExecutionId) throws NoSuchJobExecutionException {
+        return PERSISTENCE_MANAGER_SERVICE.jobOperatorGetJobExecution(jobExecutionId);
+    }
+
+
+    public static JobInstance getJobInstance(final long executionId) {
+        return JOB_STATUS_MANAGER_SERVICE.getJobStatusFromExecutionId(executionId).getJobInstance();
+    }
+}
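
Applications do not call this helper directly; they go through the standard JobOperator,
and the BatchEE kernel services those calls through methods like startJob(..) and
restartJob(..) above. A rough usage sketch (the job XML name "inventory-load" and the
property key are illustrative):

    import javax.batch.operations.JobOperator;
    import javax.batch.runtime.BatchRuntime;
    import java.util.Properties;

    public class JobLauncher {
        public static void main(final String[] args) {
            final JobOperator operator = BatchRuntime.getJobOperator();

            final Properties params = new Properties();
            params.setProperty("input.file", "/work/inventory.csv");

            // Creates a new JobInstance and JobExecution (see startJob above).
            final long executionId = operator.start("inventory-load", params);

            // Only the most recent execution of a non-completed, non-abandoned
            // instance may be restarted (see restartExecution above).
            // final long restartId = operator.restart(executionId, params);
        }
    }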

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java
new file mode 100755
index 0000000..8570351
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.jobinstance;
+
+import org.apache.batchee.container.status.ExecutionStatus;
+
+import javax.batch.runtime.JobInstance;
+
+public class RuntimeFlowInSplitExecution extends RuntimeJobExecution {
+    public RuntimeFlowInSplitExecution(final JobInstance jobInstance, final long executionId) {
+        super(jobInstance, executionId);
+    }
+
+    private ExecutionStatus flowStatus;
+
+    public ExecutionStatus getFlowStatus() {
+        return flowStatus;
+    }
+
+    public void setFlowStatus(ExecutionStatus flowStatus) {
+        this.flowStatus = flowStatus;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java
new file mode 100755
index 0000000..d9fdb84
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.jobinstance;
+
+import org.apache.batchee.container.impl.JobContextImpl;
+import org.apache.batchee.container.impl.JobExecutionImpl;
+import org.apache.batchee.container.navigator.ModelNavigator;
+import org.apache.batchee.container.proxy.ListenerFactory;
+import org.apache.batchee.container.services.InternalJobExecution;
+import org.apache.batchee.jaxb.JSLJob;
+
+import javax.batch.runtime.BatchStatus;
+import javax.batch.runtime.JobInstance;
+import java.io.Closeable;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Properties;
+
+public class RuntimeJobExecution {
+    private ModelNavigator<JSLJob> jobNavigator = null;
+    private JobInstance jobInstance;
+    private long executionId;
+    private String restartOn;
+    private JobContextImpl jobContext = null;
+    private ListenerFactory listenerFactory;
+    private InternalJobExecution operatorJobExecution = null;
+    private Integer partitionInstance = null;
+    private Collection<Closeable> releasables = new ArrayList<Closeable>();
+
+    public RuntimeJobExecution(final JobInstance jobInstance, final long executionId) {
+        this.jobInstance = jobInstance;
+        this.executionId = executionId;
+        this.operatorJobExecution = new JobExecutionImpl(executionId, jobInstance.getInstanceId());
+    }
+
+
+    /*
+     * Methods not defined by the spec interfaces; they could arguably be
+     * moved onto a second, internal interface.
+     */
+
+    public void prepareForExecution(final JobContextImpl jobContext, final String restartOn) {
+        this.jobContext = jobContext;
+        this.jobNavigator = jobContext.getNavigator();
+        jobContext.setExecutionId(executionId);
+        jobContext.setInstanceId(jobInstance.getInstanceId());
+        this.restartOn = restartOn;
+        operatorJobExecution.setJobContext(jobContext);
+    }
+
+    public void prepareForExecution(final JobContextImpl jobContext) {
+        prepareForExecution(jobContext, null);
+    }
+
+    public void setRestartOn(final String restartOn) {
+        this.restartOn = restartOn;
+    }
+
+    public long getExecutionId() {
+        return executionId;
+    }
+
+    public long getInstanceId() {
+        return jobInstance.getInstanceId();
+    }
+
+    public JobInstance getJobInstance() {
+        return jobInstance;
+    }
+
+    public ModelNavigator<JSLJob> getJobNavigator() {
+        return jobNavigator;
+    }
+
+    public JobContextImpl getJobContext() {
+        return jobContext;
+    }
+
+    public String getRestartOn() {
+        return restartOn;
+    }
+
+    public ListenerFactory getListenerFactory() {
+        return listenerFactory;
+    }
+
+    public void setListenerFactory(final ListenerFactory listenerFactory) {
+        this.listenerFactory = listenerFactory;
+    }
+
+    public InternalJobExecution getJobOperatorJobExecution() {
+        return operatorJobExecution;
+    }
+
+    public BatchStatus getBatchStatus() {
+        return this.jobContext.getBatchStatus();
+    }
+
+    public String getExitStatus() {
+        return this.jobContext.getExitStatus();
+    }
+
+    public void setBatchStatus(final String status) {
+        operatorJobExecution.setBatchStatus(status);
+    }
+
+    public void setCreateTime(final Timestamp ts) {
+        operatorJobExecution.setCreateTime(ts);
+    }
+
+    public void setEndTime(final Timestamp ts) {
+        operatorJobExecution.setEndTime(ts);
+    }
+
+    public void setExitStatus(final String status) {
+        //exitStatus = status;
+        operatorJobExecution.setExitStatus(status);
+
+    }
+
+    public void setLastUpdateTime(final Timestamp ts) {
+        operatorJobExecution.setLastUpdateTime(ts);
+    }
+
+    public void setStartTime(final Timestamp ts) {
+        operatorJobExecution.setStartTime(ts);
+    }
+
+    public void setJobParameters(final Properties jProps) {
+        operatorJobExecution.setJobParameters(jProps);
+    }
+
+    public Properties getJobParameters() {
+        return operatorJobExecution.getJobParameters();
+    }
+
+    public Date getStartTime() {
+        return operatorJobExecution.getStartTime();
+    }
+
+    public Date getEndTime() {
+        return operatorJobExecution.getEndTime();
+    }
+
+    public Date getLastUpdatedTime() {
+        return operatorJobExecution.getLastUpdatedTime();
+    }
+
+    public Date getCreateTime() {
+        return operatorJobExecution.getCreateTime();
+    }
+
+    @Override
+    public String toString() {
+        return " executionId: " + executionId + " restartOn: " + restartOn + "\n-----------------------\n" + "jobInstance: \n   " + jobInstance;
+    }
+
+    public Integer getPartitionInstance() {
+        return partitionInstance;
+    }
+
+    public void setPartitionInstance(final Integer partitionInstance) {
+        this.partitionInstance = partitionInstance;
+    }
+
+    public Collection<Closeable> getReleasables() {
+        return releasables;
+    }
+
+    public synchronized void addReleasable(final Closeable releasable) {
+        releasables.add(releasable);
+    }
+}
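
The releasables collection gives other components a place to register Closeable resources
(via addReleasable(..)) to be released once the execution is done; the class itself only
stores them. A hypothetical clean-up helper, shown only to illustrate the intended
contract and not part of this patch:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.Collection;

    public final class Releasables {
        private Releasables() {
            // static helper only
        }

        // Best-effort close of everything registered on a RuntimeJobExecution;
        // a failing close should not mask the outcome of the job itself.
        public static void closeQuietly(final Collection<Closeable> releasables) {
            for (final Closeable releasable : releasables) {
                try {
                    releasable.close();
                } catch (final IOException ignored) {
                    // ignored on purpose
                }
            }
        }
    }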