You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:39:29 UTC

[52/62] importing batchee from github - a fork from the IBM RI

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepBuilder.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepBuilder.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepBuilder.java
deleted file mode 100755
index 4a5f038..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepBuilder.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package com.ibm.jbatch.container.impl;
-
-import javax.batch.runtime.context.JobContext;
-
-import com.ibm.jbatch.container.jsl.CloneUtility;
-import com.ibm.jbatch.jsl.model.Flow;
-import com.ibm.jbatch.jsl.model.JSLJob;
-import com.ibm.jbatch.jsl.model.ObjectFactory;
-import com.ibm.jbatch.jsl.model.Partition;
-import com.ibm.jbatch.jsl.model.PartitionPlan;
-import com.ibm.jbatch.jsl.model.Split;
-import com.ibm.jbatch.jsl.model.Step;
-
-public class PartitionedStepBuilder {
-
-	public static final String JOB_ID_SEPARATOR = ":";  // Use something permissible in NCName to allow us to key off of.
-    /**
-     * Builds a generated job containing a single flow, for submission to the
-     * BatchKernel. Used to create one subjob per flow of a split.
-     *
-     * The flow is added as-is rather than deep-copied: each flow is already
-     * independent of all others, unlike a partition, where one step instance
-     * can be executed with different properties on multiple threads.
-     *
-     * @param parentJobExecutionId
-     *            id of the parent job, first component of the generated subjob id
-     * @param jobContext
-     *            parent job context whose properties are copied onto the subjob
-     * @param split
-     *            the split in which the flow is nested
-     * @param flow
-     *            the flow that becomes the subjob's only execution element
-     * @return the generated subjob
-     */
-    public static JSLJob buildFlowInSplitSubJob(Long parentJobExecutionId, JobContext jobContext, Split split, Flow flow) {
-
-        final JSLJob flowSubJob = new ObjectFactory().createJSLJob();
-
-        // The generated id ties the subjob back to its parent job, split and flow.
-        flowSubJob.setId(generateSubJobId(parentJobExecutionId, split.getId(), flow.getId()));
-
-        // Flow threads get every property of the parent JobContext.
-        flowSubJob.setProperties(CloneUtility.javaPropsTojslProperties(jobContext.getProperties()));
-
-        flowSubJob.getExecutionElements().add(flow);
-        return flowSubJob;
-    }
-	
-    /**
-     * Builds a generated job containing a single step, for submission to the
-     * BatchKernel. Used to create one subjob per partition of a partitioned step.
-     *
-     * The step is copied field by field into a fresh Step. The next attribute
-     * and the control elements are deliberately NOT copied, because
-     * transitioning should only take place on the main thread.
-     *
-     * @param parentJobInstanceId
-     *            id of the parent job, first component of the generated subjob id
-     * @param jobContext
-     *            parent job context whose properties are copied onto the subjob
-     * @param step
-     *            the partitioned step to copy
-     * @param partitionInstance
-     *            the instance number of this partition
-     * @return the generated subjob
-     */
-    public static JSLJob buildPartitionSubJob(Long parentJobInstanceId, JobContext jobContext, Step step, int partitionInstance) {
-
-        final ObjectFactory factory = new ObjectFactory();
-
-        final JSLJob partitionSubJob = factory.createJSLJob();
-        partitionSubJob.setId(generateSubJobId(parentJobInstanceId, step.getId(), partitionInstance));
-
-        // Partitioned step threads get every property of the parent JobContext.
-        partitionSubJob.setProperties(CloneUtility.javaPropsTojslProperties(jobContext.getProperties()));
-
-        // The subjob's only step: same id as the original, contents copied below.
-        final Step stepCopy = factory.createStep();
-        stepCopy.setId(step.getId());
-        stepCopy.setAllowStartIfComplete(step.getAllowStartIfComplete());
-
-        if (step.getBatchlet() != null) {
-            stepCopy.setBatchlet(CloneUtility.cloneBatchlet(step.getBatchlet()));
-        }
-
-        if (step.getChunk() != null) {
-            stepCopy.setChunk(CloneUtility.cloneChunk(step.getChunk()));
-        }
-
-        // Of the partition element only the collector is carried over, attached
-        // to a plan whose partition count is left unset (null).
-        final Partition sourcePartition = step.getPartition();
-        if (sourcePartition != null && sourcePartition.getCollector() != null) {
-            final Partition collectorOnlyPartition = factory.createPartition();
-
-            final PartitionPlan emptyPlan = factory.createPartitionPlan();
-            emptyPlan.setPartitions(null);
-            collectorOnlyPartition.setPlan(emptyPlan);
-
-            collectorOnlyPartition.setCollector(sourcePartition.getCollector());
-            stepCopy.setPartition(collectorOnlyPartition);
-        }
-
-        stepCopy.setStartLimit(step.getStartLimit());
-        stepCopy.setProperties(CloneUtility.cloneJSLProperties(step.getProperties()));
-
-        // All listeners are cloned regardless of kind: the listener type is not
-        // known at the model level, and one artifact could implement more than
-        // one listener interface (e.g. ChunkListener AND StepListener).
-        stepCopy.setListeners(CloneUtility.cloneListeners(step.getListeners()));
-
-        partitionSubJob.getExecutionElements().add(stepCopy);
-        return partitionSubJob;
-    }
-
-    /**
-     * Generates the id of a subjob that runs one flow of a split.
-     *
-     * @param parentJobInstanceId
-     *            id of the parent job (the split caller passes the parent job
-     *            execution id); must not be null
-     * @param splitId
-     *            id of the split in which the flows are nested
-     * @param flowId
-     *            id of the flow the subjob runs
-     * @return a String of the form
-     *         {@code :<parentJobInstanceId>:<splitId>:<flowId>}; note the
-     *         leading separator
-     */
-    private static String generateSubJobId(Long parentJobInstanceId, String splitId, String flowId) {
-
-        return JOB_ID_SEPARATOR + parentJobInstanceId.toString()
-                + JOB_ID_SEPARATOR + splitId
-                + JOB_ID_SEPARATOR + flowId;
-    }
-    
-    /**
-     * Generates the id of a subjob that runs one partition of a partitioned step.
-     *
-     * @param parentJobInstanceId
-     *            id of the parent job; must not be null
-     * @param stepId
-     *            id of the partitioned step
-     * @param partitionInstance
-     *            the instance number of the partition
-     * @return a String of the form
-     *         {@code :<parentJobInstanceId>:<stepId>:<partitionInstance>}; note
-     *         the leading separator
-     */
-    private static String generateSubJobId(Long parentJobInstanceId, String stepId, int partitionInstance) {
-
-        return JOB_ID_SEPARATOR + parentJobInstanceId.toString()
-                + JOB_ID_SEPARATOR + stepId
-                + JOB_ID_SEPARATOR + partitionInstance;
-    }
-    
-    
-
-    
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepControllerImpl.java
deleted file mode 100755
index 1734b11..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepControllerImpl.java
+++ /dev/null
@@ -1,528 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.batch.api.partition.PartitionPlan;
-import javax.batch.api.partition.PartitionReducer.PartitionStatus;
-import javax.batch.operations.JobExecutionAlreadyCompleteException;
-import javax.batch.operations.JobExecutionNotMostRecentException;
-import javax.batch.operations.JobRestartException;
-import javax.batch.operations.JobStartException;
-import javax.batch.runtime.BatchStatus;
-
-import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
-import com.ibm.jbatch.container.artifact.proxy.PartitionAnalyzerProxy;
-import com.ibm.jbatch.container.artifact.proxy.PartitionMapperProxy;
-import com.ibm.jbatch.container.artifact.proxy.PartitionReducerProxy;
-import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
-import com.ibm.jbatch.container.artifact.proxy.StepListenerProxy;
-import com.ibm.jbatch.container.context.impl.StepContextImpl;
-import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
-import com.ibm.jbatch.container.exception.BatchContainerServiceException;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.jsl.CloneUtility;
-import com.ibm.jbatch.container.util.BatchPartitionPlan;
-import com.ibm.jbatch.container.util.BatchPartitionWorkUnit;
-import com.ibm.jbatch.container.util.BatchWorkUnit;
-import com.ibm.jbatch.container.util.PartitionDataWrapper;
-import com.ibm.jbatch.container.util.PartitionDataWrapper.PartitionEventType;
-import com.ibm.jbatch.container.util.PartitionsBuilderConfig;
-import com.ibm.jbatch.container.validation.ArtifactValidationException;
-import com.ibm.jbatch.jsl.model.Analyzer;
-import com.ibm.jbatch.jsl.model.JSLJob;
-import com.ibm.jbatch.jsl.model.JSLProperties;
-import com.ibm.jbatch.jsl.model.PartitionMapper;
-import com.ibm.jbatch.jsl.model.PartitionReducer;
-import com.ibm.jbatch.jsl.model.Property;
-import com.ibm.jbatch.jsl.model.Step;
-
-public class PartitionedStepControllerImpl extends BaseStepControllerImpl {
-
-	private final static String sourceClass = PartitionedStepControllerImpl.class.getName();
-	private final static Logger logger = Logger.getLogger(sourceClass);
-
-	private static final int DEFAULT_PARTITION_INSTANCES = 1;
-	private static final int DEFAULT_THREADS = 0; //0 means default to number of instances
-
-	private PartitionPlan plan = null;
-
-	private int partitions = DEFAULT_PARTITION_INSTANCES;
-	private int threads = DEFAULT_THREADS;
-
-	private Properties[] partitionProperties = null;
-
-	private volatile List<BatchPartitionWorkUnit> parallelBatchWorkUnits;
-
-	private PartitionReducerProxy partitionReducerProxy = null;
-
-	// On invocation this will be re-primed to reflect already-completed partitions from a previous execution.
-	int numPreviouslyCompleted = 0;
-
-	private PartitionAnalyzerProxy analyzerProxy = null;
-
-	final List<JSLJob> subJobs = new ArrayList<JSLJob>();
-	protected List<StepListenerProxy> stepListeners = null;
-
-	List<BatchPartitionWorkUnit> completedWork = new ArrayList<BatchPartitionWorkUnit>();
-	
-	BlockingQueue<BatchPartitionWorkUnit> completedWorkQueue = null;
-
-	/**
-	 * Creates the controller for a partitioned step. All arguments are handed
-	 * unchanged to the BaseStepControllerImpl constructor.
-	 */
-	protected PartitionedStepControllerImpl(final RuntimeJobExecution jobExecutionImpl, final Step step,
-			final StepContextImpl stepContext, final long rootJobExecutionId) {
-		super(jobExecutionImpl, step, stepContext, rootJobExecutionId);
-	}
-
-	/**
-	 * Marks this step as STOPPING and asks the batch kernel to stop every
-	 * partition work unit built so far.
-	 *
-	 * @throws IllegalStateException wrapping the first exception raised while
-	 *         stopping a partition; remaining partitions are then not stopped
-	 */
-	@Override
-	public void stop() {
-
-		updateBatchStatus(BatchStatus.STOPPING);
-
-		// Same monitor as the one held while the work units are being built.
-		synchronized (subJobs) {
-
-			// A partitioned step may be stopped before any sub steps have been
-			// started; there is nothing to stop in that case.
-			if (parallelBatchWorkUnits == null) {
-				return;
-			}
-
-			for (BatchWorkUnit workUnit : parallelBatchWorkUnits) {
-				try {
-					batchKernel.stopJob(workUnit.getJobExecutionImpl().getExecutionId());
-				} catch (Exception e) {
-					// TODO - Is this what we want to know.
-					// Blow up if it happens to force the issue.
-					throw new IllegalStateException(e);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Determines the partition plan for this execution of the step.
-	 *
-	 * The plan comes from the step's partition mapper when one is configured,
-	 * otherwise from the static plan element in the JSL. The resulting
-	 * partition count, thread count and per-partition properties are also
-	 * stored in the corresponding instance fields for convenience.
-	 *
-	 * @return the plan to use for this execution; never null
-	 * @throws IllegalArgumentException if the step defines neither a mapper
-	 *         nor a plan, or if an attribute of the static plan is invalid
-	 * @throws BatchContainerServiceException if the partition mapper artifact
-	 *         cannot be created
-	 * @throws BatchContainerRuntimeException if a static properties list
-	 *         targets a partition index that does not exist
-	 */
-	private PartitionPlan generatePartitionPlan() {
-
-		final PartitionMapper partitionMapper = step.getPartition().getMapper();
-		final PartitionPlan newPlan;
-
-		if (partitionMapper != null) { //from partition mapper
-			newPlan = buildPlanFromMapper(partitionMapper);
-		} else if (step.getPartition().getPlan() != null) { //from static partition element in jsl
-			newPlan = buildPlanFromStaticJsl(step.getPartition().getPlan());
-		} else {
-			// Fail with a descriptive message instead of a NullPointerException below.
-			throw new IllegalArgumentException("Neither a partition mapper nor a partition plan is defined in stepId: "
-					+ step.getId());
-		}
-
-		// Set the other instance variables for convenience.
-		this.partitions = newPlan.getPartitions();
-		this.threads = newPlan.getThreads();
-		this.partitionProperties = newPlan.getPartitionProperties();
-
-		return newPlan;
-	}
-
-	/**
-	 * Builds the plan by invoking the partition mapper artifact. The partition
-	 * count persisted by a previous execution is reused unless the mapper asks
-	 * for an override or no count was persisted.
-	 */
-	private PartitionPlan buildPlanFromMapper(final PartitionMapper partitionMapper) {
-
-		final List<Property> propertyList = partitionMapper.getProperties() == null ? null
-				: partitionMapper.getProperties().getPropertyList();
-
-		// Set all the contexts associated with this controller.
-		// Some of them may be null
-		final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext,
-				propertyList);
-
-		final PartitionMapperProxy partitionMapperProxy;
-		try {
-			partitionMapperProxy = ProxyFactory.createPartitionMapperProxy(
-					partitionMapper.getRef(), injectionRef, stepContext);
-		} catch (final ArtifactValidationException e) {
-			throw new BatchContainerServiceException(
-					"Cannot create the PartitionMapper ["
-							+ partitionMapper.getRef() + "]", e);
-		}
-
-		final PartitionPlan mapperPlan = partitionMapperProxy.mapPartitions();
-
-		// Partition count persisted by a previous run of this step, if any.
-		final Integer previousNumPartitions = stepStatus.getNumPartitions();
-
-		final PartitionPlan mappedPlan = new BatchPartitionPlan();
-		mappedPlan.setPartitionsOverride(mapperPlan.getPartitionsOverride());
-
-		//When true is specified, the partition count from the current run
-		//is used and all results from past partitions are discarded.
-		if (mapperPlan.getPartitionsOverride() || previousNumPartitions == null) {
-			mappedPlan.setPartitions(mapperPlan.getPartitions());
-		} else {
-			mappedPlan.setPartitions(previousNumPartitions);
-		}
-
-		// A thread count of 0 means "as many threads as partitions".
-		if (mapperPlan.getThreads() == 0) {
-			mappedPlan.setThreads(mappedPlan.getPartitions());
-		} else {
-			mappedPlan.setThreads(mapperPlan.getThreads());
-		}
-
-		mappedPlan.setPartitionProperties(mapperPlan.getPartitionProperties());
-
-		if (logger.isLoggable(Level.FINE)) {
-			logger.fine("Partition plan defined by partition mapper: " + mappedPlan);
-		}
-
-		return mappedPlan;
-	}
-
-	/**
-	 * Builds the plan from the static plan element of the JSL. A missing
-	 * partitions attribute falls back to DEFAULT_PARTITION_INSTANCES; a missing
-	 * or zero threads attribute falls back to the number of partitions.
-	 */
-	private PartitionPlan buildPlanFromStaticJsl(final com.ibm.jbatch.jsl.model.PartitionPlan jslPlan) {
-
-		final String partitionsAttr = jslPlan.getPartitions();
-		int numPartitions = DEFAULT_PARTITION_INSTANCES;
-		if (partitionsAttr != null) {
-			try {
-				numPartitions = Integer.parseInt(partitionsAttr);
-			} catch (final NumberFormatException e) {
-				throw new IllegalArgumentException("Could not parse partition instances value in stepId: " + step.getId()
-						+ ", with instances=" + partitionsAttr, e);
-			}
-			if (numPartitions < 1) {
-				throw new IllegalArgumentException("Partition instances value must be 1 or greater in stepId: " + step.getId()
-						+ ", with instances=" + partitionsAttr);
-			}
-		}
-
-		// Allocated only after validation, so a negative count can no longer
-		// surface as a NegativeArraySizeException.
-		final Properties[] partitionProps = new Properties[numPartitions];
-
-		final String threadsAttr = jslPlan.getThreads();
-		int numThreads = numPartitions; //default to number of partitions if threads isn't set
-		if (threadsAttr != null) {
-			try {
-				numThreads = Integer.parseInt(threadsAttr);
-			} catch (final NumberFormatException e) {
-				throw new IllegalArgumentException("Could not parse partition threads value in stepId: " + step.getId()
-						+ ", with threads=" + threadsAttr, e);
-			}
-			if (numThreads < 0) {
-				throw new IllegalArgumentException("Threads value must be 0 or greater in stepId: " + step.getId()
-						+ ", with threads=" + threadsAttr);
-			}
-			if (numThreads == 0) {
-				numThreads = numPartitions;
-			}
-		}
-
-		final List<JSLProperties> jslProperties = jslPlan.getProperties();
-		if (jslProperties != null) {
-			for (final JSLProperties props : jslProperties) {
-				final int targetPartition;
-				try {
-					targetPartition = Integer.parseInt(props.getPartition());
-				} catch (final NumberFormatException e) {
-					throw new IllegalArgumentException("Could not parse partition number of a properties list in stepId: "
-							+ step.getId() + ", with partition=" + props.getPartition(), e);
-				}
-
-				// Explicit range check instead of catching ArrayIndexOutOfBoundsException.
-				if (targetPartition < 0 || targetPartition >= numPartitions) {
-					throw new BatchContainerRuntimeException("There are only " + numPartitions + " partition instances, but there are "
-							+ jslProperties.size()
-							+ " partition properties lists defined. Remember that partition indexing is 0 based like Java arrays.");
-				}
-				partitionProps[targetPartition] = CloneUtility.jslPropertiesToJavaProperties(props);
-			}
-		}
-
-		final PartitionPlan staticPlan = new BatchPartitionPlan();
-		staticPlan.setPartitions(numPartitions);
-		staticPlan.setThreads(numThreads);
-		staticPlan.setPartitionProperties(partitionProps);
-		staticPlan.setPartitionsOverride(false); //FIXME what is the default for a static plan??
-		return staticPlan;
-	}
-
-
-	/**
-	 * Runs the partitioned step: generates and persists the partition plan,
-	 * sets up the queues the partitions report back on, builds the partition
-	 * work units, executes them, and finally evaluates their results.
-	 */
-	@Override
-	protected void invokeCoreStep() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
-
-		this.plan = this.generatePartitionPlan();
-
-		//persist the partition plan so on restart we have the same plan to reuse
-		stepStatus.setNumPartitions(plan.getPartitions());
-
-		/* When true is specified, the partition count from the current run
-		 * is used and all results from past partitions are discarded. Any
-		 * resource cleanup or back out of work done in the previous run is the
-		 * responsibility of the application. The PartitionReducer artifact's
-		 * rollbackPartitionedStep method is invoked during restart before any
-		 * partitions begin processing to provide a cleanup hook.
-		 */
-		if (plan.getPartitionsOverride() && this.partitionReducerProxy != null) {
-			this.partitionReducerProxy.rollbackPartitionedStep();
-		}
-
-		// Arrays.toString renders the contents of the array (or "null"); plain
-		// concatenation would only print the array's identity. Guarded so the
-		// message is not built when FINE logging is off.
-		if (logger.isLoggable(Level.FINE)) {
-			logger.fine("Number of partitions in step: " + partitions + " in step " + step.getId()
-					+ "; Subjob properties defined by partition mapper: " + java.util.Arrays.toString(partitionProperties));
-		}
-
-		//Set up a blocking queue to pick up collector data from a partitioned thread
-		if (this.analyzerProxy != null) {
-			this.analyzerStatusQueue =  new LinkedBlockingQueue<PartitionDataWrapper>();
-		}
-		this.completedWorkQueue = new LinkedBlockingQueue<BatchPartitionWorkUnit>();
-
-		// Build all sub jobs from partitioned step
-		buildSubJobBatchWorkUnits();
-
-		// kick off the threads
-		executeAndWaitForCompletion();
-
-		// Deal with the results.
-		checkCompletedWork();
-	}
-	private void buildSubJobBatchWorkUnits() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException  {
-		synchronized (subJobs) {		
-			//check if we've already issued a stop
-			if (jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)){
-				logger.fine("Step already in STOPPING state, exiting from buildSubJobBatchWorkUnits() before beginning execution");
-				return;
-			}
-
-			for (int instance = 0; instance < partitions; instance++) {
-				subJobs.add(PartitionedStepBuilder.buildPartitionSubJob(jobExecutionImpl.getInstanceId(),jobExecutionImpl.getJobContext(), step, instance));
-			}
-
-			PartitionsBuilderConfig config = new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, completedWorkQueue, jobExecutionImpl.getExecutionId());
-			// Then build all the subjobs but do not start them yet
-			if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
-				parallelBatchWorkUnits = batchKernel.buildOnRestartParallelPartitions(config);
-			} else {
-				parallelBatchWorkUnits = batchKernel.buildNewParallelPartitions(config);
-			}
-
-			// NOTE:  At this point I might not have as many work units as I had partitions, since some may have already completed.
-		}
-	}
-
-	private void executeAndWaitForCompletion() throws JobRestartException {
-		
-		if (jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)){
-			logger.fine("Step already in STOPPING state, exiting from executeAndWaitForCompletion() before beginning execution");
-			return;
-		}
-		
-		int numTotalForThisExecution = parallelBatchWorkUnits.size();
-		this.numPreviouslyCompleted = partitions - numTotalForThisExecution; 
-		int numCurrentCompleted = 0;
-		int numCurrentSubmitted = 0;
-
-		logger.fine("Calculated that " + numPreviouslyCompleted + " partitions are already complete out of total # = " 
-				+ partitions + ", with # remaining =" + numTotalForThisExecution);
-
-		//Start up to to the max num we are allowed from the num threads attribute
-		for (int i=0; i < this.threads && i < numTotalForThisExecution; i++, numCurrentSubmitted++) {
-			if (stepStatus.getStartCount() > 1 && !!!plan.getPartitionsOverride()) {
-				batchKernel.restartGeneratedJob(parallelBatchWorkUnits.get(i));
-			} else {
-				batchKernel.startGeneratedJob(parallelBatchWorkUnits.get(i));
-			}
-		}
-
-		boolean readyToSubmitAnother = false;
-		while (true) {
-			logger.finer("Begin main loop in waitForQueueCompletion(), readyToSubmitAnother = " + readyToSubmitAnother);
-			try {
-				if (analyzerProxy != null) {
-					logger.fine("Found analyzer, proceeding on analyzerQueue path");
-					PartitionDataWrapper dataWrapper = analyzerStatusQueue.take();
-					if (PartitionEventType.ANALYZE_COLLECTOR_DATA.equals(dataWrapper.getEventType())) {
-						logger.finer("Analyze collector data: " + dataWrapper.getCollectorData());
-						analyzerProxy.analyzeCollectorData(dataWrapper.getCollectorData());
-						continue; // without being ready to submit another
-					} else if (PartitionEventType.ANALYZE_STATUS.equals(dataWrapper.getEventType())) {
-						analyzerProxy.analyzeStatus(dataWrapper.getBatchstatus(), dataWrapper.getExitStatus());
-						logger.fine("Analyze status called for completed partition: batchStatus= " + dataWrapper.getBatchstatus() + ", exitStatus = " + dataWrapper.getExitStatus());
-						completedWork.add(completedWorkQueue.take());  // Shouldn't be a a long wait.
-						readyToSubmitAnother = true;
-					} else {
-						logger.warning("Invalid partition state");
-						throw new IllegalStateException("Invalid partition state");
-					}
-				} else {
-					logger.fine("No analyzer, proceeding on completedWorkQueue path");
-					// block until at least one thread has finished to
-					// submit more batch work. hold on to the finished work to look at later
-					completedWork.add(completedWorkQueue.take());
-					readyToSubmitAnother = true;
-				}
-			} catch (InterruptedException e) {
-				logger.severe("Caught exc"+ e);
-				throw new BatchContainerRuntimeException(e);
-			}
-
-			if (readyToSubmitAnother) {
-				numCurrentCompleted++;
-				logger.fine("Ready to submit another (if there is another left to submit); numCurrentCompleted = " + numCurrentCompleted);
-				if (numCurrentCompleted < numTotalForThisExecution) {
-					if (numCurrentSubmitted < numTotalForThisExecution) {
-						logger.fine("Submitting # " + numCurrentSubmitted + " out of " + numTotalForThisExecution + " total for this execution");
-						if (stepStatus.getStartCount() > 1) {
-							batchKernel.startGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
-						} else {
-							batchKernel.restartGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
-						}
-						readyToSubmitAnother = false;
-					}
-				} else {
-					logger.fine("Finished... breaking out of loop");
-					break;
-				}
-			} else {
-				logger.fine("Not ready to submit another."); // Must have just done a collector
-			}
-		}
-	}        
-
-	private void checkCompletedWork() {
-
-		if (logger.isLoggable(Level.FINE)) {
-			logger.fine("Check completed work list.");
-		}
-
-		/**
-		 * check the batch status of each subJob after it's done to see if we need to issue a rollback
-		 * start rollback if any have stopped or failed
-		 */
-		boolean rollback = false;
-		boolean partitionFailed = false;
-		
-		for (final BatchWorkUnit subJob : completedWork) {
-			BatchStatus batchStatus = subJob.getJobExecutionImpl().getJobContext().getBatchStatus();
-			if (batchStatus.equals(BatchStatus.FAILED)) {
-				logger.fine("Subjob " + subJob.getJobExecutionImpl().getExecutionId() + " ended with status '" + batchStatus + "'; Starting logical transaction rollback.");
-
-				rollback = true;
-				partitionFailed = true;
-
-				//Keep track of the failing status and throw an exception to propagate after the rest of the partitions are complete
-				stepContext.setBatchStatus(BatchStatus.FAILED);
-			} 
-		}
-
-		//If rollback is false we never issued a rollback so we can issue a logicalTXSynchronizationBeforeCompletion
-		//NOTE: this will get issued even in a subjob fails or stops if no logicalTXSynchronizationRollback method is provied
-		//We are assuming that not providing a rollback was intentional
-		if (rollback == true) {
-			if (this.partitionReducerProxy != null) {
-				this.partitionReducerProxy.rollbackPartitionedStep();
-			}
-			if (partitionFailed) {
-				throw new BatchContainerRuntimeException("One or more partitions failed");
-			}
-		} else {
-			if (this.partitionReducerProxy != null) {
-				this.partitionReducerProxy.beforePartitionedStepCompletion();
-			}
-		}
-	}
-
-	/**
-	 * Creates the artifacts this controller drives on the main thread: the
-	 * step listeners and, when configured on the step's partition element, the
-	 * partition analyzer and the partition reducer.
-	 *
-	 * @throws BatchContainerServiceException if the analyzer or the reducer
-	 *         artifact cannot be created
-	 */
-	@Override
-	protected void setupStepArtifacts() {
-
-		// Step listeners are injected without artifact-level properties.
-		InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, null);
-		this.stepListeners = jobExecutionImpl.getListenerFactory().getStepListeners(step, injectionRef, stepContext);
-
-		Analyzer analyzer = step.getPartition().getAnalyzer();
-
-		if (analyzer != null) {
-			final List<Property> propList = analyzer.getProperties() == null ? null : analyzer.getProperties()
-					.getPropertyList();
-
-			injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
-
-			try {
-				analyzerProxy = ProxyFactory.createPartitionAnalyzerProxy(analyzer.getRef(), injectionRef, stepContext);
-			} catch (final ArtifactValidationException e) {
-				throw new BatchContainerServiceException("Cannot create the analyzer [" + analyzer.getRef() + "]", e);
-			}
-		}
-
-		PartitionReducer partitionReducer = step.getPartition().getReducer();
-
-		if (partitionReducer != null) {
-
-			final List<Property> propList = partitionReducer.getProperties() == null ? null : partitionReducer.getProperties()
-					.getPropertyList();
-
-			injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
-
-			try {
-				this.partitionReducerProxy = ProxyFactory.createPartitionReducerProxy(partitionReducer.getRef(), injectionRef, stepContext);
-			} catch (final ArtifactValidationException e) {
-				// Name the artifact that actually failed; this used to say "analyzer".
-				throw new BatchContainerServiceException("Cannot create the partition reducer [" + partitionReducer.getRef() + "]",
-						e);
-			}
-		}
-
-	}
-
-	/**
-	 * Calls beforeStep on every step listener, then lets the partition reducer
-	 * (if any) know that the partitioned step is beginning.
-	 */
-	@Override
-	protected void invokePreStepArtifacts() {
-
-		if (stepListeners != null) {
-			for (final StepListenerProxy stepListener : stepListeners) {
-				stepListener.beforeStep();
-			}
-		}
-
-		// The reducer goes first among the partition artifacts: before all
-		// parallel steps start, and before the mapper as well.
-		if (this.partitionReducerProxy == null) {
-			return;
-		}
-		this.partitionReducerProxy.beginPartitionedStep();
-	}
-
-	@Override
-	protected void invokePostStepArtifacts() {
-		// Invoke the reducer after all parallel steps are done
-		if (this.partitionReducerProxy != null) {
-
-			if ((BatchStatus.COMPLETED).equals(stepContext.getBatchStatus())) {
-				this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionStatus.COMMIT);
-			}else {
-				this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionStatus.ROLLBACK); 
-			}
-
-		}
-
-		// Called in spec'd order, e.g. Sec. 11.7
-		if (stepListeners != null) {
-			for (StepListenerProxy listenerProxy : stepListeners) {
-				// Call afterStep on all the step listeners
-				listenerProxy.afterStep();
-			}
-		}
-	}
-
-	@Override
-	protected void sendStatusFromPartitionToAnalyzerIfPresent() {
-		// Intentionally a no-op: this controller already runs on the main
-		// thread, so there is never anything to send from here. Only the
-		// partitioned threads have something to send back.
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/RetryHandler.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/RetryHandler.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/RetryHandler.java
deleted file mode 100755
index e0906a6..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/RetryHandler.java
+++ /dev/null
@@ -1,430 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-
-import com.ibm.jbatch.container.artifact.proxy.RetryProcessListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.RetryReadListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.RetryWriteListenerProxy;
-import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
-import com.ibm.jbatch.jsl.model.Chunk;
-import com.ibm.jbatch.jsl.model.ExceptionClassFilter;
-
-public class RetryHandler {
-
-	/**
-	 *
-	 * Logic for handling retryable records.
-	 *
-	 * A RetryHandler object is attached to every BDS that inherits from AbstractBatchDataStream.
-	 *
-	 */
-
-	private static final String className = RetryHandler.class.getName();
-	private static Logger logger = Logger.getLogger(RetryHandler.class.getPackage().getName());
-
-	public static final String RETRY_COUNT      = "retry-limit";
-	public static final String RETRY_INCLUDE_EX = "include class";
-	public static final String RETRY_EXCLUDE_EX = "exclude class";
-
-	private static final int RETRY_NONE = 0;
-	private static final int RETRY_READ = 1;
-	private static final int RETRY_PROCESS = 2;
-	private static final int RETRY_WRITE = 3;
-
-	private int retryType = RETRY_NONE;
-
-	/*private RetryProcessListenerProxy _retryProcessListener = null;
-	  private RetryReadListenerProxy _retryReadListener = null;
-	  private RetryWriteListenerProxy _retryWriteListener = null;*/
-
-	List<RetryProcessListenerProxy> _retryProcessListeners = null;
-	List<RetryReadListenerProxy> _retryReadListeners = null;
-	List<RetryWriteListenerProxy> _retryWriteListeners = null;
-
-	private long _jobId = 0;
-	private String _stepId = null;
-	private Set<String> _retryNoRBIncludeExceptions = null;
-	private Set<String> _retryNoRBExcludeExceptions = null;
-	private Set<String> _retryIncludeExceptions = null;
-	private Set<String> _retryExcludeExceptions = null;
-	private int _retryLimit = Integer.MIN_VALUE;
-	private long _retryCount = 0;
-	private Exception _retryException = null;
-
-	public RetryHandler(Chunk chunk, long l, String stepId)
-	{
-		_jobId = l;
-		_stepId = stepId;
-
-		initialize(chunk);
-	}
-
-
-	/**
-	 * Add the user-defined RetryProcessListener.
-	 *
-	 */
-	public void addRetryProcessListener(List<RetryProcessListenerProxy> retryProcessListeners)
-	{
-		_retryProcessListeners =  retryProcessListeners;
-	}
-
-	/**
-	 * Add the user-defined RetryReadListener.
-	 *
-	 */
-	public void addRetryReadListener(List<RetryReadListenerProxy> retryReadListeners)
-	{
-		_retryReadListeners = retryReadListeners;
-	}
-
-	/**
-	 * Add the user-defined RetryWriteListener.
-	 *
-	 */
-	public void addRetryWriteListener(List<RetryWriteListenerProxy> retryWriteListeners)
-	{
-		_retryWriteListeners = retryWriteListeners;
-	}
-
-
-	/**
-	 * Read the retry exception lists from the BDS props.
-	 */
-	private void initialize(Chunk chunk)
-	{
-		final String mName = "initialize";
-
-		if(logger.isLoggable(Level.FINER)) 
-			logger.entering(className, mName);
-
-		try
-		{
-			if (chunk.getRetryLimit() != null){
-				_retryLimit = Integer.parseInt(chunk.getRetryLimit());
-                if (_retryLimit < 0) {
-                    throw new IllegalArgumentException("The retry-limit attribute on a chunk cannot be a negative value");
-                }
-
-			}
-		}
-		catch (NumberFormatException nfe)
-		{
-			throw new RuntimeException("NumberFormatException reading " + RETRY_COUNT, nfe);
-		}
-
-        // Read the include/exclude exceptions.
-        _retryIncludeExceptions = new HashSet<String>();
-        _retryExcludeExceptions = new HashSet<String>();
-        _retryNoRBIncludeExceptions = new HashSet<String>();
-        _retryNoRBExcludeExceptions = new HashSet<String>();
-
-        String includeEx = null;
-        String excludeEx = null;
-        String includeExNoRB = null;
-        String excludeExNoRB = null;
-
-        if (chunk.getRetryableExceptionClasses() != null) {
-            if (chunk.getRetryableExceptionClasses().getIncludeList() != null) {
-                List<ExceptionClassFilter.Include> includes = chunk.getRetryableExceptionClasses().getIncludeList();
-                for (ExceptionClassFilter.Include include : includes) {
-                    _retryIncludeExceptions.add(include.getClazz().trim());
-                    logger.finer("RETRYHANDLE: include: " + include.getClazz().trim());
-                }
-
-                if (_retryIncludeExceptions.size() == 0) {
-                    logger.finer("RETRYHANDLE: include element not present");
-
-                }
-            }
-            if (chunk.getRetryableExceptionClasses().getExcludeList() != null) {
-                List<ExceptionClassFilter.Exclude> excludes = chunk.getRetryableExceptionClasses().getExcludeList();
-                for (ExceptionClassFilter.Exclude exclude : excludes) {
-                    _retryExcludeExceptions.add(exclude.getClazz().trim());
-                    logger.finer("SKIPHANDLE: exclude: " + exclude.getClazz().trim());
-                }
-
-                if (_retryExcludeExceptions.size() == 0) {
-                    logger.finer("SKIPHANDLE: exclude element not present");
-
-                }
-            }
-        }
-
-        if (chunk.getNoRollbackExceptionClasses() != null) {
-            if (chunk.getNoRollbackExceptionClasses().getIncludeList() != null) {
-                List<ExceptionClassFilter.Include> includes = chunk.getNoRollbackExceptionClasses().getIncludeList();
-                for (ExceptionClassFilter.Include include : includes) {
-                    _retryNoRBIncludeExceptions.add(include.getClazz().trim());
-                    logger.finer("RETRYHANDLE: include: " + include.getClazz().trim());
-                }
-
-                if (_retryNoRBIncludeExceptions.size() == 0) {
-                    logger.finer("RETRYHANDLE: include element not present");
-
-                }
-            }
-            if (chunk.getNoRollbackExceptionClasses().getExcludeList() != null) {
-                List<ExceptionClassFilter.Exclude> excludes = chunk.getNoRollbackExceptionClasses().getExcludeList();
-                for (ExceptionClassFilter.Exclude exclude : excludes) {
-                    _retryNoRBExcludeExceptions.add(exclude.getClazz().trim());
-                    logger.finer("SKIPHANDLE: exclude: " + exclude.getClazz().trim());
-                }
-
-                if (_retryNoRBExcludeExceptions.size() == 0) {
-                    logger.finer("SKIPHANDLE: exclude element not present");
-
-                }
-            }
-        }
-
-        if (logger.isLoggable(Level.FINE)) {
-            logger.logp(Level.FINE, className, mName, "added include exception " + includeEx + "; added exclude exception " + excludeEx);
-            logger.logp(Level.FINE, className, mName, "added include no rollback exception " + includeExNoRB
-                    + "; added exclude no rollback exception " + excludeExNoRB);
-        }
-	        
-	    if(logger.isLoggable(Level.FINER)) {
-	      logger.exiting(className, mName, this.toString());
-	    }
-	  }
-	  
-	  public boolean isRollbackException(Exception e)
-	  {
-		  return !isNoRollbackException(e);
-	  }
-	  /**
-	   * Handle exception from a read failure.
-	   */
-	  public void handleExceptionRead(Exception e)
-	  {
-	    final String mName = "handleExceptionRead";
-	    
-	    logger.finer("RETRYHANDLE: in retryhandler handle exception on a read:" + e.toString());
-
-	    if(logger.isLoggable(Level.FINER)) 
-	      logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
-	    
-	    if (!isRetryLimitReached() && isRetryable(e))
-	    {
-	       retryType = RETRY_READ;
-	       _retryException = e;
-	      // Retry it.  Log it.  Call the RetryListener.
-	      ++_retryCount;
-	      logRetry(e);
-
-	      if (_retryReadListeners != null) {
-	    	  for (RetryReadListenerProxy retryReadListenerProxy : _retryReadListeners) {
-	    		  retryReadListenerProxy.onRetryReadException(e);
-				}
-	      }
-	    }
-	    else
-	    {
-	      // No retry.  Throw it back.
-	      if(logger.isLoggable(Level.FINER)) 
-	        logger.logp(Level.FINE, className, mName, "No retry.  Rethrow", e);
-	      	throw new BatchContainerRuntimeException(e);
-	    }
-
-	    if(logger.isLoggable(Level.FINER)) 
-	      logger.exiting(className, mName, e);
-	  }
-
-	  /** 
-	   * Handle exception from a process failure.
-	   */
-	  public void handleExceptionProcess(Exception e, Object w)
-	  {
-	    final String mName = "handleExceptionProcess";
-	    
-	    if(logger.isLoggable(Level.FINER)) 
-	      logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
-	    
-	    if (!isRetryLimitReached() && isRetryable(e))
-	    {
-	      retryType = RETRY_PROCESS;
-	      _retryException = e;
-	      // Retry it.  Log it.  Call the RetryListener.
-	      ++_retryCount;
-	      logRetry(e);
-
-	      if (_retryProcessListeners != null) {
-	    	  for (RetryProcessListenerProxy retryProcessListenerProxy : _retryProcessListeners) {
-	    		  retryProcessListenerProxy.onRetryProcessException(w, e);
-				}
-	      }
-	    }
-	    else
-	    {
-	      // No retry.  Throw it back.
-	      if(logger.isLoggable(Level.FINER)) 
-	        logger.logp(Level.FINE, className, mName, "No retry.  Rethrow ", e);
-	      throw new BatchContainerRuntimeException(e);
-	    }
-	  }
-	  
-	  /** 
-	   * Handle exception from a write failure.
-	   */
-	  public void handleExceptionWrite(Exception e, List<Object> w)
-	  {
-	    final String mName = "handleExceptionWrite";
-	    
-	    if(logger.isLoggable(Level.FINER)) 
-	      logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
-
-	    if (!isRetryLimitReached() && isRetryable(e))
-	    {
-	      // Retry it.  Log it.  Call the RetryListener.
-	      retryType = RETRY_WRITE;
-	      _retryException = e;
-	      ++_retryCount;
-	      logRetry(e);
-
-	      if (_retryWriteListeners != null) {
-	    	  for (RetryWriteListenerProxy retryWriteListenerProxy : _retryWriteListeners) {
-	    		  retryWriteListenerProxy.onRetryWriteException(w, e);
-				}
-	      }
-	    }
-	    else
-	    {
-	      // No retry.  Throw it back.
-	      if(logger.isLoggable(Level.FINER)) 
-	        logger.logp(Level.FINE, className, mName, "No retry.  Rethrow ", e);
-	      throw new BatchContainerRuntimeException(e);
-	    }
-	  }
-
-
-	  /**
-	   * Check the retryable exception lists to determine whether
-	   * the given Exception is retryable.
-	   */
-	  private boolean isRetryable(Exception e)
-	  {
-	    final String mName = "isRetryable";
-
-	    String exClassName = e.getClass().getName();
-	    
-	    boolean retVal = containsException(_retryIncludeExceptions, e) && !containsException(_retryExcludeExceptions, e);
-	    
-	    if(logger.isLoggable(Level.FINE)) 
-	      logger.logp(Level.FINE, className, mName, mName + ": " + retVal + ": " + exClassName);
-
-	    return retVal;
-	  }
-	
-	  private boolean isNoRollbackException(Exception e)
-	  {
-		  final String mName = "isNoRollbackException";
-
-		  String exClassName = e.getClass().getName();
-		  
-		  boolean retVal = containsException(_retryNoRBIncludeExceptions, e) && !containsException(_retryNoRBExcludeExceptions, e);
-			  
-		  if(logger.isLoggable(Level.FINE)) 
-		    logger.logp(Level.FINE, className, mName, mName + ": " + retVal + ": " + exClassName);
-
-		  return retVal;
-	  }
-	  
-	  /**
-	   * Check whether given exception is in the specified exception list 
-	   */
-	  private boolean containsException(Set<String> retryList, Exception e)
-	  {
-	    final String mName = "containsException";
-	    boolean retVal = false;
-
-	    for ( Iterator it = retryList.iterator(); it.hasNext(); ) {
-	        String exClassName = (String) it.next();
-	       
-	        try {
-	        	if (retVal = Thread.currentThread().getContextClassLoader().loadClass(exClassName).isInstance(e))
-	        		break;
-	        } catch (ClassNotFoundException cnf) {
-	        	logger.logp(Level.FINE, className, mName, cnf.getLocalizedMessage());
-	        }
-	    }
-
-	    if(logger.isLoggable(Level.FINE)) 
-	      logger.logp(Level.FINE, className, mName, mName + ": " + retVal );
-
-	    return retVal;
-	  }
-
-	  /**
-	   * Check if the retry limit has been reached.
-	   *
-	   * Note: if retry handling isn't enabled (i.e. not configured in xJCL), then this method 
-	   *       will always return TRUE.
-	   */
-	  private boolean isRetryLimitReached()
-	  {
-        // Unlimited retries if it is never defined
-        if (_retryLimit == Integer.MIN_VALUE) {
-            return false;
-        }
-	      
-	    return (_retryCount >= _retryLimit);
-	  }
-
-	  
-	  private void logRetry(Exception e)
-	  {
-	    String key = "record.retried.norollback.by.batch.container";
-	    Object[] details = { _jobId, _stepId, e.getClass().getName() + ": " + e.getMessage() };
-	    //String message = LoggerUtil.getFormattedMessage(key, details, true);
-	    //logger.info(message);	
-		}
-
-	  public Exception getException()
-	  {
-		  return _retryException;
-	  }
-	  
-	  public long getRetryCount()
-	  {
-	    return _retryCount;
-	  }
-
-	  public void setRetryCount(long retryCount)
-	  {
-	    final String mName = "setRetryCount";
-
-	    _retryCount = retryCount;
-
-	    if(logger.isLoggable(Level.FINE)) 
-	      logger.logp(Level.FINE, className, mName, "setRetryCount: " + _retryCount);
-	  }
-
-	  public String toString()
-	  {
-	    return "RetryHandler{" + super.toString() + "}count:limit=" + _retryCount + ":" + _retryLimit;
-	  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SingleThreadedStepControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SingleThreadedStepControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SingleThreadedStepControllerImpl.java
deleted file mode 100755
index 1c81e7f..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SingleThreadedStepControllerImpl.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.logging.Logger;
-
-
-import com.ibm.jbatch.container.IController;
-import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
-import com.ibm.jbatch.container.artifact.proxy.PartitionCollectorProxy;
-import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
-import com.ibm.jbatch.container.artifact.proxy.StepListenerProxy;
-import com.ibm.jbatch.container.context.impl.StepContextImpl;
-import com.ibm.jbatch.container.exception.BatchContainerServiceException;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.util.PartitionDataWrapper;
-import com.ibm.jbatch.container.util.PartitionDataWrapper.PartitionEventType;
-import com.ibm.jbatch.container.validation.ArtifactValidationException;
-import com.ibm.jbatch.jsl.model.Collector;
-import com.ibm.jbatch.jsl.model.Property;
-import com.ibm.jbatch.jsl.model.Step;
-
-/**
- * 
- * When a partitioned step is run, this controller will only be used for the partition threads, 
- * NOT the top-level main thread that the step executes upon.
- * 
- * When a non-partitioned step is run this controller will be used as well (and there will be no
- * separate main thread with controller).
- *
- */
-public abstract class SingleThreadedStepControllerImpl extends BaseStepControllerImpl implements IController {
-
-	private final static String sourceClass = SingleThreadedStepControllerImpl.class.getName();
-	private final static Logger logger = Logger.getLogger(sourceClass);
-
-	// Collector only used from partition threads, not main thread
-	protected PartitionCollectorProxy collectorProxy = null;
-
-	protected SingleThreadedStepControllerImpl(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
-		super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
-	}
-
-	List<StepListenerProxy> stepListeners = null;
-
-	protected void setupStepArtifacts() {
-		// set up listeners
-
-		InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, null);
-		this.stepListeners = jobExecutionImpl.getListenerFactory().getStepListeners(step, injectionRef, stepContext);
-
-		// set up collectors if we are running a partitioned step
-		if (step.getPartition() != null) {
-			Collector collector = step.getPartition().getCollector();
-			if (collector != null) {
-				List<Property> propList = (collector.getProperties() == null) ? null : collector.getProperties().getPropertyList();
-				/**
-				 * Inject job flow, split, and step contexts into partition
-				 * artifacts like collectors and listeners some of these
-				 * contexts may be null
-				 */
-				injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
-
-				try {
-					this.collectorProxy = ProxyFactory.createPartitionCollectorProxy(collector.getRef(), injectionRef, this.stepContext);
-				} catch (ArtifactValidationException e) {
-					throw new BatchContainerServiceException("Cannot create the collector [" + collector.getRef() + "]", e);
-				}
-			}
-		}
-	}
-
-	@Override
-	protected void invokePreStepArtifacts() {
-		// Don't call beforeStep() in the partitioned case, since we are now on a partition thread, and
-		// have already called beforeStep() on the main thread as the spec says.
-		if ((stepListeners != null) && (this.jobExecutionImpl.getPartitionInstance() == null)) {
-			for (StepListenerProxy listenerProxy : stepListeners) {
-				listenerProxy.beforeStep();
-			}
-		}
-	}
-
-	@Override
-	protected void invokePostStepArtifacts() {
-		// Don't call beforeStep() in the partitioned case, since we are now on a partition thread, and
-		// have already called beforeStep() on the main thread as the spec says.
-		if ((stepListeners != null) && (this.jobExecutionImpl.getPartitionInstance() == null)) {
-			for (StepListenerProxy listenerProxy : stepListeners) {
-				listenerProxy.afterStep();
-			}
-		}
-	}
-
-	protected void invokeCollectorIfPresent() {
-		if (collectorProxy != null) {
-			Serializable data = collectorProxy.collectPartitionData();
-			logger.finer("Got partition data: " + data + ", from collector: " + collectorProxy);
-			sendCollectorDataToAnalyzerIfPresent(data);
-		} 
-	}
-	
-	// Useless to have collector without analyzer but let's check so we don't hang or blow up.
-	protected void sendCollectorDataToAnalyzerIfPresent(Serializable data) {
-		if (analyzerStatusQueue != null) {
-			logger.finer("Sending collector partition data: " + data + " to analyzer queue: " + analyzerStatusQueue);
-			PartitionDataWrapper dataWrapper = new PartitionDataWrapper();
-			dataWrapper.setCollectorData(data);
-			dataWrapper.setEventType(PartitionEventType.ANALYZE_COLLECTOR_DATA);
-			analyzerStatusQueue.add(dataWrapper);
-		} else {
-			logger.fine("Analyzer not configured.");
-		}
-	}
-
-	// Useless to have collector without analyzer but let's check so we don't hang or blow up.
-	@Override
-	protected void sendStatusFromPartitionToAnalyzerIfPresent() {
-		if (analyzerStatusQueue != null) {
-			logger.fine("Send status from partition for analyzeStatus with batchStatus = " + stepStatus.getBatchStatus() + ", exitStatus = " + stepStatus.getExitStatus());
-			PartitionDataWrapper dataWrapper = new PartitionDataWrapper();
-			dataWrapper.setBatchStatus(stepStatus.getBatchStatus());
-			dataWrapper.setExitStatus(stepStatus.getExitStatus());
-			dataWrapper.setEventType(PartitionEventType.ANALYZE_STATUS);
-			analyzerStatusQueue.add(dataWrapper);
-		} else {
-			logger.fine("Analyzer not configured.");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SkipHandler.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SkipHandler.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SkipHandler.java
deleted file mode 100755
index f4ba676..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SkipHandler.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package com.ibm.jbatch.container.impl;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-
-import com.ibm.jbatch.container.artifact.proxy.SkipProcessListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.SkipReadListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.SkipWriteListenerProxy;
-import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
-import com.ibm.jbatch.jsl.model.Chunk;
-import com.ibm.jbatch.jsl.model.ExceptionClassFilter;
-
-public class SkipHandler {
-
-	/**
-	 *
-	 * Logic for handling skipped records.
-	 *
-	 */
-
-	  private static final String className = SkipHandler.class.getName();
-		private static Logger logger = Logger.getLogger(SkipHandler.class.getPackage().getName());
-
-	  public static final String SKIP_COUNT      = "skip-limit";
-	  public static final String SKIP_INCLUDE_EX = "include class";
-	  public static final String SKIP_EXCLUDE_EX = "exclude class";
-
-	  private List<SkipProcessListenerProxy> _skipProcessListener = null;
-	  private List<SkipReadListenerProxy> _skipReadListener = null;
-	  private List<SkipWriteListenerProxy> _skipWriteListener = null;
-
-	  private long _jobId = 0;
-	  private String _stepId = null;
-	  private Set<String> _skipIncludeExceptions = null;
-	  private Set<String> _skipExcludeExceptions = null;
-	  private int _skipLimit = Integer.MIN_VALUE;
-	  private long _skipCount = 0;
-
-	  public SkipHandler(Chunk chunk, long l, String stepId)
-	  {
-	    _jobId = l;
-	    _stepId = stepId;
-
-	    initialize(chunk);
-	  }
-
-	  /**
-	   * Add the user-defined SkipReadListeners.
-	   *
-	   */
-	  public void addSkipReadListener(List<SkipReadListenerProxy> skipReadListener)
-	  {
-	    _skipReadListener = skipReadListener;
-	  }
-	  
-	  /**
-	   * Add the user-defined SkipWriteListeners.
-	   *
-	   */
-	  public void addSkipWriteListener(List<SkipWriteListenerProxy> skipWriteListener)
-	  {
-	    _skipWriteListener = skipWriteListener;
-	  }
-	  
-	  /**
-	   * Add the user-defined SkipReadListeners.
-	   *
-	   */
-	  public void addSkipProcessListener(List<SkipProcessListenerProxy> skipProcessListener)
-	  {
-	    _skipProcessListener = skipProcessListener;
-	  }
-
-
-	  /**
-	   * Read the skip exception lists from the BDS props.
-	   */
-	  private void initialize(Chunk chunk)
-	  {
-	    final String mName = "initialize";
-
-	    if(logger.isLoggable(Level.FINER)) 
-	      logger.entering(className, mName);
-
-	    try
-	    {
-	    	if (chunk.getSkipLimit() != null){
-	    		_skipLimit = Integer.parseInt(chunk.getSkipLimit());
-	    		if (_skipLimit < 0) {
-	    		    throw new IllegalArgumentException("The skip-limit attribute on a chunk cannot be a negative value");
-	    		}
-	    	}
-	    }
-	    catch (NumberFormatException nfe)
-	    {
-	      throw new RuntimeException("NumberFormatException reading " + SKIP_COUNT, nfe);
-	    }
-
-
-        // Read the include/exclude exceptions.
-
-        _skipIncludeExceptions = new HashSet<String>();
-        _skipExcludeExceptions = new HashSet<String>();
-
-        // boolean done = false;
-        List<String> includeEx = new ArrayList<String>();
-        List<String> excludeEx = new ArrayList<String>();
-
-        if (chunk.getSkippableExceptionClasses() != null) {
-            if (chunk.getSkippableExceptionClasses().getIncludeList() != null) {
-                List<ExceptionClassFilter.Include> includes = chunk.getSkippableExceptionClasses().getIncludeList();
-
-                for (ExceptionClassFilter.Include include : includes) {
-                    _skipIncludeExceptions.add(include.getClazz().trim());
-                    logger.finer("SKIPHANDLE: include: " + include.getClazz().trim());
-                }
-
-                if (_skipIncludeExceptions.size() == 0) {
-                    logger.finer("SKIPHANDLE: include element not present");
-
-                }
-            }
-        }
-
-        if (chunk.getSkippableExceptionClasses() != null) {
-            if (chunk.getSkippableExceptionClasses().getExcludeList() != null) {
-                List<ExceptionClassFilter.Exclude> excludes = chunk.getSkippableExceptionClasses().getExcludeList();
-
-                for (ExceptionClassFilter.Exclude exclude : excludes) {
-                    _skipExcludeExceptions.add(exclude.getClazz().trim());
-                    logger.finer("SKIPHANDLE: exclude: " + exclude.getClazz().trim());
-                }
-
-                if (_skipExcludeExceptions.size() == 0) {
-                    logger.finer("SKIPHANDLE: exclude element not present");
-
-                }
-
-            }
-        }
-
-        if (logger.isLoggable(Level.FINE))
-            logger.logp(Level.FINE, className, mName, "added include exception " + includeEx + "; added exclude exception " + excludeEx);
-	        
-	    if(logger.isLoggable(Level.FINER)) 
-	      logger.exiting(className, mName, this.toString());
-	  }
-
-
-	  /**
-	   * Handle exception from a read failure.
-	   */
-	  public void handleExceptionRead(Exception e)
-	  {
-	    final String mName = "handleException";
-	    
-	    logger.finer("SKIPHANDLE: in skiphandler handle exception on a read");
-
-	    if(logger.isLoggable(Level.FINER)) 
-	      logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
-	    
-	    if (!isSkipLimitReached() && isSkippable(e))
-	    {
-	      // Skip it.  Log it.  Call the SkipListener.
-	      ++_skipCount;
-	      logSkip(e);
-
-	      if (_skipReadListener != null) {
-	    	  for (SkipReadListenerProxy skipReadListenerProxy : _skipReadListener) {
-	    		  skipReadListenerProxy.onSkipReadItem(e);
-				}
-	      }
-	    }
-	    else
-	    {
-	      // No skip.  Throw it back. don't throw it back - we might want to retry ...
-	      if(logger.isLoggable(Level.FINER)) 
-	        logger.logp(Level.FINE, className, mName, "No skip.  Rethrow", e);
-	      	throw new BatchContainerRuntimeException(e);
-	    }
-
-	    if(logger.isLoggable(Level.FINER)) 
-	      logger.exiting(className, mName, e);
-	  }
-
-	  /** 
-	   * Handle exception from a process failure.
-	   */
-	  public void handleExceptionWithRecordProcess(Exception e, Object w)
-	  {
-	    final String mName = "handleExceptionWithRecordProcess";
-	    if(logger.isLoggable(Level.FINER)) 
-	      logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
-
-	    if (!isSkipLimitReached() && isSkippable(e))
-	    {
-	      // Skip it.  Log it.  Call the SkipProcessListener.
-	      ++_skipCount;
-	      logSkip(e);
-
-	      if (_skipProcessListener != null) {
-	    	  for (SkipProcessListenerProxy skipProcessListenerProxy : _skipProcessListener) {
-	    		  skipProcessListenerProxy.onSkipProcessItem(w, e);
-				}
-	      }
-	    }
-	    else
-	    {
-	      // No skip.  Throw it back.
-	      if(logger.isLoggable(Level.FINER)) 
-	        logger.logp(Level.FINE, className, mName, "No skip.  Rethrow ", e);
-	      throw new BatchContainerRuntimeException(e);
-	    }
-	  }
-	  /** 
-	   * Handle exception from a write failure.
-	   */
-	  public void handleExceptionWithRecordListWrite(Exception e, List<?> items)
-	  {
-	    final String mName = "handleExceptionWithRecordListWrite(Exception, List<?>)";
-	    
-	    if(logger.isLoggable(Level.FINER)) 
-	      logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
-
-	    if (!isSkipLimitReached() && isSkippable(e))
-	    {
-	      // Skip it.  Log it.  Call the SkipListener.
-	      ++_skipCount;
-	      logSkip(e);
-
-	      if (_skipWriteListener != null) {
-	    	  for (SkipWriteListenerProxy skipWriteListenerProxy : _skipWriteListener) {
-	    		  skipWriteListenerProxy.onSkipWriteItem(items, e);
-				}
-	      }
-	    }
-	    else
-	    {
-	      System.out.println("## NO SKIP");
-	      // No skip.  Throw it back. - No, exit without throwing
-	      if(logger.isLoggable(Level.FINER)) 
-	        logger.logp(Level.FINE, className, mName, "No skip.  Rethrow ", e);
-	      throw new BatchContainerRuntimeException(e);
-	    }
-	  }
-
-
-	  /**
-	   * Check the skipCount and skippable exception lists to determine whether
-	   * the given Exception is skippable.
-	   */
-	  private boolean isSkippable(Exception e)
-	  {
-	    final String mName = "isSkippable";
-
-	    String exClassName = e.getClass().getName();
-
-	    boolean retVal = containsSkippable(_skipIncludeExceptions, e) && !containsSkippable(_skipExcludeExceptions, e);
-
-	    if(logger.isLoggable(Level.FINE)) 
-	      logger.logp(Level.FINE, className, mName, mName + ": " + retVal + ": " + exClassName);
-
-	    return retVal;
-	  }
-
-	  /**
-	   * Check whether given exception is in skippable exception list 
-	   */
-	  private boolean containsSkippable(Set<String> skipList, Exception e)
-	  {
-	    final String mName = "containsSkippable";
-	    boolean retVal = false;
-
-	    for ( Iterator it = skipList.iterator(); it.hasNext(); ) {
-	        String exClassName = (String) it.next();   
-	        try {
-	            ClassLoader tccl = Thread.currentThread().getContextClassLoader();	            
-	        	if (retVal = tccl.loadClass(exClassName).isInstance(e))
-	        		break;
-	        } catch (ClassNotFoundException cnf) {
-	        	logger.logp(Level.FINE, className, mName, cnf.getLocalizedMessage());
-	        }
-	    }
-
-	    if(logger.isLoggable(Level.FINE)) 
-	      logger.logp(Level.FINE, className, mName, mName + ": " + retVal );
-
-	    return retVal;
-	  }
-	  
-
-    /**
-     * Check if the skip limit has been reached.
-     * 
-     * Note: if skip handling isn't enabled (i.e. not configured in xJCL), then
-     * this method will always return TRUE.
-     */
-    private boolean isSkipLimitReached() {
-        // Unlimited skips if it is never defined
-        if (_skipLimit == Integer.MIN_VALUE) {
-            return false;
-        }
-
-        return (_skipCount >= _skipLimit);
-    }
-	  
-	  private void logSkip(Exception e)
-	  {
-	    Object[] details = { _jobId, _stepId, e.getClass().getName() + ": " + e.getMessage() };
-	    if(logger.isLoggable(Level.FINE)) 
-	      logger.logp(Level.FINE, className, "logSkip", "Logging details: ", details); 
-	  }
-
-
-	  public long getSkipCount()
-	  {
-	    return _skipCount;
-	  }
-
-	  public void setSkipCount(long skipCount)
-	  {
-	    final String mName = "setSkipCount";
-
-	    _skipCount = skipCount;
-
-	    if(logger.isLoggable(Level.FINE)) 
-	      logger.logp(Level.FINE, className, mName, "setSkipCount: " + _skipCount);
-	  }
-
-	  public String toString()
-	  {
-	    return "SkipHandler{" + super.toString() + "}count:limit=" + _skipCount + ":" + _skipLimit;
-	  }
-
-	
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SplitControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SplitControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SplitControllerImpl.java
deleted file mode 100755
index 792b1ce..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SplitControllerImpl.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.batch.operations.JobExecutionAlreadyCompleteException;
-import javax.batch.operations.JobExecutionNotMostRecentException;
-import javax.batch.operations.JobRestartException;
-import javax.batch.operations.JobStartException;
-import javax.batch.operations.NoSuchJobExecutionException;
-
-import com.ibm.jbatch.container.IExecutionElementController;
-import com.ibm.jbatch.container.context.impl.JobContextImpl;
-import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
-import com.ibm.jbatch.container.jobinstance.RuntimeFlowInSplitExecution;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.services.IBatchKernelService;
-import com.ibm.jbatch.container.servicesmanager.ServicesManager;
-import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
-import com.ibm.jbatch.container.status.ExecutionStatus;
-import com.ibm.jbatch.container.status.ExtendedBatchStatus;
-import com.ibm.jbatch.container.status.SplitExecutionStatus;
-import com.ibm.jbatch.container.util.BatchFlowInSplitWorkUnit;
-import com.ibm.jbatch.container.util.BatchParallelWorkUnit;
-import com.ibm.jbatch.container.util.FlowInSplitBuilderConfig;
-import com.ibm.jbatch.jsl.model.Flow;
-import com.ibm.jbatch.jsl.model.JSLJob;
-import com.ibm.jbatch.jsl.model.Split;
-
-public class SplitControllerImpl implements IExecutionElementController {
-
-	// Fully qualified class name; used as the logger name and as the source class in entering/exiting trace calls.
-	private final static String sourceClass = SplitControllerImpl.class.getName();
-	private final static Logger logger = Logger.getLogger(sourceClass);
-
-	// Execution supplied to the constructor; its execution id is used when the per-flow sub jobs are built.
-	private final RuntimeJobExecution jobExecution;
-
-	// Work units for the flows of this split. Stays null until buildSubJobBatchWorkUnits() assigns it,
-	// which is why stop() checks for null before iterating.
-	private volatile List<BatchFlowInSplitWorkUnit> parallelBatchWorkUnits;
-
-	private final ServicesManager servicesManager;
-	private final IBatchKernelService batchKernel;
-	// Job context taken from jobExecution in the constructor; exit status and restartOn are written to it.
-	private final JobContextImpl jobContext;
-	// Passed into every FlowInSplitBuilderConfig; waitForCompletionAndAggregateStatus() takes one entry per sub job.
-	private final BlockingQueue<BatchFlowInSplitWorkUnit> completedWorkQueue = new LinkedBlockingQueue<BatchFlowInSplitWorkUnit>();
-	// Forwarded unchanged to every FlowInSplitBuilderConfig.
-	private final long rootJobExecutionId;
-
-	// One generated JSLJob per flow of the split; also serves as the monitor in stop() and buildSubJobBatchWorkUnits().
-	final List<JSLJob> subJobs = new ArrayList<JSLJob>();
-
-	// The split element whose flows this controller handles.
-	protected Split split;
-
-	/**
-	 * Creates a controller for one split.
-	 *
-	 * @param jobExecution the execution this split runs under; its job context is cached here
-	 * @param split the split element whose flows this controller handles
-	 * @param rootJobExecutionId execution id forwarded to every flow-in-split work unit configuration
-	 */
-	public SplitControllerImpl(RuntimeJobExecution jobExecution, Split split, long rootJobExecutionId) {
-		this.jobExecution = jobExecution;
-		this.jobContext = this.jobExecution.getJobContext();
-		this.split = split;
-		this.rootJobExecutionId = rootJobExecutionId;
-
-		// Look the kernel up once; it is used for building, starting and stopping the sub jobs.
-		this.servicesManager = ServicesManagerImpl.getInstance();
-		this.batchKernel = this.servicesManager.getBatchKernelService();
-	}
-
-	/**
-	 * Asks the batch kernel to stop every flow-in-split sub job that has been built so far.
-	 *
-	 * @throws IllegalStateException wrapping any exception raised while stopping a sub job
-	 */
-	@Override
-	public void stop() {
-		synchronized (subJobs) {
-			// A stop request can arrive before any sub steps have been started; nothing to do then.
-			if (parallelBatchWorkUnits == null) {
-				return;
-			}
-
-			for (BatchParallelWorkUnit workUnit : parallelBatchWorkUnits) {
-				try {
-					batchKernel.stopJob(workUnit.getJobExecutionImpl().getExecutionId());
-				} catch (Exception stopFailure) {
-					// TODO - Is this what we want to know.
-					// Fail loudly for now so that the problem is not silently lost.
-					throw new IllegalStateException(stopFailure);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Runs the split: builds one work unit per flow, hands them to the batch kernel and
-	 * waits for all of them before reporting the aggregated status.
-	 *
-	 * @return the status aggregated from all flows of the split
-	 */
-	@Override
-	public SplitExecutionStatus execute() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
-		final String methodName = "execute";
-		if (logger.isLoggable(Level.FINER)) {
-			logger.entering(sourceClass, methodName, "Root JobExecution Id = " + rootJobExecutionId);
-		}
-
-		// Step 1: one sub job / work unit per flow in the split.
-		buildSubJobBatchWorkUnits();
-
-		// Step 2: hand the work units over to the kernel.
-		executeWorkUnits();
-
-		// Step 3: block until every flow has reported back, then combine the results.
-		final SplitExecutionStatus aggregatedStatus = waitForCompletionAndAggregateStatus();
-
-		if (logger.isLoggable(Level.FINER)) {
-			logger.exiting(sourceClass, methodName, aggregatedStatus);
-		}
-
-		return aggregatedStatus;
-	}
-
-	/**
-	 * Generates a sub job for every flow of the split and wraps each one in a work unit.
-	 *
-	 * All flows are (re)started: there is no notion of a flow that "already completed". Only steps
-	 * inside a flow may have completed earlier and therefore may not need to run again.
-	 *
-	 * @throws IllegalStateException if more than one job instance exists for a generated sub job id
-	 */
-	private void buildSubJobBatchWorkUnits() {
-
-		final List<Flow> splitFlows = this.split.getFlows();
-
-		parallelBatchWorkUnits = new ArrayList<BatchFlowInSplitWorkUnit>();
-
-		synchronized (subJobs) {
-			// First pass: generate the sub job definitions.
-			for (Flow splitFlow : splitFlows) {
-				JSLJob generated = PartitionedStepBuilder.buildFlowInSplitSubJob(jobExecution.getExecutionId(), jobContext, this.split, splitFlow);
-				subJobs.add(generated);
-			}
-
-			// Second pass: a fresh work unit when no instance exists yet, a restart work unit when exactly one does.
-			for (JSLJob subJob : subJobs) {
-				final int instanceCount = batchKernel.getJobInstanceCount(subJob.getId());
-				final FlowInSplitBuilderConfig builderConfig = new FlowInSplitBuilderConfig(subJob, completedWorkQueue, rootJobExecutionId);
-				switch (instanceCount) {
-				case 0:
-					parallelBatchWorkUnits.add(batchKernel.buildNewFlowInSplitWorkUnit(builderConfig));
-					break;
-				case 1:
-					parallelBatchWorkUnits.add(batchKernel.buildOnRestartFlowInSplitWorkUnit(builderConfig));
-					break;
-				default:
-					throw new IllegalStateException("There is an inconsistency somewhere in the internal subjob creation");
-				}
-			}
-		}
-	}
-
-	/**
-	 * Hands every work unit in parallelBatchWorkUnits to the batch kernel, choosing between
-	 * startGeneratedJob and restartGeneratedJob from the job instance count for the unit's job name.
-	 *
-	 * NOTE(review): the thresholds used here (1 = start, more than 1 = restart, 0 = error) differ from
-	 * the ones in buildSubJobBatchWorkUnits() (0 = new, 1 = restart) - presumably because building a
-	 * new work unit already creates the job instance; confirm against the batch kernel.
-	 *
-	 * @throws IllegalStateException if no job instance exists for a work unit's job name
-	 */
-	private void executeWorkUnits () {
-		// Then start or restart all subjobs in parallel
-		for (BatchParallelWorkUnit work : parallelBatchWorkUnits) {
-			int count = batchKernel.getJobInstanceCount(work.getJobExecutionImpl().getJobInstance().getJobName());
-
-			// NOTE(review): with assertions enabled this rejects count > 1, so the restartGeneratedJob
-			// branch below can then never be taken - verify whether the restart path is meant to be reachable.
-			assert (count <= 1);
-
-			if (count == 1) {
-				batchKernel.startGeneratedJob(work);
-			} else if (count > 1) {
-				batchKernel.restartGeneratedJob(work);
-			} else {
-				throw new IllegalStateException("There is an inconsistency somewhere in the internal subjob creation");
-			}
-		}
-	}
-
-	/**
-	 * Waits for every sub job to report completion and folds the per-flow statuses into one split status.
-	 *
-	 * Takes one entry from the completed-work queue per element of subJobs, blocking until each arrives.
-	 *
-	 * @return the split status; its extended batch status is the terminating status that won the
-	 *         precedence rules, or NORMAL_COMPLETION when no flow ended the job
-	 * @throws BatchContainerRuntimeException if the waiting thread is interrupted
-	 */
-	private SplitExecutionStatus waitForCompletionAndAggregateStatus() {
-
-		SplitExecutionStatus splitStatus = new SplitExecutionStatus();
-		ExtendedBatchStatus aggregateTerminatingStatus = null;
-
-		for (int i=0; i < subJobs.size(); i++) {
-			BatchFlowInSplitWorkUnit batchWork;
-			try {
-				batchWork = completedWorkQueue.take(); // wait for each thread to finish and then look at its status
-			} catch (InterruptedException e) {
-				// Restore the interrupt flag so that code further up the stack can still observe it.
-				Thread.currentThread().interrupt();
-				throw new BatchContainerRuntimeException(e);
-			}
-
-			RuntimeFlowInSplitExecution flowExecution = batchWork.getJobExecutionImpl();
-			ExecutionStatus flowStatus = flowExecution.getFlowStatus();
-			if (logger.isLoggable(Level.FINE)) {
-				logger.fine("Subjob " + flowExecution.getExecutionId() + " ended with flow-in-split status: " + flowStatus);
-			}
-			// Arguments are passed by value, so the helper has to hand the updated aggregate back to us.
-			aggregateTerminatingStatus = aggregateTerminatingStatusFromSingleFlow(aggregateTerminatingStatus, flowStatus, splitStatus);
-		}
-		
-		// If this is still set to 'null' that means all flows completed normally without terminating the job.
-		if (aggregateTerminatingStatus == null) {
-			logger.fine("Setting normal split status as no contained flows ended the job.");
-			aggregateTerminatingStatus = ExtendedBatchStatus.NORMAL_COMPLETION;
-		}
-
-		splitStatus.setExtendedBatchStatus(aggregateTerminatingStatus);
-		logger.fine("Returning from waitForCompletionAndAggregateStatus with return value: " + splitStatus);
-		return splitStatus;
-	}
-
-
-	/**
-	 * Merges the status of one finished flow into the aggregate terminating status of the split.
-	 *
-	 * A <fail> and an uncaught exception are peers. They each take precedence over a <stop>, which takes
-	 * precedence over an <end>. Among peers the last one seen gets to set the exit status.
-	 *
-	 * @param aggregateStatus the terminating status aggregated so far, or null if no flow has terminated the job yet
-	 * @param flowStatus the status of the flow that just finished
-	 * @param splitStatus flagged when more than one flow could have terminated the job
-	 * @return the aggregate terminating status after taking this flow into account (may still be null)
-	 */
-	private ExtendedBatchStatus aggregateTerminatingStatusFromSingleFlow(ExtendedBatchStatus aggregateStatus, ExecutionStatus flowStatus, 
-			SplitExecutionStatus splitStatus) {
-
-		String exitStatus = flowStatus.getExitStatus();
-		String restartOn = flowStatus.getRestartOn();
-		ExtendedBatchStatus flowBatchStatus = flowStatus.getExtendedBatchStatus();
-		// Start from the incoming aggregate; it is only replaced when this flow wins the precedence rules.
-		ExtendedBatchStatus newAggregateStatus = aggregateStatus;
-		
-		logger.fine("Aggregating possible terminating status for flow ending with status: " + flowStatus 
-				+ ", restartOn = " + restartOn);
-
-		if ( flowBatchStatus.equals(ExtendedBatchStatus.JSL_END) || flowBatchStatus.equals(ExtendedBatchStatus.JSL_STOP) || 
-				flowBatchStatus.equals(ExtendedBatchStatus.JSL_FAIL) || flowBatchStatus.equals(ExtendedBatchStatus.EXCEPTION_THROWN) ) {
-			if (aggregateStatus == null) {
-				logger.fine("A flow detected as ended because of a terminating condition: " + flowBatchStatus.name() 
-						+ ". First flow detected in terminating state.  Setting exitStatus if non-null.");
-				setInJobContext(flowBatchStatus, exitStatus, restartOn);
-				newAggregateStatus = flowBatchStatus;
-			} else {
-				splitStatus.setCouldMoreThanOneFlowHaveTerminatedJob(true);
-				if (aggregateStatus.equals(ExtendedBatchStatus.JSL_END)) {
-					logger.warning("Current flow's batch and exit status will take precedence over and override earlier one from <end> transition element. " + 
-									"Overriding, setting exit status if non-null and preparing to end job.");
-					setInJobContext(flowBatchStatus, exitStatus, restartOn);
-					newAggregateStatus = flowBatchStatus;
-				} else if (aggregateStatus.equals(ExtendedBatchStatus.JSL_STOP)) {
-					// Everything but an <end> overrides a <stop>
-					if (!(flowBatchStatus.equals(ExtendedBatchStatus.JSL_END))) {
-						logger.warning("Current flow's batch and exit status will take precedence over and override earlier one from <stop> transition element. " + 
-										"Overriding, setting exit status if non-null and preparing to end job.");
-						setInJobContext(flowBatchStatus, exitStatus, restartOn);
-						newAggregateStatus = flowBatchStatus;
-					} else {
-						logger.fine("End does not override stop.  The flow with <end> will effectively be ignored with respect to terminating the job.");
-					}
-				} else if (aggregateStatus.equals(ExtendedBatchStatus.JSL_FAIL) || aggregateStatus.equals(ExtendedBatchStatus.EXCEPTION_THROWN)) {
-					if (flowBatchStatus.equals(ExtendedBatchStatus.JSL_FAIL) || flowBatchStatus.equals(ExtendedBatchStatus.EXCEPTION_THROWN)) {
-						logger.warning("Current flow's batch and exit status will take precedence over and override earlier one from <fail> transition element or exception thrown. " + 
-										"Overriding, setting exit status if non-null and preparing to end job.");
-						setInJobContext(flowBatchStatus, exitStatus, restartOn);
-						newAggregateStatus = flowBatchStatus;
-					} else {
-						logger.fine("End and stop do not override exception thrown or <fail>.   The flow with <end> or <stop> will effectively be ignored with respect to terminating the job.");
-					}
-				}
-			}
-		} else {
-			logger.fine("Flow completing normally without any terminating transition or exception thrown.");
-		}
-
-		return newAggregateStatus;
-	}
-	
-	/**
-	 * Copies a flow's terminating details into the job context.
-	 *
-	 * @param flowBatchStatus the flow's extended batch status; restartOn is only applied for JSL_STOP
-	 * @param exitStatus exit status to set on the job context; ignored when null
-	 * @param restartOn restart target to set on the job context; ignored when null
-	 */
-	private void setInJobContext(ExtendedBatchStatus flowBatchStatus, String exitStatus, String restartOn) {
-		if (exitStatus != null) {
-			jobContext.setExitStatus(exitStatus);
-		}
-
-		// A restart target is only recorded when the flow ended with JSL_STOP.
-		final boolean endedWithStop = ExtendedBatchStatus.JSL_STOP.equals(flowBatchStatus);
-		if (endedWithStop && restartOn != null) {
-			jobContext.setRestartOn(restartOn);
-		}
-	}
-	
-	/**
-	 * @return the work units built for this split's flows, or null if none have been built yet
-	 */
-	public List<BatchFlowInSplitWorkUnit> getParallelJobExecs() {
-		return this.parallelBatchWorkUnits;
-	}
-
-    /**
-     * Collects the last-run step execution ids reported by the controller of every flow work unit.
-     *
-     * @return the concatenated ids, in work-unit order; empty when no work units have been built yet
-     */
-    @Override
-    public List<Long> getLastRunStepExecutions() {
-
-        List<Long> stepExecIdList = new ArrayList<Long>();
-
-        // parallelBatchWorkUnits stays null until the sub jobs are built, so there is nothing to report before that.
-        List<BatchFlowInSplitWorkUnit> workUnits = parallelBatchWorkUnits;
-        if (workUnits == null) {
-            return stepExecIdList;
-        }
-
-        for (BatchFlowInSplitWorkUnit workUnit : workUnits) {
-            stepExecIdList.addAll(workUnit.getController().getLastRunStepExecutions());
-        }
-
-        return stepExecIdList;
-    }
-
-
-
-}