Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:39:30 UTC

[53/62] importing batchee from GitHub - a fork of the IBM RI

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkStepControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkStepControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkStepControllerImpl.java
deleted file mode 100755
index 34dcffc..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkStepControllerImpl.java
+++ /dev/null
@@ -1,1045 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.PrintWriter;
-import java.io.Serializable;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.batch.api.chunk.CheckpointAlgorithm;
-import javax.batch.runtime.BatchStatus;
-
-import com.ibm.jbatch.container.artifact.proxy.CheckpointAlgorithmProxy;
-import com.ibm.jbatch.container.artifact.proxy.ChunkListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
-import com.ibm.jbatch.container.artifact.proxy.ItemProcessListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.ItemProcessorProxy;
-import com.ibm.jbatch.container.artifact.proxy.ItemReadListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.ItemReaderProxy;
-import com.ibm.jbatch.container.artifact.proxy.ItemWriteListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.ItemWriterProxy;
-import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
-import com.ibm.jbatch.container.artifact.proxy.RetryProcessListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.RetryReadListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.RetryWriteListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.SkipProcessListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.SkipReadListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.SkipWriteListenerProxy;
-import com.ibm.jbatch.container.context.impl.MetricImpl;
-import com.ibm.jbatch.container.context.impl.StepContextImpl;
-import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
-import com.ibm.jbatch.container.exception.BatchContainerServiceException;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.persistence.CheckpointAlgorithmFactory;
-import com.ibm.jbatch.container.persistence.CheckpointData;
-import com.ibm.jbatch.container.persistence.CheckpointDataKey;
-import com.ibm.jbatch.container.persistence.CheckpointManager;
-import com.ibm.jbatch.container.persistence.ItemCheckpointAlgorithm;
-import com.ibm.jbatch.container.services.IPersistenceManagerService;
-import com.ibm.jbatch.container.servicesmanager.ServicesManager;
-import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
-import com.ibm.jbatch.container.util.PartitionDataWrapper;
-import com.ibm.jbatch.container.util.TCCLObjectInputStream;
-import com.ibm.jbatch.container.validation.ArtifactValidationException;
-import com.ibm.jbatch.jsl.model.Chunk;
-import com.ibm.jbatch.jsl.model.ItemProcessor;
-import com.ibm.jbatch.jsl.model.ItemReader;
-import com.ibm.jbatch.jsl.model.ItemWriter;
-import com.ibm.jbatch.jsl.model.Property;
-import com.ibm.jbatch.jsl.model.Step;
-
-public class ChunkStepControllerImpl extends SingleThreadedStepControllerImpl {
-
-	private final static String sourceClass = ChunkStepControllerImpl.class.getName();
-	private final static Logger logger = Logger.getLogger(sourceClass);
-
-	private Chunk chunk = null;
-	private ItemReaderProxy readerProxy = null;
-	private ItemProcessorProxy processorProxy = null;
-	private ItemWriterProxy writerProxy = null;
-	private CheckpointAlgorithmProxy checkpointProxy = null;
-	private CheckpointAlgorithm chkptAlg = null;
-	private CheckpointManager checkpointManager;
-	private ServicesManager servicesManager = ServicesManagerImpl.getInstance();
-	private IPersistenceManagerService _persistenceManagerService = null;
-	private SkipHandler skipHandler = null;
-	CheckpointDataKey readerChkptDK, writerChkptDK = null;
-	CheckpointData readerChkptData = null;
-	CheckpointData writerChkptData = null;
-	List<ChunkListenerProxy> chunkListeners = null;
-	List<SkipProcessListenerProxy> skipProcessListeners = null;
-	List<SkipReadListenerProxy> skipReadListeners = null;
-	List<SkipWriteListenerProxy> skipWriteListeners = null;
-	List<RetryProcessListenerProxy> retryProcessListeners = null;
-	List<RetryReadListenerProxy> retryReadListeners = null;
-	List<RetryWriteListenerProxy> retryWriteListeners = null;
-	List<ItemReadListenerProxy> itemReadListeners = null;
-	List<ItemProcessListenerProxy> itemProcessListeners = null;
-	List<ItemWriteListenerProxy> itemWriteListeners = null;
-	private RetryHandler retryHandler;
-
-	// metrics
-	long readCount = 0;
-	long writeCount = 0;
-	long readSkipCount = 0;
-	long processSkipCount = 0;
-	long writeSkipCount = 0;
-	boolean rollbackRetry = false;
-
-	public ChunkStepControllerImpl(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
-		super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
-	}
-
-	/**
-	 * Utility class to hold statuses at each level of the read-process-write loop.
-	 */
-	private class ItemStatus {
-
-		public boolean isSkipped() {
-			return skipped;
-		}
-
-		public void setSkipped(boolean skipped) {
-			this.skipped = skipped;
-		}
-
-		public boolean isFiltered() {
-			return filtered;
-		}
-
-		public void setFiltered(boolean filtered) {
-			this.filtered = filtered;
-		}
-
-		public boolean isCheckPointed() {
-			return checkPointed;
-		}
-
-		public void setCheckPointed(boolean checkPointed) {
-			this.checkPointed = checkPointed;
-		}
-
-		public boolean isFinished() {
-			return finished;
-		}
-
-		public void setFinished(boolean finished) {
-			this.finished = finished;
-		}
-
-		public boolean isRetry() {
-			return retry;
-		}
-
-		public void setRetry(boolean retry) {
-			this.retry = retry;
-		}
-
-		public boolean isRollback() {
-			return rollback;
-		}
-
-		public void setRollback(boolean rollback) {
-			this.rollback = rollback;
-		}
-
-		private boolean skipped = false;
-		private boolean filtered = false;
-		private boolean finished = false;
-		private boolean checkPointed = false;
-		private boolean retry = false;
-		private boolean rollback = false;
-
-	}
-
-	/**
-	 * We read and process one item at a time but write in chunks (groups of
-	 * items). So this method loops until we either reach the end of the
-	 * reader (no more items to read), the writer buffer is full, or a
-	 * checkpoint is triggered.
-	 * 
-	 * @param chunkSize
-	 *            write buffer size
-	 * @param theStatus
-	 *            flags whether the read-process loop reached the last record or a
-	 *            checkpoint is required
-	 * @return a list of objects to write
-	 */
-	private List<Object> readAndProcess(int chunkSize, ItemStatus theStatus) {
-		logger.entering(sourceClass, "readAndProcess", new Object[] { chunkSize, theStatus });
-
-		List<Object> chunkToWrite = new ArrayList<Object>();
-		Object itemRead = null;
-		Object itemProcessed = null;
-		int readProcessedCount = 0;
-
-		while (true) {
-			ItemStatus status = new ItemStatus();
-			itemRead = readItem(status);
-
-			if (status.isRollback()) {
-				theStatus.setRollback(true);
-				// inc rollbackCount
-				stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
-				break;
-			}
-
-			if (!status.isSkipped() && !status.isFinished()) {
-				itemProcessed = processItem(itemRead, status);
-
-				if (status.isRollback()) {
-					theStatus.setRollback(true);
-					// inc rollbackCount
-					stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
-					break;
-				}
-
-				if (!status.isSkipped() && !status.isFiltered()) {
-					chunkToWrite.add(itemProcessed);
-					readProcessedCount++;
-				}
-			}
-
-			theStatus.setFinished(status.isFinished());
-			theStatus.setCheckPointed(checkpointManager.ApplyCheckPointPolicy());
-
-			// This will force the current item to finish processing on a stop
-			// request
-			if (stepContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
-				theStatus.setFinished(true);
-			}
-
-			// write buffer size reached
-			if ((readProcessedCount == chunkSize) && !"custom".equals(checkpointProxy.getCheckpointType())) {
-				break;
-			}
-
-			// checkpoint reached
-			if (theStatus.isCheckPointed()) {
-				break;
-			}
-
-			// last record in readerProxy reached
-			if (theStatus.isFinished()) {
-				break;
-			}
-
-		}
-		logger.exiting(sourceClass, "readAndProcess", chunkToWrite);
-		return chunkToWrite;
-	}
-
-	/**
-	 * Reads an item from the reader
-	 * 
-	 * @param status
-	 *            flags the current read status
-	 * @return the item read
-	 */
-	private Object readItem(ItemStatus status) {
-		logger.entering(sourceClass, "readItem", status);
-		Object itemRead = null;
-
-		try {
-			// call read listeners before and after the actual read
-			for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
-				readListenerProxy.beforeRead();
-			}
-
-			itemRead = readerProxy.readItem();
-
-			for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
-				readListenerProxy.afterRead(itemRead);
-			}
-
-			// itemRead == null means we reached the end of
-			// the readerProxy "resultset"
-			status.setFinished(itemRead == null);
-			if (!status.isFinished()) {
-				stepContext.getMetric(MetricImpl.MetricType.READ_COUNT).incValue();
-			}
-		} catch (Exception e) {
-			stepContext.setException(e);
-			for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
-				readListenerProxy.onReadError(e);
-			}
-			if(!rollbackRetry) {
-				if (retryReadException(e)) {
-					for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
-						readListenerProxy.onReadError(e);
-					}
-					// if not a rollback exception, just retry the current item
-					if (!retryHandler.isRollbackException(e)) {
-						itemRead = readItem(status);
-					} else {
-						status.setRollback(true);
-						rollbackRetry = true;
-						// inc rollbackCount
-						stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
-					}
-				}
-				else if(skipReadException(e)) {
-					status.setSkipped(true);
-					stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();
-
-				}
-				else {
-					throw new BatchContainerRuntimeException(e);
-				}
-			}
-			else {
-				// coming from a rollback retry
-				if(skipReadException(e)) {
-					status.setSkipped(true);
-					stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();
-
-				}
-				else if (retryReadException(e)) {
-					if (!retryHandler.isRollbackException(e)) {
-						itemRead = readItem(status);
-					}
-					else {
-						status.setRollback(true);
-						// inc rollbackCount
-						stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
-					}
-				}
-				else {
-					throw new BatchContainerRuntimeException(e);
-				}
-			}
-
-		} catch (Throwable e) {
-			throw new BatchContainerRuntimeException(e);
-		}
-
-		logger.exiting(sourceClass, "readItem", itemRead==null ? "<null>" : itemRead);
-		return itemRead;
-	}
-
-	/**
-	 * Process an item previously read by the reader
-	 * 
-	 * @param itemRead
-	 *            the item read
-	 * @param status
-	 *            flags the current process status
-	 * @return the processed item
-	 */
-	private Object processItem(Object itemRead, ItemStatus status) {
-		logger.entering(sourceClass, "processItem", new Object[] { itemRead, status });
-		Object processedItem = null;
-
-		// if no processor defined for this chunk
-		if (processorProxy == null){
-			return itemRead;
-		}
-
-		try {
-
-			// call process listeners before and after the actual process call
-			for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
-				processListenerProxy.beforeProcess(itemRead);
-			}
-
-			processedItem = processorProxy.processItem(itemRead);
-
-			if (processedItem == null) {
-				// inc filterCount
-				stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();
-				status.setFiltered(true);
-			}
-
-			for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
-				processListenerProxy.afterProcess(itemRead, processedItem);
-			}
-		} catch (Exception e) {
-			for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
-				processListenerProxy.onProcessError(processedItem, e);
-			}
-			if(!rollbackRetry) {
-				if (retryProcessException(e, itemRead)) {
-					if (!retryHandler.isRollbackException(e)) {
-						// call process listeners before and after the actual
-						// process call
-						for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
-							processListenerProxy.beforeProcess(itemRead);
-						}
-						processedItem = processItem(itemRead, status);
-						if (processedItem == null) {
-							// inc filterCount
-							stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();
-							status.setFiltered(true);
-						}
-
-						for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
-							processListenerProxy.afterProcess(itemRead, processedItem);
-						}
-					} else {
-						status.setRollback(true);
-						rollbackRetry = true;
-						// inc rollbackCount
-						stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
-					}
-				}
-				else if (skipProcessException(e, itemRead)) {
-					status.setSkipped(true);
-					stepContext.getMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT).incValue();
-				}
-				else {
-					throw new BatchContainerRuntimeException(e);
-				}
-			}
-			else {
-				if (skipProcessException(e, itemRead)) {
-					status.setSkipped(true);
-					stepContext.getMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT).incValue();
-				} else if (retryProcessException(e, itemRead)) {
-					if (!retryHandler.isRollbackException(e)) {
-						// call process listeners before and after the actual
-						// process call
-						for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
-							processListenerProxy.beforeProcess(itemRead);
-						}
-						processedItem = processItem(itemRead, status);
-						if (processedItem == null) {
-							// inc filterCount
-							stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();
-							status.setFiltered(true);
-						}
-
-						for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
-							processListenerProxy.afterProcess(itemRead, processedItem);
-						}
-					} else {
-						status.setRollback(true);
-						rollbackRetry = true;
-						// inc rollbackCount
-						stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
-					}
-				} else {
-					throw new BatchContainerRuntimeException(e);
-				}
-			}
-
-		} catch (Throwable e) {
-			throw new BatchContainerRuntimeException(e);
-		}
-
-		logger.exiting(sourceClass, "processItem", processedItem==null ? "<null>" : processedItem);
-		return processedItem;
-	}
-
-	/**
-	 * Writes a chunk of items.
-	 * 
-	 * @param theChunk
-	 *            the list of processed items ready to be written
-	 * @param status
-	 *            flags the current write status
-	 */
-	private void writeChunk(List<Object> theChunk, ItemStatus status) {
-		logger.entering(sourceClass, "writeChunk", theChunk);
-		if (!theChunk.isEmpty()) {
-			try {
-
-				// call write listeners before and after the actual write
-				for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
-					writeListenerProxy.beforeWrite(theChunk);
-				}
-
-				writerProxy.writeItems(theChunk);
-
-				for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
-					writeListenerProxy.afterWrite(theChunk);
-				}
-				stepContext.getMetric(MetricImpl.MetricType.WRITE_COUNT).incValueBy(theChunk.size());
-			} catch (Exception e) {
-				this.stepContext.setException(e);
-				for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
-					writeListenerProxy.onWriteError(theChunk, e);
-				}
-				if(!rollbackRetry)
-				{
-					if (retryWriteException(e, theChunk)) {
-						if (!retryHandler.isRollbackException(e)) {
-							writeChunk(theChunk, status);
-						} else {
-							rollbackRetry = true;
-							status.setRollback(true);
-							// inc rollbackCount
-							stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
-						}
-					} else if (skipWriteException(e, theChunk)) {
-						stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);
-					} else {
-						throw new BatchContainerRuntimeException(e);
-					}
-
-				}
-				else {
-					if (skipWriteException(e, theChunk)) {
-						stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);
-					} else if (retryWriteException(e, theChunk)) {
-						if (!retryHandler.isRollbackException(e)) {
-							status.setRetry(true);
-							writeChunk(theChunk, status);
-						} else {
-							rollbackRetry = true;
-							status.setRollback(true);
-							// inc rollbackCount
-							stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
-						}
-					} else {
-						throw new BatchContainerRuntimeException(e);
-					}
-				}
-
-			} catch (Throwable e) {
-				throw new BatchContainerRuntimeException(e);
-			}
-		}
-		logger.exiting(sourceClass, "writeChunk");
-	}
-
-	/**
-	 * Main read-process-write loop.
-	 */
-	private void invokeChunk() {
-		logger.entering(sourceClass, "invokeChunk");
-
-		int itemCount = ChunkHelper.getItemCount(chunk);
-		int timeInterval = ChunkHelper.getTimeLimit(chunk);
-		List<Object> chunkToWrite = new ArrayList<Object>();
-		boolean checkPointed = true;
-		boolean rollback = false;
-		Throwable caughtThrowable = null;
-
-		// begin new transaction at first iteration or after a checkpoint commit
-
-		try {
-			transactionManager.begin();
-			this.openReaderAndWriter();
-			transactionManager.commit();
-
-			while (true) {
-
-				if (checkPointed || rollback) {
-					if ("custom".equals(this.checkpointProxy.getCheckpointType())) {
-						int newtimeOut = this.checkpointManager.checkpointTimeout();
-						transactionManager.setTransactionTimeout(newtimeOut);
-					}
-					transactionManager.begin();
-					for (ChunkListenerProxy chunkProxy : chunkListeners) {
-						chunkProxy.beforeChunk();
-					}
-
-					if (rollback) {
-						positionReaderAtCheckpoint();
-						positionWriterAtCheckpoint();
-						checkpointManager = new CheckpointManager(readerProxy, writerProxy,
-								getCheckpointAlgorithm(itemCount, timeInterval), jobExecutionImpl.getExecutionId(), jobExecutionImpl
-								.getJobInstance().getInstanceId(), step.getId());
-					}
-				}
-
-				ItemStatus status = new ItemStatus();
-
-				if (rollback) {
-					rollback = false;
-				}
-
-				chunkToWrite = readAndProcess(itemCount, status);
-
-				if (status.isRollback()) {
-					itemCount = 1;
-					rollback = true;
-
-					readerProxy.close();
-					writerProxy.close();
-
-					transactionManager.rollback();
-
-					continue;
-				}
-
-				writeChunk(chunkToWrite, status);
-
-				if (status.isRollback()) {
-					itemCount = 1;
-					rollback = true;
-
-					readerProxy.close();
-					writerProxy.close();
-
-					transactionManager.rollback();
-
-					continue;
-				}
-				checkPointed = status.isCheckPointed();
-
-				// the chunk can finish under three conditions: the buffer is full,
-				// a checkpoint is triggered, or there is no more input
-				if (status.isCheckPointed() || status.isFinished()) {
-					// TODO: missing before checkpoint listeners
-					// 1.- check if spec lists proper steps for before checkpoint
-					// 2.- ask Andy about retry
-					// 3.- when do we stop?
-
-					checkpointManager.checkpoint();
-
-					for (ChunkListenerProxy chunkProxy : chunkListeners) {
-						chunkProxy.afterChunk();
-					}
-
-					this.persistUserData();
-
-					this.chkptAlg.beginCheckpoint();
-
-					transactionManager.commit();
-
-					this.chkptAlg.endCheckpoint();
-
-					invokeCollectorIfPresent();
-
-					// exit loop when last record is written
-					if (status.isFinished()) {
-						transactionManager.begin();
-
-						readerProxy.close();
-						writerProxy.close();
-
-						transactionManager.commit();
-						// increment commitCount
-						stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();
-						break;
-					} else {
-						// increment commitCount
-						stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();
-					}
-
-				}
-
-			}
-		} catch (Exception e) {
-			caughtThrowable = e;
-			logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", e);
-			// Only try to call onError() if we have an Exception, but not an Error.
-			for (ChunkListenerProxy chunkProxy : chunkListeners) {
-				try {
-					chunkProxy.onError(e);
-				} catch (Exception e1) {
-					StringWriter sw = new StringWriter();
-					PrintWriter pw = new PrintWriter(sw);
-					e1.printStackTrace(pw);
-					logger.warning("Caught secondary exception when calling chunk listener onError() with stack trace: " + sw.toString() + "\n. Will continue to remaining chunk listeners (if any) and rethrow wrapping the primary exception.");
-				}
-			}
-		} catch (Throwable t) {
-			caughtThrowable = t;
-			logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", t);
-		} finally {
-			if (caughtThrowable != null) {
-				transactionManager.setRollbackOnly();
-				logger.warning("Caught throwable in chunk processing. Attempting to close all readers and writers.");
-				readerProxy.close();
-				writerProxy.close();
-				transactionManager.rollback();
-				logger.exiting(sourceClass, "invokeChunk");
-				throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", caughtThrowable);
-			} else {
-				logger.finest("Exiting normally");
-				logger.exiting(sourceClass, "invokeChunk");
-			}
-		}
-	}
-
-	protected void invokeCoreStep() throws BatchContainerServiceException {
-
-		this.chunk = step.getChunk();
-
-		initializeChunkArtifacts();
-		
-		invokeChunk();
-	}
-
-	private CheckpointAlgorithm getCheckpointAlgorithm(int itemCount, int timeInterval) {
-		CheckpointAlgorithm alg = null;
-
-		if ("item".equals(checkpointProxy.getCheckpointType())) {
-			alg = new ItemCheckpointAlgorithm();
-			((ItemCheckpointAlgorithm) alg).setThresholds(itemCount, timeInterval);
-		} else { // custom chkpt alg
-			alg = (CheckpointAlgorithm) checkpointProxy;
-		}
-
-		return alg;
-	}
-
-	/*
-	 * Initialize the item reader, item processor, item writer, checkpoint
-	 * algorithm, and listener artifacts for this chunk.
-	 */
-	private void initializeChunkArtifacts() {
-		String sourceMethod = "initializeChunkArtifacts";
-		if (logger.isLoggable(Level.FINE))
-			logger.entering(sourceClass, sourceMethod);
-
-		int itemCount = ChunkHelper.getItemCount(chunk);
-		int timeInterval = ChunkHelper.getTimeLimit(chunk);
-		String checkpointPolicy = ChunkHelper.getCheckpointPolicy(chunk);
-
-		ItemReader itemReader = chunk.getReader();
-		List<Property> itemReaderProps = itemReader.getProperties() == null ? null : itemReader.getProperties().getPropertyList();
-		try {
-			InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, 
-					itemReaderProps);
-
-			readerProxy = ProxyFactory.createItemReaderProxy(itemReader.getRef(), injectionRef, stepContext);
-
-			if (logger.isLoggable(Level.FINE)) {
-				logger.fine("Created ItemReaderProxy for " + itemReader.getRef());
-			}
-		} catch (ArtifactValidationException e) {
-			throw new BatchContainerServiceException("Cannot create the ItemReader [" + itemReader.getRef() + "]", e);
-		}
-
-		ItemProcessor itemProcessor = chunk.getProcessor();
-		if (itemProcessor != null){
-			List<Property> itemProcessorProps = itemProcessor.getProperties() == null ? null : itemProcessor.getProperties().getPropertyList();
-			try {
-
-				InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, 
-						itemProcessorProps);
-
-				processorProxy = ProxyFactory.createItemProcessorProxy(itemProcessor.getRef(), injectionRef, stepContext);
-				if (logger.isLoggable(Level.FINE)) {
-					logger.fine("Created ItemProcessorProxy for " + itemProcessor.getRef());
-				}
-			} catch (ArtifactValidationException e) {
-				throw new BatchContainerServiceException("Cannot create the ItemProcessor [" + itemProcessor.getRef() + "]", e);
-			}
-		}
-
-		ItemWriter itemWriter = chunk.getWriter();
-		List<Property> itemWriterProps = itemWriter.getProperties() == null ? null : itemWriter.getProperties().getPropertyList();
-		try {
-			InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, 
-					itemWriterProps);
-
-			writerProxy = ProxyFactory.createItemWriterProxy(itemWriter.getRef(), injectionRef, stepContext);
-			if (logger.isLoggable(Level.FINE)) {
-				logger.fine("Created ItemWriterProxy for " + itemWriter.getRef());
-			}
-		} catch (ArtifactValidationException e) {
-			throw new BatchContainerServiceException("Cannot create the ItemWriter [" + itemWriter.getRef() + "]", e);
-		}
-
-		try {
-			List<Property> propList = null;
-
-			if (chunk.getCheckpointAlgorithm() != null) {
-
-				propList = (chunk.getCheckpointAlgorithm().getProperties() == null) ? null : chunk.getCheckpointAlgorithm().getProperties()
-						.getPropertyList();
-			}
-
-			InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, 
-					propList);
-
-			checkpointProxy = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(step, injectionRef, stepContext);
-			if (logger.isLoggable(Level.FINE)) {
-				logger.fine("Created CheckpointAlgorithmProxy for policy [" + checkpointPolicy + "]");
-			}
-		} catch (ArtifactValidationException e) {
-			throw new BatchContainerServiceException("Cannot create the CheckpointAlgorithm for policy [" + chunk.getCheckpointPolicy()
-					+ "]", e);
-		}
-
-		InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, 
-				null);
-
-		this.chunkListeners = jobExecutionImpl.getListenerFactory().getChunkListeners(step, injectionRef, stepContext);
-		this.skipProcessListeners = jobExecutionImpl.getListenerFactory().getSkipProcessListeners(step, injectionRef, stepContext);
-		this.skipReadListeners = jobExecutionImpl.getListenerFactory().getSkipReadListeners(step, injectionRef, stepContext);
-		this.skipWriteListeners = jobExecutionImpl.getListenerFactory().getSkipWriteListeners(step, injectionRef, stepContext);
-		this.retryProcessListeners = jobExecutionImpl.getListenerFactory().getRetryProcessListeners(step, injectionRef, stepContext);
-		this.retryReadListeners = jobExecutionImpl.getListenerFactory().getRetryReadListeners(step, injectionRef, stepContext);
-		this.retryWriteListeners = jobExecutionImpl.getListenerFactory().getRetryWriteListeners(step, injectionRef, stepContext);
-		this.itemReadListeners = jobExecutionImpl.getListenerFactory().getItemReadListeners(step, injectionRef, stepContext);
-		this.itemProcessListeners = jobExecutionImpl.getListenerFactory().getItemProcessListeners(step, injectionRef, stepContext);
-		this.itemWriteListeners = jobExecutionImpl.getListenerFactory().getItemWriteListeners(step, injectionRef, stepContext);
-
-		if ("item".equals(checkpointProxy.getCheckpointType())) {
-			chkptAlg = new ItemCheckpointAlgorithm();
-			((ItemCheckpointAlgorithm) chkptAlg).setThresholds(itemCount, timeInterval);
-		} else { // custom chkpt alg
-			chkptAlg = (CheckpointAlgorithm) checkpointProxy;
-		}
-
-		if (logger.isLoggable(Level.FINE)) {
-			logger.fine("Setting contexts for chunk artifacts");
-		}
-
-		if (logger.isLoggable(Level.FINE)) {
-			logger.fine("Initialize checkpoint manager with item-count=" + itemCount);
-			logger.fine("Initialize checkpoint manager with time-interval=" + timeInterval);
-		}
-
-		checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getExecutionId(), jobExecutionImpl
-				.getJobInstance().getInstanceId(), step.getId());
-
-		skipHandler = new SkipHandler(chunk, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId());
-		skipHandler.addSkipProcessListener(skipProcessListeners);
-		skipHandler.addSkipReadListener(skipReadListeners);
-		skipHandler.addSkipWriteListener(skipWriteListeners);
-
-		retryHandler = new RetryHandler(chunk, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId());
-
-		retryHandler.addRetryProcessListener(retryProcessListeners);
-		retryHandler.addRetryReadListener(retryReadListeners);
-		retryHandler.addRetryWriteListener(retryWriteListeners);
-
-		if (logger.isLoggable(Level.FINE))
-			logger.exiting(sourceClass, sourceMethod);
-	}
-
-	private void openReaderAndWriter() {
-		String sourceMethod = "openReaderAndWriter";
-
-		if (logger.isLoggable(Level.FINE))
-			logger.entering(sourceClass, sourceMethod);
-
-		_persistenceManagerService = servicesManager.getPersistenceManagerService();
-		readerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), "READER");
-		CheckpointData readerChkptData = _persistenceManagerService.getCheckpointData(readerChkptDK);
-		try {
-
-			// check for data in backing store
-			if (readerChkptData != null) {
-
-				byte[] readertoken = readerChkptData.getRestartToken();
-				ByteArrayInputStream readerChkptBA = new ByteArrayInputStream(readertoken);
-				TCCLObjectInputStream readerOIS = null;
-				try {
-					readerOIS = new TCCLObjectInputStream(readerChkptBA);
-					readerProxy.open((Serializable) readerOIS.readObject());
-					readerOIS.close();
-				} catch (Exception ex) {
-					// is this what I should be throwing here?
-					throw new BatchContainerServiceException("Cannot restore the reader checkpoint data for [" + step.getId() + "]", ex);
-				}
-			} else {
-				// no chkpt data exists in the backing store
-				readerChkptData = null;
-				readerProxy.open(null);
-			}
-		} catch (ClassCastException e) {
-			logger.warning("Expected CheckpointData but found: " + readerChkptData);
-			throw new IllegalStateException("Expected CheckpointData but found: " + readerChkptData);
-		}
-
-		writerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), "WRITER");
-		CheckpointData writerChkptData = _persistenceManagerService.getCheckpointData(writerChkptDK);
-
-		try {
-			// check for data in backing store
-			if (writerChkptData != null) {
-				byte[] writertoken = writerChkptData.getRestartToken();
-				ByteArrayInputStream writerChkptBA = new ByteArrayInputStream(writertoken);
-				TCCLObjectInputStream writerOIS = null;
-				try {
-					writerOIS = new TCCLObjectInputStream(writerChkptBA);
-					writerProxy.open((Serializable) writerOIS.readObject());
-					writerOIS.close();
-				} catch (Exception ex) {
-					// is this what I should be throwing here?
-					throw new BatchContainerServiceException("Cannot restore the writer checkpoint data for [" + step.getId() + "]", ex);
-				}
-			} else {
-				// no chkpt data exists in the backing store
-				writerChkptData = null;
-				writerProxy.open(null);
-			}
-		} catch (ClassCastException e) {
-			logger.warning("Expected CheckpointData but found: " + writerChkptData);
-			throw new IllegalStateException("Expected CheckpointData but found: " + writerChkptData);
-		}
-
-		// set up metrics
-		// stepContext.addMetric(MetricImpl.Counter.valueOf("READ_COUNT"), 0);
-		// stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_COUNT"), 0);
-		// stepContext.addMetric(MetricImpl.Counter.valueOf("READ_SKIP_COUNT"),
-		// 0);
-		// stepContext.addMetric(MetricImpl.Counter.valueOf("PROCESS_SKIP_COUNT"),
-		// 0);
-		// stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_SKIP_COUNT"),
-		// 0);
-
-		if (logger.isLoggable(Level.FINE))
-			logger.exiting(sourceClass, sourceMethod);
-	}
-
-	@Override
-	public void stop() {
-		stepContext.setBatchStatus(BatchStatus.STOPPING);
-
-		// we don't need to call stop on the chunk implementation here since a
-		// chunk always returns control to the batch container after every item.
-
-	}
-
-	boolean skipReadException(Exception e) {
-
-		try {
-			skipHandler.handleExceptionRead(e);
-		} catch (BatchContainerRuntimeException bcre) {
-			return false;
-		}
-
-		return true;
-
-	}
-
-	boolean retryReadException(Exception e) {
-
-		try {
-			retryHandler.handleExceptionRead(e);
-		} catch (BatchContainerRuntimeException bcre) {
-			return false;
-		}
-
-		return true;
-
-	}
-
-	boolean skipProcessException(Exception e, Object record) {
-
-		try {
-			skipHandler.handleExceptionWithRecordProcess(e, record);
-		} catch (BatchContainerRuntimeException bcre) {
-			return false;
-		}
-
-		return true;
-
-	}
-
-	boolean retryProcessException(Exception e, Object record) {
-
-		try {
-			retryHandler.handleExceptionProcess(e, record);
-		} catch (BatchContainerRuntimeException bcre) {
-			return false;
-		}
-
-		return true;
-
-	}
-
-	boolean skipWriteException(Exception e, List<Object> chunkToWrite) {
-
-		try {
-			skipHandler.handleExceptionWithRecordListWrite(e, chunkToWrite);
-		} catch (BatchContainerRuntimeException bcre) {
-			return false;
-		}
-
-		return true;
-
-	}
-
-	boolean retryWriteException(Exception e, List<Object> chunkToWrite) {
-
-		try {
-			retryHandler.handleExceptionWrite(e, chunkToWrite);
-		} catch (BatchContainerRuntimeException bcre) {
-			return false;
-		}
-
-		return true;
-
-	}
-
-	private void positionReaderAtCheckpoint() {
-		_persistenceManagerService = servicesManager.getPersistenceManagerService();
-		readerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), "READER");
-
-		CheckpointData readerData = _persistenceManagerService.getCheckpointData(readerChkptDK);
-		try {
-			// check for data in backing store
-			if (readerData != null) {
-				byte[] readertoken = readerData.getRestartToken();
-				ByteArrayInputStream readerChkptBA = new ByteArrayInputStream(readertoken);
-				TCCLObjectInputStream readerOIS = null;
-				try {
-					readerOIS = new TCCLObjectInputStream(readerChkptBA);
-					readerProxy.open((Serializable) readerOIS.readObject());
-					readerOIS.close();
-				} catch (Exception ex) {
-					// is this what I should be throwing here?
-					throw new BatchContainerServiceException("Cannot restore the reader checkpoint data for [" + step.getId() + "]", ex);
-				}
-			} else {
-				// no chkpt data exists in the backing store
-				readerData = null;
-				readerProxy.open(null);
-			}
-		} catch (ClassCastException e) {
-			throw new IllegalStateException("Expected CheckpointData but found: " + readerData);
-		}
-	}
-
-	private void positionWriterAtCheckpoint() {
-		_persistenceManagerService = servicesManager.getPersistenceManagerService();
-		writerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), "WRITER");
-
-		CheckpointData writerData =  _persistenceManagerService.getCheckpointData(writerChkptDK);
-
-		try {
-			// check for data in backing store
-			if (writerData != null) {
-				byte[] writertoken = writerData.getRestartToken();
-				ByteArrayInputStream writerChkptBA = new ByteArrayInputStream(writertoken);
-				TCCLObjectInputStream writerOIS = null;
-				try {
-					writerOIS = new TCCLObjectInputStream(writerChkptBA);
-					writerProxy.open((Serializable) writerOIS.readObject());
-					writerOIS.close();
-				} catch (Exception ex) {
-					// is this what I should be throwing here?
-					throw new BatchContainerServiceException("Cannot restore the writer checkpoint data for [" + step.getId() + "]", ex);
-				}
-			} else {
-				// no chkpt data exists in the backing store
-				writerData = null;
-				writerProxy.open(null);
-			}
-		} catch (ClassCastException e) {
-			throw new IllegalStateException("Expected CheckpointData but found: " + writerData);
-		}
-	}
-}
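
For readers skimming the diff: the deleted ChunkStepControllerImpl above drives the JSR-352 read-process-write pattern. Items are read and processed one at a time, buffered, and written as a chunk whenever the item-count is reached, the checkpoint algorithm fires, or the input is exhausted, with a transaction committed around each chunk. Below is a minimal standalone sketch of just that loop; it deliberately omits the transactions, listeners, skip/retry handling, and checkpoint restart, and the ChunkLoopSketch/runChunkLoop names are illustrative only, not part of this codebase.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Illustrative sketch of the chunk loop: read and process one item at a time,
// write the buffered items when the configured item-count is reached or input ends.
public class ChunkLoopSketch {

    static <R, W> void runChunkLoop(Iterator<R> reader,
                                    Function<R, W> processor,
                                    Consumer<List<W>> writer,
                                    int itemCount) {
        List<W> buffer = new ArrayList<>();
        while (reader.hasNext()) {
            W processed = processor.apply(reader.next());
            if (processed != null) {            // a null result means the item was filtered
                buffer.add(processed);
            }
            if (buffer.size() == itemCount) {   // "checkpoint": write and start a new chunk
                writer.accept(buffer);
                buffer = new ArrayList<>();
            }
        }
        if (!buffer.isEmpty()) {                // flush the trailing partial chunk
            writer.accept(buffer);
        }
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7);
        runChunkLoop(input.iterator(),
                i -> i * 10,                                   // "processor"
                chunk -> System.out.println("write " + chunk), // "writer"
                3);                                            // item-count of 3 per chunk
    }
}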

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/DecisionControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/DecisionControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/DecisionControllerImpl.java
deleted file mode 100755
index 0595c48..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/DecisionControllerImpl.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.util.List;
-import java.util.logging.Logger;
-
-import javax.batch.runtime.StepExecution;
-
-import com.ibm.jbatch.container.IExecutionElementController;
-import com.ibm.jbatch.container.artifact.proxy.DeciderProxy;
-import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
-import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
-import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
-import com.ibm.jbatch.container.exception.BatchContainerServiceException;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.jsl.ExecutionElement;
-import com.ibm.jbatch.container.services.IPersistenceManagerService;
-import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
-import com.ibm.jbatch.container.status.ExecutionStatus;
-import com.ibm.jbatch.container.status.ExtendedBatchStatus;
-import com.ibm.jbatch.container.validation.ArtifactValidationException;
-import com.ibm.jbatch.jsl.model.Decision;
-import com.ibm.jbatch.jsl.model.Property;
-
-public class DecisionControllerImpl implements IExecutionElementController {
-
-	private final static String sourceClass = DecisionControllerImpl.class.getName();
-	private final static Logger logger = Logger.getLogger(sourceClass);
-
-	private RuntimeJobExecution jobExecution; 
-
-	private Decision decision;
-
-	private StepExecution[] previousStepExecutions = null;
-
-	private IPersistenceManagerService persistenceService = null;
-
-	public DecisionControllerImpl(RuntimeJobExecution jobExecution, Decision decision) {
-		this.jobExecution = jobExecution;
-		this.decision = decision;
-		persistenceService = ServicesManagerImpl.getInstance().getPersistenceManagerService();
-	}
-
-	@Override
-	public ExecutionStatus execute() {
-
-		String deciderId = decision.getRef();
-		List<Property> propList = (decision.getProperties() == null) ? null : decision.getProperties().getPropertyList();
-
-		DeciderProxy deciderProxy;
-
-		//Create a decider proxy and inject the associated properties
-
-		/* Set the contexts associated with this scope */
-		//job context is always in scope
-		//the parent controller will only pass one valid context to a decision controller
-		//so two of these contexts will always be null
-		InjectionReferences injectionRef = new InjectionReferences(jobExecution.getJobContext(), null, propList);
-
-		try {
-			deciderProxy = ProxyFactory.createDeciderProxy(deciderId,injectionRef );
-		} catch (ArtifactValidationException e) {
-			throw new BatchContainerServiceException("Cannot create the decider [" + deciderId + "]", e);
-		}
-
-		String exitStatus = deciderProxy.decide(this.previousStepExecutions);
-
-		logger.fine("Decider exiting and setting job-level exit status to " + exitStatus);
-
-		//Set the value returned from the decider as the job context exit status.
-		this.jobExecution.getJobContext().setExitStatus(exitStatus);
-
-		return new ExecutionStatus(ExtendedBatchStatus.NORMAL_COMPLETION, exitStatus);
-	}
-
-	protected void setPreviousStepExecutions(ExecutionElement previousExecutionElement, IExecutionElementController previousElementController) { 
-		if (previousExecutionElement == null) {
-			// only job context is available to the decider 
-		} else if (previousExecutionElement instanceof Decision) {
-			
-			throw new BatchContainerRuntimeException("A decision cannot precede another decision.");
-			
-		}
-		
-		List<Long> previousStepExecsIds = previousElementController.getLastRunStepExecutions();
-		
-		StepExecution[] stepExecArray = new StepExecution[previousStepExecsIds.size()];
-		
-		for (int i=0; i < stepExecArray.length; i++) {
-		    StepExecution stepExec = persistenceService.getStepExecutionByStepExecutionId(previousStepExecsIds.get(i));
-		    stepExecArray[i] = stepExec;
-		}
-
-		this.previousStepExecutions =  stepExecArray;
-		
-	}
-
-
-	@Override
-	public void stop() { 
-		// no-op
-	}
-
-    @Override
-    public List<Long> getLastRunStepExecutions() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-
-}
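
The deleted DecisionControllerImpl above resolves the <decision> ref into a DeciderProxy, hands it the StepExecution(s) of the previous execution element, and stores the returned string as the job-level exit status, which then drives the decision's transition elements. A minimal sketch of the kind of javax.batch.api.Decider that proxy call ends up invoking follows; the class name and status strings are hypothetical, not part of this codebase.

import javax.batch.api.Decider;
import javax.batch.runtime.StepExecution;
import javax.inject.Named;

// Hypothetical decider: routes the job based on the exit status of the preceding step.
@Named("myDecider")
public class MyDecider implements Decider {

    @Override
    public String decide(StepExecution[] executions) throws Exception {
        // When the decision follows a single step, one StepExecution is passed in.
        String previousExit = executions[0].getExitStatus();
        // The returned value becomes the job exit status and is matched against the
        // decision's <next>/<stop>/<end>/<fail> "on" attributes in the JSL.
        return "COMPLETED".equals(previousExit) ? "CONTINUE" : "RERUN";
    }
}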

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionElementControllerFactory.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionElementControllerFactory.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionElementControllerFactory.java
deleted file mode 100755
index ad504b7..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionElementControllerFactory.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-
-import com.ibm.jbatch.container.context.impl.StepContextImpl;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.util.PartitionDataWrapper;
-import com.ibm.jbatch.jsl.model.Batchlet;
-import com.ibm.jbatch.jsl.model.Chunk;
-import com.ibm.jbatch.jsl.model.Decision;
-import com.ibm.jbatch.jsl.model.Flow;
-import com.ibm.jbatch.jsl.model.Partition;
-import com.ibm.jbatch.jsl.model.Split;
-import com.ibm.jbatch.jsl.model.Step;
-
-public class ExecutionElementControllerFactory {
-
-	private final static String CLASSNAME = ExecutionElementControllerFactory.class.getName();
-	private final static Logger logger = Logger.getLogger(CLASSNAME);
-
-	public static BaseStepControllerImpl getStepController(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId,  BlockingQueue<PartitionDataWrapper> analyzerQueue) {
-
-		String methodName = "getStepController";
-
-		if(logger.isLoggable(Level.FINER)) { logger.logp (Level.FINER, CLASSNAME, methodName, "Get StepController for", step.getId());}
-
-		Partition partition = step.getPartition();
-		if (partition != null) {
-
-			if (partition.getMapper() != null ) {
-				if (logger.isLoggable(Level.FINER)) {
-					logger.logp(Level.FINER, CLASSNAME, methodName, "Found partitioned step with mapper" , step);
-				}
-				return new PartitionedStepControllerImpl(jobExecutionImpl, step, stepContext, rootJobExecutionId);
-			}
-
-			if (partition.getPlan() != null) {
-				if (partition.getPlan().getPartitions() != null) {
-					if (logger.isLoggable(Level.FINER)) {
-						logger.logp(Level.FINER, CLASSNAME, methodName, "Found partitioned step with plan", step);
-					}
-					return new PartitionedStepControllerImpl(jobExecutionImpl, step, stepContext, rootJobExecutionId);
-				}
-			}
-		}
-
-		Batchlet batchlet = step.getBatchlet();
-		if (batchlet != null) {
-			if(logger.isLoggable(Level.FINER)) {  
-				logger.finer("Found batchlet: " + batchlet + ", with ref= " + batchlet.getRef());
-			}
-			if (step.getChunk() != null) {
-				throw new IllegalArgumentException("Step contains both a batchlet and a chunk.  Aborting.");
-			}       
-			return new BatchletStepControllerImpl(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue);
-		} else {
-			Chunk chunk = step.getChunk();
-			if(logger.isLoggable(Level.FINER)) {  
-				logger.finer("Found chunk: " + chunk);
-			}
-			if (chunk == null) {
-				throw new IllegalArgumentException("Step does not contain either a batchlet or a chunk.  Aborting.");
-			}
-			return new ChunkStepControllerImpl(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue);
-		}           
-	} 
-
-	public static DecisionControllerImpl getDecisionController(RuntimeJobExecution jobExecutionImpl, Decision decision) {
-		return new DecisionControllerImpl(jobExecutionImpl, decision);
-	} 
-	
-	public static FlowControllerImpl getFlowController(RuntimeJobExecution jobExecutionImpl, Flow flow, long rootJobExecutionId) {
-		return new FlowControllerImpl(jobExecutionImpl, flow, rootJobExecutionId);
-	} 
-	
-	public static SplitControllerImpl getSplitController(RuntimeJobExecution jobExecutionImpl, Split split, long rootJobExecutionId) {
-		return new SplitControllerImpl(jobExecutionImpl, split, rootJobExecutionId);
-	}  
-}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionTransitioner.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionTransitioner.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionTransitioner.java
deleted file mode 100755
index 665f6e4..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionTransitioner.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.logging.Logger;
-
-import javax.batch.runtime.BatchStatus;
-
-import com.ibm.jbatch.container.IController;
-import com.ibm.jbatch.container.IExecutionElementController;
-import com.ibm.jbatch.container.context.impl.JobContextImpl;
-import com.ibm.jbatch.container.context.impl.StepContextImpl;
-import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.jsl.ExecutionElement;
-import com.ibm.jbatch.container.jsl.IllegalTransitionException;
-import com.ibm.jbatch.container.jsl.Transition;
-import com.ibm.jbatch.container.jsl.TransitionElement;
-import com.ibm.jbatch.container.navigator.ModelNavigator;
-import com.ibm.jbatch.container.status.ExtendedBatchStatus;
-import com.ibm.jbatch.container.status.ExecutionStatus;
-import com.ibm.jbatch.container.util.PartitionDataWrapper;
-import com.ibm.jbatch.jsl.model.Decision;
-import com.ibm.jbatch.jsl.model.End;
-import com.ibm.jbatch.jsl.model.Fail;
-import com.ibm.jbatch.jsl.model.Flow;
-import com.ibm.jbatch.jsl.model.JSLJob;
-import com.ibm.jbatch.jsl.model.Split;
-import com.ibm.jbatch.jsl.model.Step;
-import com.ibm.jbatch.jsl.model.Stop;
-
-public class ExecutionTransitioner {
-
-	private final static String CLASSNAME = ExecutionTransitioner.class.getName();
-	private final static Logger logger = Logger.getLogger(CLASSNAME);
-
-	private RuntimeJobExecution jobExecution;
-	private long rootJobExecutionId;
-	private ModelNavigator<?> modelNavigator;
-	
-	// 'volatile' since it receives stop on separate thread.
-	private volatile IExecutionElementController currentStoppableElementController;
-	private IExecutionElementController previousElementController;
-	private ExecutionElement currentExecutionElement = null;
-	private ExecutionElement previousExecutionElement = null;
-
-	
-	private JobContextImpl jobContext;
-	private BlockingQueue<PartitionDataWrapper> analyzerQueue = null;
-	
-	private List<Long> stepExecIds;
-	
-	public ExecutionTransitioner(RuntimeJobExecution jobExecution, long rootJobExecutionId, ModelNavigator<?> modelNavigator) {
-		this.jobExecution = jobExecution;
-		this.rootJobExecutionId = rootJobExecutionId;
-		this.modelNavigator = modelNavigator;
-		this.jobContext = jobExecution.getJobContext(); 
-	}
-	
-	public ExecutionTransitioner(RuntimeJobExecution jobExecution, long rootJobExecutionId, ModelNavigator<JSLJob> jobNavigator, BlockingQueue<PartitionDataWrapper> analyzerQueue) {
-		this.jobExecution = jobExecution;
-		this.rootJobExecutionId = rootJobExecutionId;
-		this.modelNavigator = jobNavigator;
-		this.jobContext = jobExecution.getJobContext(); 
-		this.analyzerQueue = analyzerQueue;
-	}
-	
-	/**
-	 * Used for job and flow.
-	 * @return
-	 */
-	public ExecutionStatus doExecutionLoop() {
-
-		final String methodName = "doExecutionLoop";
-		
-		try {
-			currentExecutionElement = modelNavigator.getFirstExecutionElement(jobExecution.getRestartOn());
-		} catch (IllegalTransitionException e) {
-			String errorMsg = "Could not transition to first execution element within job.";
-			logger.warning(errorMsg);
-			throw new IllegalArgumentException(errorMsg, e);
-		}
-
-		logger.fine("First execution element = " + currentExecutionElement.getId());
-
-		while (true) {
-
-			if (jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
-				logger.fine(methodName + " Exiting execution loop as job is now in stopping state.");
-				return new ExecutionStatus(ExtendedBatchStatus.JOB_OPERATOR_STOPPING);
-			}
-			
-			IExecutionElementController currentElementController = getNextElementController();
-			currentStoppableElementController = currentElementController;
-			
-			ExecutionStatus status = currentElementController.execute();
-
-			// Nothing special for decision or step except to get exit status.  For flow and split we want to bubble up though.
-			if ((currentExecutionElement instanceof Split) || (currentExecutionElement instanceof Flow)) {
-				// Exit status and restartOn should both be in the job context.
-				if (!status.getExtendedBatchStatus().equals(ExtendedBatchStatus.NORMAL_COMPLETION)) {
-					logger.fine("Breaking out of loop with return status = " + status.getExtendedBatchStatus().name());
-					return status;
-				}
-			} 
-
-			// Seems like this should only happen if an Error is thrown at the step level, since normally a step-level
-			// exception is caught and the fact that it was thrown is captured in the ExecutionStatus.
-			if (jobContext.getBatchStatus().equals(BatchStatus.FAILED)) {
-				String errorMsg = "Sub-execution returned its own BatchStatus of FAILED.  Deal with this by throwing exception to the next layer.";
-				logger.warning(errorMsg);
-				throw new BatchContainerRuntimeException(errorMsg);
-			}
-
-			// set the execution element controller to null so we don't try to call stop on it after the element has finished executing
-			currentStoppableElementController = null;
-			
-			logger.fine("Done executing element=" + currentExecutionElement.getId() + ", exitStatus=" + status.getExitStatus());
-
-			if (jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
-				logger.fine(methodName + " Exiting as job has been stopped");
-				return new ExecutionStatus(ExtendedBatchStatus.JOB_OPERATOR_STOPPING);
-			}
-
-			Transition nextTransition = null;
-			try {
-				nextTransition = modelNavigator.getNextTransition(currentExecutionElement, status);
-			} catch (IllegalTransitionException e) {
-				String errorMsg = "Problem transitioning to next execution element.";
-				logger.warning(errorMsg);
-				throw new BatchContainerRuntimeException(errorMsg, e);
-			} 
-
-			//
-			// We will find ourselves in one of four states now.  
-			// 
-			// 1. Finished transitioning after a normal execution, but nothing to do 'next'.
-			// 2. We just executed a step which threw an exception, but didn't match a transition element.
-			// 3. We are going to 'next' to another execution element (and jump back to the top of this
-			//    'while' loop).
-			// 4. We matched a terminating transition element (<end>, <stop>, or <fail>).
-			//
-			
-			// 1.
-			if (nextTransition.isFinishedTransitioning()) {
-				logger.fine(methodName + "No next execution element, and no transition element found either.  Looks like we're done and ready for COMPLETED state.");
-				this.stepExecIds =  currentElementController.getLastRunStepExecutions();
-				// Consider just passing the last 'status' back, but let's unwrap the exit status and pass a new NORMAL_COMPLETION
-				// status back instead.
-				return new ExecutionStatus(ExtendedBatchStatus.NORMAL_COMPLETION, status.getExitStatus());
-			// 2.
-			} else if (nextTransition.noTransitionElementMatchedAfterException()) {
-				return new ExecutionStatus(ExtendedBatchStatus.EXCEPTION_THROWN, status.getExitStatus());
-			// 3.
-			} else if (nextTransition.getNextExecutionElement() != null) {
-				// hold on to the previous execution element for the decider
-				// we need it because we need to inject the context of the
-				// previous execution element into the decider
-				previousExecutionElement = currentExecutionElement;
-				previousElementController = currentElementController;
-				currentExecutionElement = nextTransition.getNextExecutionElement();
-			// 4.
-			} else if (nextTransition.getTransitionElement() != null) {
-				ExecutionStatus terminatingStatus = handleTerminatingTransitionElement(nextTransition.getTransitionElement());
-				logger.finer(methodName + " , Breaking out of execution loop after processing terminating transition element.");
-				return terminatingStatus;
-			} else {
-				throw new IllegalStateException("Not sure how we'd end up in this state...aborting rather than looping.");
-			}
-		}
-	}
-
-	
-	private IExecutionElementController getNextElementController() {
-		IExecutionElementController elementController =null;
-
-		if (currentExecutionElement instanceof Decision) {
-			Decision decision = (Decision)currentExecutionElement;
-			elementController = ExecutionElementControllerFactory.getDecisionController(jobExecution, decision);			
-			DecisionControllerImpl decisionController = (DecisionControllerImpl)elementController;
-			decisionController.setPreviousStepExecutions(previousExecutionElement, previousElementController);
-		} else if (currentExecutionElement instanceof Flow) {
-			Flow flow = (Flow)currentExecutionElement;
-			elementController = ExecutionElementControllerFactory.getFlowController(jobExecution, flow, rootJobExecutionId);
-		} else if (currentExecutionElement instanceof Split) {
-			Split split = (Split)currentExecutionElement;
-			elementController = ExecutionElementControllerFactory.getSplitController(jobExecution, split, rootJobExecutionId);
-		} else if (currentExecutionElement instanceof Step) {
-			Step step = (Step)currentExecutionElement;
-			StepContextImpl stepContext = new StepContextImpl(step.getId());
-			elementController = ExecutionElementControllerFactory.getStepController(jobExecution, step, stepContext, rootJobExecutionId, analyzerQueue);
-		}
-		logger.fine("Next execution element controller = " + elementController);
-		return elementController;
-	}
-			
-			
-	private ExecutionStatus handleTerminatingTransitionElement(TransitionElement transitionElement) {
-
-		ExecutionStatus retVal;
-		
-		logger.fine("Found terminating transition element (stop, end, or fail).");
-
-		if (transitionElement instanceof Stop) {
-
-			Stop stopElement = (Stop)transitionElement;
-			String restartOn = stopElement.getRestart();
-			String exitStatusFromJSL = stopElement.getExitStatus();
-			logger.fine("Next transition element is a <stop> : " + transitionElement + " with restartOn=" + restartOn + 
-					" , and JSL exit status = " + exitStatusFromJSL);
-
-			retVal = new ExecutionStatus(ExtendedBatchStatus.JSL_STOP);
-			
-			if (exitStatusFromJSL != null) {
-				jobContext.setExitStatus(exitStatusFromJSL);  
-				retVal.setExitStatus(exitStatusFromJSL);  
-			}
-			if (restartOn != null) {
-				jobContext.setRestartOn(restartOn);				
-				retVal.setRestartOn(restartOn);				
-			}
-		} else if (transitionElement instanceof End) {
-
-			End endElement = (End)transitionElement;
-			String exitStatusFromJSL = endElement.getExitStatus();
-			logger.fine("Next transition element is an <end> : " + transitionElement + 
-					" with JSL exit status = " + exitStatusFromJSL);
-			retVal = new ExecutionStatus(ExtendedBatchStatus.JSL_END);
-			if (exitStatusFromJSL != null) {
-				jobContext.setExitStatus(exitStatusFromJSL);  
-				retVal.setExitStatus(exitStatusFromJSL);  
-			}
-		} else if (transitionElement instanceof Fail) {
-
-			Fail failElement = (Fail)transitionElement;
-			String exitStatusFromJSL = failElement.getExitStatus();
-			logger.fine("Next transition element is a <fail> : " + transitionElement + 
-					" with JSL exit status = " + exitStatusFromJSL);
-			retVal = new ExecutionStatus(ExtendedBatchStatus.JSL_FAIL);
-			if (exitStatusFromJSL != null) {
-				jobContext.setExitStatus(exitStatusFromJSL);  
-				retVal.setExitStatus(exitStatusFromJSL);  
-			}
-		} else {
-			throw new IllegalStateException("Not sure how we'd get here...aborting.");
-		}
-		return retVal;
-	}
-
-	public IController getCurrentStoppableElementController() {
-		return currentStoppableElementController;
-	}
-
-    public List<Long> getStepExecIds() {
-        return stepExecIds;
-    }
-
-
-}
\ No newline at end of file

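The loop deleted above ends by mapping the JSL terminating transition elements (<stop>, <end>, <fail>) onto ExecutionStatus values, with <stop> additionally carrying an exit status and a restart-on target. As a point of reference, here is a minimal, illustrative sketch of how those outcomes surface through the standard javax.batch API; it is not taken from the deleted sources, and the job XML name "my-job" is hypothetical.

import java.util.Properties;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.JobExecution;

public class StopAndRestartExample {
    public static void main(String[] args) {
        JobOperator operator = BatchRuntime.getJobOperator();

        // "my-job" is a hypothetical job XML name (META-INF/batch-jobs/my-job.xml).
        long executionId = operator.start("my-job", new Properties());

        // ... poll operator.getJobExecution(executionId).getBatchStatus() until the run ends (omitted) ...

        JobExecution execution = operator.getJobExecution(executionId);
        if (execution.getBatchStatus() == BatchStatus.STOPPED) {
            // A <stop exit-status="..." restart="..."/> element ends the run as STOPPED and records the
            // restart point, which is what the deleted handleTerminatingTransitionElement() represents
            // as an ExecutionStatus with ExtendedBatchStatus.JSL_STOP.
            long restartId = operator.restart(executionId, new Properties());
            System.out.println("Restarted as execution " + restartId);
        }
    }
}
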
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowControllerImpl.java
deleted file mode 100755
index 255d871..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowControllerImpl.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.util.List;
-import java.util.logging.Logger;
-
-import javax.batch.runtime.BatchStatus;
-
-import com.ibm.jbatch.container.IController;
-import com.ibm.jbatch.container.IExecutionElementController;
-import com.ibm.jbatch.container.context.impl.JobContextImpl;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.navigator.ModelNavigator;
-import com.ibm.jbatch.container.navigator.NavigatorFactory;
-import com.ibm.jbatch.container.services.IPersistenceManagerService;
-import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
-import com.ibm.jbatch.container.status.ExtendedBatchStatus;
-import com.ibm.jbatch.container.status.ExecutionStatus;
-import com.ibm.jbatch.jsl.model.Flow;
-
-public class FlowControllerImpl implements IExecutionElementController {
-
-	private final static String CLASSNAME = FlowControllerImpl.class.getName();
-	private final static Logger logger = Logger.getLogger(CLASSNAME);
-
-	private final RuntimeJobExecution jobExecution;
-	private final JobContextImpl jobContext;
-
-	protected ModelNavigator<Flow> flowNavigator;
-
-	protected Flow flow;
-	private long rootJobExecutionId;
-
-	private ExecutionTransitioner transitioner;
-
-	//
-	// The currently executing controller; this reference is only set once we are
-	// ready to accept stop events for this execution.
-	private volatile IController currentStoppableElementController = null;
-	
-    private static IPersistenceManagerService _persistenceManagementService = ServicesManagerImpl.getInstance().getPersistenceManagerService();
-
-
-	public FlowControllerImpl(RuntimeJobExecution jobExecution, Flow flow, long rootJobExecutionId) {
-		this.jobExecution = jobExecution;
-		this.jobContext = jobExecution.getJobContext();
-		this.flowNavigator = NavigatorFactory.createFlowNavigator(flow);
-		this.flow = flow;
-		this.rootJobExecutionId = rootJobExecutionId;
-	}
-
-	@Override
-	public ExecutionStatus execute() {
-		if (!jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) { 
-			transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, flowNavigator);
-			return transitioner.doExecutionLoop();
-		} else {
-			return new ExecutionStatus(ExtendedBatchStatus.JOB_OPERATOR_STOPPING);
-		}
-	}
-
-
-	@Override
-	public void stop() { 
-		// Since this is not a top-level controller, don't try to filter based on the existing status;
-		// just pass the stop() along.
-		IController stoppableElementController = transitioner.getCurrentStoppableElementController();
-		if (stoppableElementController != null) {
-			stoppableElementController.stop();
-		}
-	}
-
-    @Override
-    public List<Long> getLastRunStepExecutions() {
-        return this.transitioner.getStepExecIds();
-    }
-
-
-
-}

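FlowControllerImpl above does no status filtering of its own on stop(): it simply forwards the request to whichever inner element controller is currently running. From the application side the whole chain starts with JobOperator.stop(); the sketch below is illustrative only (standard javax.batch API, hypothetical job XML name) and is not part of the deleted sources.

import java.util.Properties;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;

public class StopRequestExample {
    public static void main(String[] args) {
        JobOperator operator = BatchRuntime.getJobOperator();

        // "long-running-job" is a hypothetical job XML name.
        long executionId = operator.start("long-running-job", new Properties());

        // The operator marks the execution STOPPING; the runtime then walks the controller chain
        // (job -> flow -> step), which is what the stop() overrides in these deleted classes implement.
        operator.stop(executionId);
    }
}
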
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowInSplitThreadRootControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowInSplitThreadRootControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowInSplitThreadRootControllerImpl.java
deleted file mode 100755
index f680586..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowInSplitThreadRootControllerImpl.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import com.ibm.jbatch.container.jobinstance.RuntimeFlowInSplitExecution;
-import com.ibm.jbatch.container.status.ExtendedBatchStatus;
-import com.ibm.jbatch.container.status.ExecutionStatus;
-import com.ibm.jbatch.container.util.FlowInSplitBuilderConfig;
-
-public class FlowInSplitThreadRootControllerImpl extends JobThreadRootControllerImpl {
-
-	private final static String CLASSNAME = FlowInSplitThreadRootControllerImpl.class.getName();
-	
-	// Careful, we have a separately named reference to the same object in the parent class
-	RuntimeFlowInSplitExecution flowInSplitExecution;
-	
-	public FlowInSplitThreadRootControllerImpl(RuntimeFlowInSplitExecution flowInSplitExecution, FlowInSplitBuilderConfig config) {
-		super(flowInSplitExecution, config.getRootJobExecutionId());
-		this.flowInSplitExecution = flowInSplitExecution;
-	}
-	
-	@Override
-	/**
-	 * Not only are we setting the status correctly at the subjob level, we are also setting it on the execution
-	 * so that it is visible to the parent split.
-	 */
-	public ExecutionStatus originateExecutionOnThread() {
-		ExecutionStatus status = super.originateExecutionOnThread();
-		flowInSplitExecution.setFlowStatus(status);
-		return status;
-	}
-	
-	@Override 
-	protected void batchStatusFailedFromException() {
-		super.batchStatusFailedFromException();
-		flowInSplitExecution.getFlowStatus().setExtendedBatchStatus(ExtendedBatchStatus.EXCEPTION_THROWN);
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobControllerImpl.java
deleted file mode 100755
index 75e1332..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobControllerImpl.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-
-public class JobControllerImpl extends JobThreadRootControllerImpl {
-
-	private final static String CLASSNAME = JobControllerImpl.class.getName();
-	
-	private JobControllerImpl(RuntimeJobExecution jobExecution, long rootJobExecutionId) {
-		super(jobExecution, rootJobExecutionId);
-	}
-
-	public JobControllerImpl(RuntimeJobExecution jobExecution) {
-		this(jobExecution, jobExecution.getExecutionId());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobThreadRootControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobThreadRootControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobThreadRootControllerImpl.java
deleted file mode 100755
index 3e30ab2..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobThreadRootControllerImpl.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.sql.Timestamp;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.batch.runtime.BatchStatus;
-
-import com.ibm.jbatch.container.IController;
-import com.ibm.jbatch.container.IThreadRootController;
-import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
-import com.ibm.jbatch.container.artifact.proxy.JobListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.ListenerFactory;
-import com.ibm.jbatch.container.context.impl.JobContextImpl;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.navigator.ModelNavigator;
-import com.ibm.jbatch.container.services.IJobStatusManagerService;
-import com.ibm.jbatch.container.services.IPersistenceManagerService;
-import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
-import com.ibm.jbatch.container.status.ExtendedBatchStatus;
-import com.ibm.jbatch.container.status.ExecutionStatus;
-import com.ibm.jbatch.container.util.PartitionDataWrapper;
-import com.ibm.jbatch.jsl.model.JSLJob;
-import com.ibm.jbatch.jsl.model.JSLProperties;
-import com.ibm.jbatch.jsl.model.Property;
-
-public abstract class JobThreadRootControllerImpl implements IThreadRootController {
-
-	private final static String CLASSNAME = JobThreadRootControllerImpl.class.getName();
-	private final static Logger logger = Logger.getLogger(CLASSNAME);
-
-	protected RuntimeJobExecution jobExecution;
-	protected JobContextImpl jobContext;
-	protected long rootJobExecutionId;
-	protected long jobInstanceId;
-	protected IJobStatusManagerService jobStatusService;
-	protected IPersistenceManagerService persistenceService;
-	private ListenerFactory listenerFactory = null;
-
-	private ExecutionTransitioner transitioner; 
-	protected final ModelNavigator<JSLJob> jobNavigator;
-	private BlockingQueue<PartitionDataWrapper> analyzerQueue;
-
-	public JobThreadRootControllerImpl(RuntimeJobExecution jobExecution, long rootJobExecutionId) {
-		this.jobExecution = jobExecution;
-		this.jobContext = jobExecution.getJobContext();
-		this.rootJobExecutionId = rootJobExecutionId;
-		this.jobInstanceId = jobExecution.getInstanceId();
-		this.jobStatusService = ServicesManagerImpl.getInstance().getJobStatusManagerService();
-		this.persistenceService = ServicesManagerImpl.getInstance().getPersistenceManagerService();
-		this.jobNavigator = jobExecution.getJobNavigator();
-		setupListeners();
-	}
-
-	public JobThreadRootControllerImpl(RuntimeJobExecution jobExecution, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerQueue) {
-		this(jobExecution, rootJobExecutionId);
-		this.analyzerQueue = analyzerQueue;
-	}
-	
-	/*
-	 * By not passing the rootJobExecutionId, we are "orphaning" the subjob execution and making it not findable from the parent.  
-	 * This is exactly what we want for getStepExecutions()... we don't want it to get extraneous entries for the partitions.   
-	 */
-	public JobThreadRootControllerImpl(RuntimeJobExecution jobExecution, BlockingQueue<PartitionDataWrapper> analyzerQueue) {
-		this(jobExecution, jobExecution.getExecutionId());
-		this.analyzerQueue = analyzerQueue;
-	}
-
-	@Override
-	public ExecutionStatus originateExecutionOnThread() {
-		String methodName = "originateExecutionOnThread";
-		logger.entering(CLASSNAME, methodName);
-
-		ExecutionStatus retVal = null;
-		try {
-			// Check if we've already gotten the stop() command.
-			if (!jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) { 
-
-				// Now that we're ready to start invoking artifacts, set the status to 'STARTED'
-				markJobStarted();
-
-				jobListenersBeforeJob();
-
-				// --------------------
-				// The BIG loop transitioning 
-				// within the job !!!
-				// --------------------
-				transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, jobNavigator, analyzerQueue);
-				retVal = transitioner.doExecutionLoop();
-				ExtendedBatchStatus extBatchStatus = retVal.getExtendedBatchStatus();
-				switch (extBatchStatus)  {
-					case JSL_STOP : 		jslStop();
-											break;
-					case JSL_FAIL : 		updateJobBatchStatus(BatchStatus.FAILED);
-											break;
-					case EXCEPTION_THROWN : updateJobBatchStatus(BatchStatus.FAILED);
-											break;
-				}
-			}
-		} catch (Throwable t) {
-			// We still want to try to call the afterJob() listener and persist the batch and exit
-			// status for the failure in an orderly fashion.  So catch and continue.
-			logWarning("Caught throwable in main execution loop", t);
-			batchStatusFailedFromException();
-		}
-
-		endOfJob();
-
-		logger.exiting(CLASSNAME, methodName);
-		return retVal;
-	}
-
-	protected void setContextProperties() {
-		JSLJob jobModel = jobExecution.getJobNavigator().getRootModelElement();
-		JSLProperties jslProps = jobModel.getProperties();
-
-		if (jslProps != null) {
-			Properties contextProps = jobContext.getProperties();
-			for (Property property : jslProps.getPropertyList()) {
-				contextProps.setProperty(property.getName(), property.getValue());
-			}	
-		}
-	}
-
-	protected void jslStop() {
-		String restartOn = jobContext.getRestartOn();	
-		logger.fine("Logging JSL stop(): exitStatus = " + jobContext.getExitStatus() + ", restartOn = " + restartOn);
-		batchStatusStopping();
-		jobStatusService.updateJobStatusFromJSLStop(jobInstanceId, restartOn);
-		return;
-	}
-
-	protected void markJobStarted() {
-		updateJobBatchStatus(BatchStatus.STARTED);
-		long time = System.currentTimeMillis();
-		Timestamp timestamp = new Timestamp(time);
-		jobExecution.setLastUpdateTime(timestamp);
-		jobExecution.setStartTime(timestamp);
-		persistenceService.markJobStarted(jobExecution.getExecutionId(), timestamp);
-	}
-
-	/*
-	 *  Follow similar pattern for end of step in BaseStepControllerImpl
-	 *  
-	 *  1. Execute the very last artifacts (jobListener)
-	 *  2. transition to final batch status
-	 *  3. default ExitStatus if necessary
-	 *  4. persist statuses and end time data
-	 *  
-	 *  We don't want to give up on the orderly process of 2,3,4, if we blow up 
-	 *  in after job, so catch that and keep on going.
-	 */
-	protected void endOfJob() {
-
-
-		// 1. Execute the very last artifacts (jobListener)
-		try {
-			jobListenersAfterJob();
-		} catch (Throwable t) {
-			StringWriter sw = new StringWriter();
-			PrintWriter pw = new PrintWriter(sw);
-			t.printStackTrace(pw);
-			logger.warning("Error invoking jobListener.afterJob(). Stack trace: " + sw.toString());
-			batchStatusFailedFromException();
-		}
-
-		// 2. transition to final batch status
-		transitionToFinalBatchStatus();
-
-		// 3. default ExitStatus if necessary
-		if (jobContext.getExitStatus() == null) {
-			logger.fine("No job-level exitStatus set, defaulting to job batch Status = " + jobContext.getBatchStatus());
-			jobContext.setExitStatus(jobContext.getBatchStatus().name());
-		}
-
-		// 4. persist statuses and end time data
-		logger.fine("Job complete for job id=" + jobExecution.getJobInstance().getJobName() + ", executionId=" + jobExecution.getExecutionId() 
-				+ ", batchStatus=" + jobContext.getBatchStatus() + ", exitStatus=" + jobContext.getExitStatus());
-		persistJobBatchAndExitStatus();
-
-	}
-
-	private void persistJobBatchAndExitStatus() {
-		BatchStatus batchStatus = jobContext.getBatchStatus();
-
-		// Take a current timestamp for last updated no matter what the status.
-		long time = System.currentTimeMillis();
-		Timestamp timestamp = new Timestamp(time);
-		jobExecution.setLastUpdateTime(timestamp);
-
-		// Perhaps these should be coordinated in a transaction, but better still would be rethinking
-		// the table design so the database can provide this consistency with a single update.
-		jobStatusService.updateJobBatchStatus(jobInstanceId, batchStatus);
-		jobStatusService.updateJobExecutionStatus(jobExecution.getInstanceId(), jobContext.getBatchStatus(), jobContext.getExitStatus());
-
-		if (batchStatus.equals(BatchStatus.COMPLETED) || batchStatus.equals(BatchStatus.STOPPED) ||  
-				batchStatus.equals(BatchStatus.FAILED)) {
-
-			jobExecution.setEndTime(timestamp);
-			persistenceService.updateWithFinalExecutionStatusesAndTimestamps(jobExecution.getExecutionId(), 
-					batchStatus, jobContext.getExitStatus(), timestamp);
-		} else {
-			throw new IllegalStateException("Not expected to encounter batchStatus of " + batchStatus +" at this point.  Aborting.");
-		}
-	}
-
-	/**
-	 * The only expected states at this point are STARTED or STOPPING (FAILED should already have been
-	 * set earlier, but is tolerated below).  We shouldn't have been able to reach COMPLETED or STOPPED yet.
-	 */
-
-	private void transitionToFinalBatchStatus() {
-		BatchStatus currentBatchStatus = jobContext.getBatchStatus();
-		if (currentBatchStatus.equals(BatchStatus.STARTED)) {
-			updateJobBatchStatus(BatchStatus.COMPLETED);
-		} else if (currentBatchStatus.equals(BatchStatus.STOPPING)) {
-			updateJobBatchStatus(BatchStatus.STOPPED);
-		} else if (currentBatchStatus.equals(BatchStatus.FAILED)) {
-			updateJobBatchStatus(BatchStatus.FAILED);  // Should already have been set, but keeping it here is safer for possible refactoring.
-		} else {
-			throw new IllegalStateException("Job batch status should not be in a " + currentBatchStatus.name() + " state");
-		}
-	}
-
-	protected void updateJobBatchStatus(BatchStatus batchStatus) {
-		logger.fine("Setting job batch status to: " + batchStatus);
-		jobContext.setBatchStatus(batchStatus);
-	}
-
-
-	protected void logWarning(String msg, Throwable t) {
-		StringWriter sw = new StringWriter();
-		t.printStackTrace(new PrintWriter(sw));
-		logger.warning(msg + " with Throwable message: " + t.getMessage() + ", and stack trace: " + sw.toString());
-	}
-
-	/*
-	 * The thought here is that while we don't persist all the transitions in batch status (given
-	 * we plan to persist at the very end), we do persist STOPPING right away, since if we end up
-	 * "stuck in STOPPING" we at least will have a record in the database.
-	 */
-	protected void batchStatusStopping() {
-		updateJobBatchStatus(BatchStatus.STOPPING);
-		long time = System.currentTimeMillis();
-		Timestamp timestamp = new Timestamp(time);
-		jobExecution.setLastUpdateTime(timestamp);
-		persistenceService.updateBatchStatusOnly(jobExecution.getExecutionId(), BatchStatus.STOPPING, timestamp);
-	}
-
-
-
-	private void setupListeners() {
-		JSLJob jobModel = jobExecution.getJobNavigator().getRootModelElement();   
-		InjectionReferences injectionRef = new InjectionReferences(jobContext, null, null);
-		listenerFactory = new ListenerFactory(jobModel, injectionRef);
-		jobExecution.setListenerFactory(listenerFactory);
-	}
-
-
-	@Override
-	public void stop() {
-		if (jobContext.getBatchStatus().equals(BatchStatus.STARTING) || jobContext.getBatchStatus().equals(BatchStatus.STARTED)) {
-
-			batchStatusStopping();
-
-			IController stoppableElementController = transitioner.getCurrentStoppableElementController();
-			if (stoppableElementController != null) {
-				stoppableElementController.stop();
-			}
-		} else {
-			logger.info("Stop ignored since batch status for job is already set to: " + jobContext.getBatchStatus());
-		}
-	}
-
-	// Call beforeJob() on all the job listeners
-	protected void jobListenersBeforeJob() {
-		List<JobListenerProxy> jobListeners = listenerFactory.getJobListeners();
-		for (JobListenerProxy listenerProxy : jobListeners) {
-			if (logger.isLoggable(Level.FINE)) {
-				logger.fine("Invoking beforeJob() on jobListener: " + listenerProxy.getDelegate() + " of type: " + listenerProxy.getDelegate().getClass());
-			}
-			listenerProxy.beforeJob();
-		}
-	}
-
-	// Call afterJob() on all the job listeners
-	private void jobListenersAfterJob() {
-		List<JobListenerProxy> jobListeners = listenerFactory.getJobListeners();
-		for (JobListenerProxy listenerProxy : jobListeners) {
-			if (logger.isLoggable(Level.FINE)) {
-				logger.fine(" Invoking afterJob() on jobListener: " + listenerProxy.getDelegate() + " of type: " + listenerProxy.getDelegate().getClass());
-			}
-			listenerProxy.afterJob();
-		}	
-	}
-
-	protected void batchStatusFailedFromException() {
-		updateJobBatchStatus(BatchStatus.FAILED);
-	}
-	
-    @Override
-    public List<Long> getLastRunStepExecutions() {
-        
-        return this.transitioner.getStepExecIds();
-        
-    }
-}

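JobThreadRootControllerImpl drives the job-level listener callbacks: jobListenersBeforeJob() before the main execution loop and jobListenersAfterJob() from endOfJob(), even when the job has failed. For reference, a minimal example of the kind of artifact those calls invoke, written against the standard javax.batch listener API; the class name and listener ref are hypothetical and the snippet is not taken from the deleted sources.

import javax.batch.api.listener.AbstractJobListener;
import javax.batch.runtime.context.JobContext;
import javax.inject.Inject;

// A batch artifact referenced from the JSL as <listener ref="loggingJobListener"/> (hypothetical ref).
public class LoggingJobListener extends AbstractJobListener {

    @Inject
    private JobContext jobContext;

    @Override
    public void beforeJob() {
        // Invoked before the main execution loop starts.
        System.out.println("Starting job " + jobContext.getJobName());
    }

    @Override
    public void afterJob() {
        // Invoked at end of job, even when the job has failed.
        System.out.println("Finished job " + jobContext.getJobName()
                + " with batch status " + jobContext.getBatchStatus());
    }
}
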
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionThreadRootControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionThreadRootControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionThreadRootControllerImpl.java
deleted file mode 100755
index 08d105b..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionThreadRootControllerImpl.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.util.PartitionsBuilderConfig;
-
-/**
- * Currently the partition requires no special function beyond the generic subjob behavior.
- *
- */
-public class PartitionThreadRootControllerImpl extends JobThreadRootControllerImpl {
-
-	private final static String CLASSNAME = PartitionThreadRootControllerImpl.class.getName();
-	
-	public PartitionThreadRootControllerImpl(RuntimeJobExecution jobExecution, PartitionsBuilderConfig config) {
-		super(jobExecution, config.getAnalyzerQueue());
-	}
-
-}