You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:38:45 UTC

[08/62] Importing JBatch Reference Implementation from IBM. We'll fork it but this commit is to keep a track of what we forked.

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchKernelImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchKernelImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchKernelImpl.java
new file mode 100755
index 0000000..98d9fc9
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchKernelImpl.java
@@ -0,0 +1,459 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.batch.operations.JobExecutionAlreadyCompleteException;
+import javax.batch.operations.JobExecutionNotMostRecentException;
+import javax.batch.operations.JobExecutionNotRunningException;
+import javax.batch.operations.JobRestartException;
+import javax.batch.operations.JobStartException;
+import javax.batch.operations.NoSuchJobExecutionException;
+import javax.batch.runtime.JobInstance;
+
+import com.ibm.jbatch.container.IThreadRootController;
+import com.ibm.jbatch.container.callback.IJobEndCallbackService;
+import com.ibm.jbatch.container.exception.BatchContainerServiceException;
+import com.ibm.jbatch.container.jobinstance.JobExecutionHelper;
+import com.ibm.jbatch.container.jobinstance.RuntimeFlowInSplitExecution;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.services.IBatchKernelService;
+import com.ibm.jbatch.container.services.IJobExecution;
+import com.ibm.jbatch.container.services.IPersistenceManagerService;
+import com.ibm.jbatch.container.services.impl.NoOpBatchSecurityHelper;
+import com.ibm.jbatch.container.services.impl.RuntimeBatchJobUtil;
+import com.ibm.jbatch.container.servicesmanager.ServicesManager;
+import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
+import com.ibm.jbatch.container.util.BatchFlowInSplitWorkUnit;
+import com.ibm.jbatch.container.util.BatchPartitionWorkUnit;
+import com.ibm.jbatch.container.util.BatchWorkUnit;
+import com.ibm.jbatch.container.util.FlowInSplitBuilderConfig;
+import com.ibm.jbatch.container.util.PartitionsBuilderConfig;
+import com.ibm.jbatch.jsl.model.JSLJob;
+import com.ibm.jbatch.spi.BatchJobUtil;
+import com.ibm.jbatch.spi.BatchSPIManager;
+import com.ibm.jbatch.spi.BatchSecurityHelper;
+import com.ibm.jbatch.spi.services.IBatchConfig;
+import com.ibm.jbatch.spi.services.IBatchThreadPoolService;
+import com.ibm.jbatch.spi.services.ParallelTaskResult;
+
+public class BatchKernelImpl implements IBatchKernelService {
+
+	private final static String sourceClass = BatchKernelImpl.class.getName();
+	private final static Logger logger = Logger.getLogger(sourceClass);
+
+	private Map<Long, IThreadRootController> executionId2jobControllerMap = new ConcurrentHashMap<Long, IThreadRootController>();
+	private Set<Long> instanceIdExecutingSet = new HashSet<Long>();
+
+	ServicesManager servicesManager = ServicesManagerImpl.getInstance();
+
+	private IBatchThreadPoolService executorService = null;
+
+	private IJobEndCallbackService callbackService = null;
+
+	private IPersistenceManagerService persistenceService = null;
+
+	private BatchSecurityHelper batchSecurity = null;
+
+	private BatchJobUtil batchJobUtil = null;
+
+	public BatchKernelImpl() {
+		executorService = servicesManager.getThreadPoolService();
+		callbackService = servicesManager.getJobCallbackService();
+		persistenceService = servicesManager.getPersistenceManagerService();
+
+		// registering our implementation of the util class used to purge by apptag
+		batchJobUtil = new RuntimeBatchJobUtil();
+		BatchSPIManager.getInstance().registerBatchJobUtil(batchJobUtil);
+	}
+
+	public BatchSecurityHelper getBatchSecurityHelper() {
+		batchSecurity = BatchSPIManager.getInstance().getBatchSecurityHelper();
+		if (batchSecurity == null) { 
+			batchSecurity = new NoOpBatchSecurityHelper();
+		}
+		return batchSecurity;
+	}
+
+	public void init(IBatchConfig pgcConfig) throws BatchContainerServiceException {
+	}
+
+	@Override
+	public void shutdown() throws BatchContainerServiceException {
+		// TODO Auto-generated method stub
+
+	}
+
+	@Override
+	public IJobExecution startJob(String jobXML) throws JobStartException {
+		return startJob(jobXML, null);
+	}
+
+	@Override
+	public IJobExecution startJob(String jobXML, Properties jobParameters) throws JobStartException {
+		String method = "startJob";
+
+		if (logger.isLoggable(Level.FINER)) {
+			logger.entering(sourceClass, method, new Object[] { jobXML, jobParameters != null ? jobParameters : "<null>" });
+		}
+
+		RuntimeJobExecution jobExecution = JobExecutionHelper.startJob(jobXML, jobParameters);
+
+		// TODO - register with status manager
+
+		if (logger.isLoggable(Level.FINE)) {
+			logger.fine("JobExecution constructed: " + jobExecution);
+		}
+
+		BatchWorkUnit batchWork = new BatchWorkUnit(this, jobExecution);
+		registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+
+		executorService.executeTask(batchWork, null);
+
+		if (logger.isLoggable(Level.FINER)) {
+			logger.exiting(sourceClass, method, jobExecution);
+		}
+
+		return jobExecution.getJobOperatorJobExecution();
+	}
+
+	@Override
+	public void stopJob(long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
+
+		IThreadRootController controller = this.executionId2jobControllerMap.get(executionId);
+		if (controller == null) {
+			String msg = "JobExecution with execution id of " + executionId + "is not running.";
+			logger.warning("stopJob(): " + msg);
+			throw new JobExecutionNotRunningException(msg);
+		}
+		controller.stop();
+	}
+
+	@Override
+	public IJobExecution restartJob(long executionId) throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
+		String method = "restartJob";
+
+		if (logger.isLoggable(Level.FINER)) {
+			logger.entering(sourceClass, method);
+		}
+
+		Properties dummyPropObj = new Properties();
+		return this.restartJob(executionId, dummyPropObj);
+	}
+
+
+
+
+	@Override
+	public IJobExecution restartJob(long executionId, Properties jobOverrideProps) throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
+		String method = "restartJob";
+
+		if (logger.isLoggable(Level.FINER)) {
+			logger.entering(sourceClass, method);
+		}
+
+		RuntimeJobExecution jobExecution = 
+				JobExecutionHelper.restartJob(executionId, jobOverrideProps);
+
+		if (logger.isLoggable(Level.FINE)) {
+			logger.fine("JobExecution constructed: " + jobExecution);
+		}
+
+		BatchWorkUnit batchWork = new BatchWorkUnit(this, jobExecution);
+
+		registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+
+		executorService.executeTask(batchWork, null);
+
+		if (logger.isLoggable(Level.FINER)) {
+			logger.exiting(sourceClass, method, jobExecution);
+		}
+
+		return jobExecution.getJobOperatorJobExecution();
+	}
+
+	@Override
+	public void jobExecutionDone(RuntimeJobExecution jobExecution) {
+
+		if (logger.isLoggable(Level.FINE)) {
+			logger.fine("JobExecution done with batchStatus: " + jobExecution.getBatchStatus() + " , getting ready to invoke callbacks for JobExecution: " + jobExecution.getExecutionId());
+		}
+
+		callbackService.done(jobExecution.getExecutionId());
+
+		if (logger.isLoggable(Level.FINE)) {
+			logger.fine("Done invoking callbacks for JobExecution: " + jobExecution.getExecutionId());
+		}
+
+		// Remove from executionId, instanceId map,set after job is done        
+		this.executionId2jobControllerMap.remove(jobExecution.getExecutionId());
+		this.instanceIdExecutingSet.remove(jobExecution.getInstanceId());
+
+		// AJM: ah - purge jobExecution from map here and flush to DB?
+		// edit: no long want a 2 tier for the jobexecution...do want it for step execution
+		// renamed method to flushAndRemoveStepExecution
+
+	}
+
+	public IJobExecution getJobExecution(long executionId) throws NoSuchJobExecutionException {
+		/*
+		 *  Keep logging on finest for apps like TCK which do polling
+		 */
+		logger.finest("Entering " + sourceClass + ".getJobExecution(), executionId = " + executionId);
+		IJobExecution retVal = JobExecutionHelper.getPersistedJobOperatorJobExecution(executionId);
+
+		logger.finest("Exiting " + sourceClass + ".getJobExecution(), retVal = " + retVal);
+		return retVal;
+	}
+
+	@Override
+	public void startGeneratedJob(BatchWorkUnit batchWork) {
+		String method = "startGeneratedJob";
+
+		if (logger.isLoggable(Level.FINER)) {
+			logger.entering(sourceClass, method, new Object[] { batchWork });
+		}
+
+		//This call is non-blocking
+		ParallelTaskResult result = executorService.executeParallelTask(batchWork, null);
+
+		if (logger.isLoggable(Level.FINER)) {
+			logger.exiting(sourceClass, method, new Object[] { batchWork });
+		}
+	}
+
+	@Override
+	public int getJobInstanceCount(String jobName) {
+		int jobInstanceCount = 0;
+
+		jobInstanceCount = persistenceService.jobOperatorGetJobInstanceCount(jobName);
+
+		return jobInstanceCount;
+	}
+
+	@Override
+	public JobInstance getJobInstance(long executionId){
+		return JobExecutionHelper.getJobInstance(executionId);
+	}
+
+
+	/**
+	 * Build a list of batch work units and set them up in STARTING state but don't start them yet.
+	 */
+
+	@Override
+	public List<BatchPartitionWorkUnit> buildNewParallelPartitions(PartitionsBuilderConfig config) 
+			throws JobRestartException, JobStartException {
+
+		List<JSLJob> jobModels = config.getJobModels();
+		Properties[] partitionPropertiesArray = config.getPartitionProperties();
+
+		List<BatchPartitionWorkUnit> batchWorkUnits = new ArrayList<BatchPartitionWorkUnit>(jobModels.size());
+
+		int instance = 0;
+		for (JSLJob parallelJob  : jobModels){
+			Properties partitionProps = (partitionPropertiesArray == null) ? null : partitionPropertiesArray[instance];    			
+
+			if (logger.isLoggable(Level.FINER)) {
+				logger.finer("Starting execution for jobModel = " + parallelJob.toString());
+			}
+			RuntimeJobExecution jobExecution = JobExecutionHelper.startPartition(parallelJob, partitionProps);
+			jobExecution.setPartitionInstance(instance);
+
+			if (logger.isLoggable(Level.FINE)) {
+				logger.fine("JobExecution constructed: " + jobExecution);
+			}
+			BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(this, jobExecution, config);
+
+			registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+			
+			batchWorkUnits.add(batchWork);
+			instance++;
+		}
+
+		return batchWorkUnits;
+	}
+
+	@Override
+	public List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(PartitionsBuilderConfig config) throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
+
+		List<JSLJob> jobModels = config.getJobModels();
+		Properties[] partitionProperties = config.getPartitionProperties();
+
+		List<BatchPartitionWorkUnit> batchWorkUnits = new ArrayList<BatchPartitionWorkUnit>(jobModels.size());
+
+		//for now let always use a Properties array. We can add some more convenience methods later for null properties and what not
+
+		int instance = 0;
+		for (JSLJob parallelJob  : jobModels){
+
+			Properties partitionProps = (partitionProperties == null) ? null : partitionProperties[instance];    
+
+			try {
+				long execId = getMostRecentExecutionId(parallelJob);
+
+				RuntimeJobExecution jobExecution = null;
+				try {		
+					jobExecution = JobExecutionHelper.restartPartition(execId, parallelJob, partitionProps);
+					jobExecution.setPartitionInstance(instance);
+				} catch (NoSuchJobExecutionException e) {
+					String errorMsg = "Caught NoSuchJobExecutionException but this is an internal JobExecution so this shouldn't have happened: execId =" + execId;
+					logger.severe(errorMsg);
+					throw new IllegalStateException(errorMsg, e);
+				}
+
+				if (logger.isLoggable(Level.FINE)) {
+					logger.fine("JobExecution constructed: " + jobExecution);
+				}
+
+				BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(this, jobExecution, config);
+				registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+
+				batchWorkUnits.add(batchWork);
+			} catch (JobExecutionAlreadyCompleteException e) {
+				logger.fine("This execution already completed: " + parallelJob.getId());
+			}
+
+			instance++;
+		}
+
+		return batchWorkUnits;
+	}
+
+	@Override
+	public void  restartGeneratedJob(BatchWorkUnit batchWork) throws JobRestartException {
+		String method = "restartGeneratedJob";
+
+		if (logger.isLoggable(Level.FINER)) {
+			logger.entering(sourceClass, method, new Object[] { batchWork });
+		}
+
+		//This call is non-blocking
+		ParallelTaskResult result = executorService.executeParallelTask(batchWork, null);
+
+		if (logger.isLoggable(Level.FINER)) {
+			logger.exiting(sourceClass, method, batchWork);
+		}
+
+	}
+
+	@Override
+	public BatchFlowInSplitWorkUnit buildNewFlowInSplitWorkUnit(FlowInSplitBuilderConfig config) {
+		JSLJob parallelJob = config.getJobModel();
+
+		RuntimeFlowInSplitExecution execution = JobExecutionHelper.startFlowInSplit(parallelJob);
+
+		if (logger.isLoggable(Level.FINE)) {
+			logger.fine("JobExecution constructed: " + execution);
+		}
+		BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(this, execution, config);
+
+		registerCurrentInstanceAndExecution(execution, batchWork.getController());
+		return batchWork;
+	}
+
+	private long getMostRecentExecutionId(JSLJob jobModel) {
+
+		//There can only be one instance associated with a subjob's id since it is generated from an unique
+		//job instance id. So there should be no way to directly start a subjob with particular
+		List<Long> instanceIds = persistenceService.jobOperatorGetJobInstanceIds(jobModel.getId(), 0, 2);
+
+		// Maybe we should blow up on '0' too?
+		if (instanceIds.size() > 1) {
+			String errorMsg = "Found " + instanceIds.size() + " entries for instance id = " + jobModel.getId() + ", which should not have happened.  Blowing up."; 
+			logger.severe(errorMsg);
+			throw new IllegalStateException(errorMsg);
+		}
+
+		List<IJobExecution> partitionExecs = persistenceService.jobOperatorGetJobExecutions(instanceIds.get(0));
+
+		Long execId = Long.MIN_VALUE;
+
+		for (IJobExecution partitionExec : partitionExecs ) {
+			if (partitionExec.getExecutionId() > execId ) {
+				execId = partitionExec.getExecutionId();
+			}
+		}
+		return execId;
+	}
+
+	@Override
+	public BatchFlowInSplitWorkUnit buildOnRestartFlowInSplitWorkUnit(FlowInSplitBuilderConfig config)  
+			throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
+
+		String method = "buildOnRestartFlowInSplitWorkUnit";
+
+		JSLJob jobModel = config.getJobModel();
+
+		if (logger.isLoggable(Level.FINER)) {
+			logger.entering(sourceClass, method, jobModel);
+		}
+
+		long execId = getMostRecentExecutionId(jobModel);
+
+		RuntimeFlowInSplitExecution jobExecution = null;
+		try {		
+			jobExecution = JobExecutionHelper.restartFlowInSplit(execId, jobModel);
+		} catch (NoSuchJobExecutionException e) {
+			String errorMsg = "Caught NoSuchJobExecutionException but this is an internal JobExecution so this shouldn't have happened: execId =" + execId;
+			logger.severe(errorMsg);
+			throw new IllegalStateException(errorMsg, e);
+		}
+
+		if (logger.isLoggable(Level.FINE)) {
+			logger.fine("JobExecution constructed: " + jobExecution);
+		}
+
+		BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(this, jobExecution, config);
+		
+		registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+		return batchWork;
+	}
+
+	private void registerCurrentInstanceAndExecution(RuntimeJobExecution jobExecution, IThreadRootController controller) {
+		long execId = jobExecution.getExecutionId();
+		long instanceId = jobExecution.getInstanceId();
+		String errorPrefix = "Tried to execute with Job executionId = " + execId + " and instanceId = " + instanceId + " ";
+		if (executionId2jobControllerMap.get(execId) != null) {
+			String errorMsg = errorPrefix + "but executionId is already currently executing.";
+			logger.warning(errorMsg);
+			throw new IllegalStateException(errorMsg);
+		} else if (instanceIdExecutingSet.contains(instanceId)) {
+			String errorMsg = errorPrefix + "but another execution with this instanceId is already currently executing.";
+			logger.warning(errorMsg);
+			throw new IllegalStateException(errorMsg);
+		} else {
+			instanceIdExecutingSet.add(instanceId);
+			executionId2jobControllerMap.put(jobExecution.getExecutionId(), controller);
+		}
+	}
+	
+	@Override
+	public boolean isExecutionRunning(long executionId) {
+		return executionId2jobControllerMap.containsKey(executionId);
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchletStepControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchletStepControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchletStepControllerImpl.java
new file mode 100755
index 0000000..2129c95
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchletStepControllerImpl.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.batch.runtime.BatchStatus;
+
+
+import com.ibm.jbatch.container.artifact.proxy.BatchletProxy;
+import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
+import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
+import com.ibm.jbatch.container.context.impl.StepContextImpl;
+import com.ibm.jbatch.container.exception.BatchContainerServiceException;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.util.PartitionDataWrapper;
+import com.ibm.jbatch.container.validation.ArtifactValidationException;
+import com.ibm.jbatch.jsl.model.Batchlet;
+import com.ibm.jbatch.jsl.model.Partition;
+import com.ibm.jbatch.jsl.model.Property;
+import com.ibm.jbatch.jsl.model.Step;
+
+public class BatchletStepControllerImpl extends SingleThreadedStepControllerImpl {
+
+	private final static String sourceClass = BatchletStepControllerImpl.class.getName();
+	private final static Logger logger = Logger.getLogger(sourceClass);
+
+	private BatchletProxy batchletProxy;
+
+	public BatchletStepControllerImpl(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
+		super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+	}
+
+	private void invokeBatchlet(Batchlet batchlet) throws BatchContainerServiceException {
+
+		String batchletId = batchlet.getRef();
+		List<Property> propList = (batchlet.getProperties() == null) ? null : batchlet.getProperties().getPropertyList();
+
+		String sourceMethod = "invokeBatchlet";
+		if (logger.isLoggable(Level.FINER)) {
+			logger.entering(sourceClass, sourceMethod, batchletId);
+		}
+
+		String exitStatus = null;
+
+		InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, 
+				propList);
+
+		try {
+			batchletProxy = ProxyFactory.createBatchletProxy(batchletId, injectionRef, stepContext);
+		} catch (ArtifactValidationException e) {
+			throw new BatchContainerServiceException("Cannot create the batchlet [" + batchletId + "]", e);
+		}
+
+		if (logger.isLoggable(Level.FINE))
+			logger.fine("Batchlet is loaded and validated: " + batchletProxy);
+
+
+		if (wasStopIssued()) {
+			logger.fine("Exit without executing batchlet since stop() request has been received.");
+		} else {
+			String processRetVal = batchletProxy.process();
+
+			logger.fine("Set process() return value = " + processRetVal + " for possible use as exitStatus");
+			stepContext.setBatchletProcessRetVal(processRetVal);
+
+			logger.exiting(sourceClass, sourceMethod, exitStatus==null ? "<null>" : exitStatus);
+		}
+	}
+
+	protected synchronized boolean wasStopIssued() { 
+		// Might only be set to stopping at the job level.  Use the lock for this object on this
+		// method along with the stop() method 
+		if (jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)){
+			stepContext.setBatchStatus(BatchStatus.STOPPING);
+			return true;
+		} else {
+			return false;
+		}
+	}
+	@Override
+	protected void invokeCoreStep() throws BatchContainerServiceException {
+
+		//TODO If this step is partitioned create partition artifacts
+		Partition partition = step.getPartition();
+		if (partition != null) {
+			//partition.getConcurrencyElements();
+		}
+		try {
+			invokeBatchlet(step.getBatchlet());
+		} finally {
+			invokeCollectorIfPresent();
+		}
+
+	}
+
+	@Override
+	public synchronized void stop() { 
+
+		// It is possible for stop() to be issued before process() 
+		if (BatchStatus.STARTING.equals(stepContext.getBatchStatus()) ||
+				BatchStatus.STARTED.equals(stepContext.getBatchStatus())) {
+
+			stepContext.setBatchStatus(BatchStatus.STOPPING);
+
+			if (batchletProxy != null) {
+				batchletProxy.stop();	
+			}
+		} else {
+			//TODO do we need to throw an error if the batchlet is already stopping/stopped
+			//a stop gets issued twice
+		}
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkHelper.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkHelper.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkHelper.java
new file mode 100755
index 0000000..8cda7da
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkHelper.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package com.ibm.jbatch.container.impl;
+
+import com.ibm.jbatch.jsl.model.Chunk;
+
+public class ChunkHelper {
+
+	public static int getItemCount(Chunk chunk) {
+		String chunkSizeStr = chunk.getItemCount();
+		int size = 10;
+		
+		if (chunkSizeStr != null && !chunkSizeStr.isEmpty()) {
+			size = Integer.valueOf(chunk.getItemCount());
+		} 
+		
+		chunk.setItemCount(Integer.toString(size));
+		return size;
+	}
+    
+    public static int getTimeLimit(Chunk chunk){    
+		String chunkTimeLimitStr = chunk.getTimeLimit();
+		int timeLimit = 0; //default time limit = 0 seconds ie no timelimit
+		
+		if (chunkTimeLimitStr != null && !chunkTimeLimitStr.isEmpty()) {
+			timeLimit = Integer.valueOf(chunk.getTimeLimit());
+		} 
+		
+		chunk.setTimeLimit(Integer.toString(timeLimit));
+		return timeLimit;
+    }
+    
+    public static String getCheckpointPolicy(Chunk chunk) {
+		String checkpointPolicy = chunk.getCheckpointPolicy();
+		
+		if (checkpointPolicy != null && !checkpointPolicy.isEmpty()) {
+			if (!(checkpointPolicy.equals("item") || checkpointPolicy.equals("custom"))) {
+				throw new IllegalArgumentException("The only supported attributed values for 'checkpoint-policy' are 'item' and 'custom'.");				
+			}
+		} else {
+			checkpointPolicy = "item";
+		}
+		
+		chunk.setCheckpointPolicy(checkpointPolicy);
+		return checkpointPolicy;
+    }
+    
+    public static int getSkipLimit(Chunk chunk) {
+    	return Integer.valueOf(chunk.getSkipLimit());
+    }
+    
+    public static int getRetryLimit(Chunk chunk) {
+    	return Integer.valueOf(chunk.getRetryLimit());
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkStepControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkStepControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkStepControllerImpl.java
new file mode 100755
index 0000000..34dcffc
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkStepControllerImpl.java
@@ -0,0 +1,1045 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.batch.api.chunk.CheckpointAlgorithm;
+import javax.batch.runtime.BatchStatus;
+
+import com.ibm.jbatch.container.artifact.proxy.CheckpointAlgorithmProxy;
+import com.ibm.jbatch.container.artifact.proxy.ChunkListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
+import com.ibm.jbatch.container.artifact.proxy.ItemProcessListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.ItemProcessorProxy;
+import com.ibm.jbatch.container.artifact.proxy.ItemReadListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.ItemReaderProxy;
+import com.ibm.jbatch.container.artifact.proxy.ItemWriteListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.ItemWriterProxy;
+import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
+import com.ibm.jbatch.container.artifact.proxy.RetryProcessListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.RetryReadListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.RetryWriteListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.SkipProcessListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.SkipReadListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.SkipWriteListenerProxy;
+import com.ibm.jbatch.container.context.impl.MetricImpl;
+import com.ibm.jbatch.container.context.impl.StepContextImpl;
+import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
+import com.ibm.jbatch.container.exception.BatchContainerServiceException;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.persistence.CheckpointAlgorithmFactory;
+import com.ibm.jbatch.container.persistence.CheckpointData;
+import com.ibm.jbatch.container.persistence.CheckpointDataKey;
+import com.ibm.jbatch.container.persistence.CheckpointManager;
+import com.ibm.jbatch.container.persistence.ItemCheckpointAlgorithm;
+import com.ibm.jbatch.container.services.IPersistenceManagerService;
+import com.ibm.jbatch.container.servicesmanager.ServicesManager;
+import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
+import com.ibm.jbatch.container.util.PartitionDataWrapper;
+import com.ibm.jbatch.container.util.TCCLObjectInputStream;
+import com.ibm.jbatch.container.validation.ArtifactValidationException;
+import com.ibm.jbatch.jsl.model.Chunk;
+import com.ibm.jbatch.jsl.model.ItemProcessor;
+import com.ibm.jbatch.jsl.model.ItemReader;
+import com.ibm.jbatch.jsl.model.ItemWriter;
+import com.ibm.jbatch.jsl.model.Property;
+import com.ibm.jbatch.jsl.model.Step;
+
+public class ChunkStepControllerImpl extends SingleThreadedStepControllerImpl {
+
+	private final static String sourceClass = ChunkStepControllerImpl.class.getName();
+	private final static Logger logger = Logger.getLogger(sourceClass);
+
+	private Chunk chunk = null;
+	private ItemReaderProxy readerProxy = null;
+	private ItemProcessorProxy processorProxy = null;
+	private ItemWriterProxy writerProxy = null;
+	private CheckpointAlgorithmProxy checkpointProxy = null;
+	private CheckpointAlgorithm chkptAlg = null;
+	private CheckpointManager checkpointManager;
+	private ServicesManager servicesManager = ServicesManagerImpl.getInstance();
+	private IPersistenceManagerService _persistenceManagerService = null;
+	private SkipHandler skipHandler = null;
+	CheckpointDataKey readerChkptDK, writerChkptDK = null;
+	CheckpointData readerChkptData = null;
+	CheckpointData writerChkptData = null;
+	List<ChunkListenerProxy> chunkListeners = null;
+	List<SkipProcessListenerProxy> skipProcessListeners = null;
+	List<SkipReadListenerProxy> skipReadListeners = null;
+	List<SkipWriteListenerProxy> skipWriteListeners = null;
+	List<RetryProcessListenerProxy> retryProcessListeners = null;
+	List<RetryReadListenerProxy> retryReadListeners = null;
+	List<RetryWriteListenerProxy> retryWriteListeners = null;
+	List<ItemReadListenerProxy> itemReadListeners = null;
+	List<ItemProcessListenerProxy> itemProcessListeners = null;
+	List<ItemWriteListenerProxy> itemWriteListeners = null;
+	private RetryHandler retryHandler;
+
+	// metrics
+	long readCount = 0;
+	long writeCount = 0;
+	long readSkipCount = 0;
+	long processSkipCount = 0;
+	long writeSkipCount = 0;
+	boolean rollbackRetry = false;
+
+	public ChunkStepControllerImpl(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
+		super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+	}
+
+	/**
+	 * Utility Class to hold statuses at each level of Read-Process-Write loop
+	 * 
+	 */
+	private class ItemStatus {
+
+		public boolean isSkipped() {
+			return skipped;
+		}
+
+		public void setSkipped(boolean skipped) {
+			this.skipped = skipped;
+		}
+
+		public boolean isFiltered() {
+			return filtered;
+		}
+
+		public void setFiltered(boolean filtered) {
+			this.filtered = filtered;
+		}
+
+		public boolean isCheckPointed() {
+			return checkPointed;
+		}
+
+		public void setCheckPointed(boolean checkPointed) {
+			this.checkPointed = checkPointed;
+		}
+
+		public boolean isFinished() {
+			return finished;
+		}
+
+		public void setFinished(boolean finished) {
+			this.finished = finished;
+		}
+
+		public boolean isRetry() {
+			return retry;
+		}
+
+		public void setRetry(boolean retry) {
+			this.retry = retry;
+		}
+
+		public boolean isRollback() {
+			return rollback;
+		}
+
+		public void setRollback(boolean rollback) {
+			this.rollback = rollback;
+		}
+
+		private boolean skipped = false;
+		private boolean filtered = false;
+		private boolean finished = false;
+		private boolean checkPointed = false;
+		private boolean retry = false;
+		private boolean rollback = false;
+
+	}
+
+	/**
+	 * We read and process one item at a time but write in chunks (group of
+	 * items). So, this method loops until we either reached the end of the
+	 * reader (not more items to read), or the writer buffer is full or a
+	 * checkpoint is triggered.
+	 * 
+	 * @param chunkSize
+	 *            write buffer size
+	 * @param theStatus
+	 *            flags when the read-process reached the last record or a
+	 *            checkpoint is required
+	 * @return an array list of objects to write
+	 */
+	private List<Object> readAndProcess(int chunkSize, ItemStatus theStatus) {
+		logger.entering(sourceClass, "readAndProcess", new Object[] { chunkSize, theStatus });
+
+		List<Object> chunkToWrite = new ArrayList<Object>();
+		Object itemRead = null;
+		Object itemProcessed = null;
+		int readProcessedCount = 0;
+
+		while (true) {
+			ItemStatus status = new ItemStatus();
+			itemRead = readItem(status);
+
+			if (status.isRollback()) {
+				theStatus.setRollback(true);
+				// inc rollbackCount
+				stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+				break;
+			}
+
+			if (!status.isSkipped() && !status.isFinished()) {
+				itemProcessed = processItem(itemRead, status);
+
+				if (status.isRollback()) {
+					theStatus.setRollback(true);
+					// inc rollbackCount
+					stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+					break;
+				}
+
+				if (!status.isSkipped() && !status.isFiltered()) {
+					chunkToWrite.add(itemProcessed);
+					readProcessedCount++;
+				}
+			}
+
+			theStatus.setFinished(status.isFinished());
+			theStatus.setCheckPointed(checkpointManager.ApplyCheckPointPolicy());
+
+			// This will force the current item to finish processing on a stop
+			// request
+			if (stepContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
+				theStatus.setFinished(true);
+			}
+
+			// write buffer size reached
+			if ((readProcessedCount == chunkSize) && (checkpointProxy.getCheckpointType() != "custom")) {
+				break;
+			}
+
+			// checkpoint reached
+			if (theStatus.isCheckPointed()) {
+				break;
+			}
+
+			// last record in readerProxy reached
+			if (theStatus.isFinished()) {
+				break;
+			}
+
+		}
+		logger.exiting(sourceClass, "readAndProcess", chunkToWrite);
+		return chunkToWrite;
+	}
+
+	/**
+	 * Reads an item from the reader
+	 * 
+	 * @param status
+	 *            flags the current read status
+	 * @return the item read
+	 */
+	private Object readItem(ItemStatus status) {
+		logger.entering(sourceClass, "readItem", status);
+		Object itemRead = null;
+
+		try {
+			// call read listeners before and after the actual read
+			for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+				readListenerProxy.beforeRead();
+			}
+
+			itemRead = readerProxy.readItem();
+
+			for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+				readListenerProxy.afterRead(itemRead);
+			}
+
+			// itemRead == null means we reached the end of
+			// the readerProxy "resultset"
+			status.setFinished(itemRead == null);
+			if (!status.isFinished()) {
+				stepContext.getMetric(MetricImpl.MetricType.READ_COUNT).incValue();
+			}
+		} catch (Exception e) {
+			stepContext.setException(e);
+			for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+				readListenerProxy.onReadError(e);
+			}
+			if(!rollbackRetry) {
+				if (retryReadException(e)) {
+					for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+						readListenerProxy.onReadError(e);
+					}
+					// if not a rollback exception, just retry the current item
+					if (!retryHandler.isRollbackException(e)) {
+						itemRead = readItem(status);
+					} else {
+						status.setRollback(true);
+						rollbackRetry = true;
+						// inc rollbackCount
+						stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+					}
+				}
+				else if(skipReadException(e)) {
+					status.setSkipped(true);
+					stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();
+
+				}
+				else {
+					throw new BatchContainerRuntimeException(e);
+				}
+			}
+			else {
+				// coming from a rollback retry
+				if(skipReadException(e)) {
+					status.setSkipped(true);
+					stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();
+
+				}
+				else if (retryReadException(e)) {
+					if (!retryHandler.isRollbackException(e)) {
+						itemRead = readItem(status);
+					}
+					else {
+						status.setRollback(true);
+						// inc rollbackCount
+						stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+					}
+				}
+				else {
+					throw new BatchContainerRuntimeException(e);
+				}
+			}
+
+		} catch (Throwable e) {
+			throw new BatchContainerRuntimeException(e);
+		}
+
+		logger.exiting(sourceClass, "readItem", itemRead==null ? "<null>" : itemRead);
+		return itemRead;
+	}
+
+	/**
+	 * Process an item previously read by the reader
+	 * 
+	 * @param itemRead
+	 *            the item read
+	 * @param status
+	 *            flags the current process status
+	 * @return the processed item
+	 */
+	private Object processItem(Object itemRead, ItemStatus status) {
+		logger.entering(sourceClass, "processItem", new Object[] { itemRead, status });
+		Object processedItem = null;
+
+		// if no processor defined for this chunk
+		if (processorProxy == null){
+			return itemRead;
+		}
+
+		try {
+
+			// call process listeners before and after the actual process call
+			for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+				processListenerProxy.beforeProcess(itemRead);
+			}
+
+			processedItem = processorProxy.processItem(itemRead);
+
+			if (processedItem == null) {
+				// inc filterCount
+				stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();
+				status.setFiltered(true);
+			}
+
+			for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+				processListenerProxy.afterProcess(itemRead, processedItem);
+			}
+		} catch (Exception e) {
+			for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+				processListenerProxy.onProcessError(processedItem, e);
+			}
+			if(!rollbackRetry) {
+				if (retryProcessException(e, itemRead)) {
+					if (!retryHandler.isRollbackException(e)) {
+						// call process listeners before and after the actual
+						// process call
+						for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+							processListenerProxy.beforeProcess(itemRead);
+						}
+						processedItem = processItem(itemRead, status);
+						if (processedItem == null) {
+							// inc filterCount
+							stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();
+							status.setFiltered(true);
+						}
+
+						for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+							processListenerProxy.afterProcess(itemRead, processedItem);
+						}
+					} else {
+						status.setRollback(true);
+						rollbackRetry = true;
+						// inc rollbackCount
+						stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+					}
+				}
+				else if (skipProcessException(e, itemRead)) {
+					status.setSkipped(true);
+					stepContext.getMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT).incValue();
+				}
+				else {
+					throw new BatchContainerRuntimeException(e);
+				}
+			}
+			else {
+				if (skipProcessException(e, itemRead)) {
+					status.setSkipped(true);
+					stepContext.getMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT).incValue();
+				} else if (retryProcessException(e, itemRead)) {
+					if (!retryHandler.isRollbackException(e)) {
+						// call process listeners before and after the actual
+						// process call
+						for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+							processListenerProxy.beforeProcess(itemRead);
+						}
+						processedItem = processItem(itemRead, status);
+						if (processedItem == null) {
+							// inc filterCount
+							stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();
+							status.setFiltered(true);
+						}
+
+						for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+							processListenerProxy.afterProcess(itemRead, processedItem);
+						}
+					} else {
+						status.setRollback(true);
+						rollbackRetry = true;
+						// inc rollbackCount
+						stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+					}
+				} else {
+					throw new BatchContainerRuntimeException(e);
+				}
+			}
+
+		} catch (Throwable e) {
+			throw new BatchContainerRuntimeException(e);
+		}
+
+		logger.exiting(sourceClass, "processItem", processedItem==null ? "<null>" : processedItem);
+		return processedItem;
+	}
+
+	/**
+	 * Writes items
+	 * 
+	 * @param theChunk
+	 *            the array list with all items processed ready to be written
+	 */
+	private void writeChunk(List<Object> theChunk, ItemStatus status) {
+		logger.entering(sourceClass, "writeChunk", theChunk);
+		if (!theChunk.isEmpty()) {
+			try {
+
+				// call read listeners before and after the actual read
+				for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
+					writeListenerProxy.beforeWrite(theChunk);
+				}
+
+				writerProxy.writeItems(theChunk);
+
+				for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
+					writeListenerProxy.afterWrite(theChunk);
+				}
+				stepContext.getMetric(MetricImpl.MetricType.WRITE_COUNT).incValueBy(theChunk.size());
+			} catch (Exception e) {
+				this.stepContext.setException(e);
+				for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
+					writeListenerProxy.onWriteError(theChunk, e);
+				}
+				if(!rollbackRetry)
+				{
+					if (retryWriteException(e, theChunk)) {
+						if (!retryHandler.isRollbackException(e)) {
+							writeChunk(theChunk, status);
+						} else {
+							rollbackRetry = true;
+							status.setRollback(true);
+							// inc rollbackCount
+							stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+						}
+					} else if (skipWriteException(e, theChunk)) {
+						stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);
+					} else {
+						throw new BatchContainerRuntimeException(e);
+					}
+
+				}
+				else {
+					if (skipWriteException(e, theChunk)) {
+						stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);
+					} else if (retryWriteException(e, theChunk)) {
+						if (!retryHandler.isRollbackException(e)) {
+							status.setRetry(true);
+							writeChunk(theChunk, status);
+						} else {
+							rollbackRetry = true;
+							status.setRollback(true);
+							// inc rollbackCount
+							stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+						}
+					} else {
+						throw new BatchContainerRuntimeException(e);
+					}
+				}
+
+			} catch (Throwable e) {
+				throw new BatchContainerRuntimeException(e);
+			}
+		}
+		logger.exiting(sourceClass, "writeChunk");
+	}
+
+	/**
+	 * Main Read-Process-Write loop
+	 * 
+	 * @throws Exception
+	 */
+	private void invokeChunk() {
+		logger.entering(sourceClass, "invokeChunk2");
+
+		int itemCount = ChunkHelper.getItemCount(chunk);
+		int timeInterval = ChunkHelper.getTimeLimit(chunk);
+		List<Object> chunkToWrite = new ArrayList<Object>();
+		boolean checkPointed = true;
+		boolean rollback = false;
+		Throwable caughtThrowable = null;
+
+		// begin new transaction at first iteration or after a checkpoint commit
+
+		try {
+			transactionManager.begin();
+			this.openReaderAndWriter();
+			transactionManager.commit();
+
+			while (true) {
+
+				if (checkPointed || rollback) {
+					if (this.checkpointProxy.getCheckpointType() == "custom" ){
+						int newtimeOut = this.checkpointManager.checkpointTimeout();
+						transactionManager.setTransactionTimeout(newtimeOut);
+					}
+					transactionManager.begin();
+					for (ChunkListenerProxy chunkProxy : chunkListeners) {
+						chunkProxy.beforeChunk();
+					}
+
+					if (rollback) {
+						positionReaderAtCheckpoint();
+						positionWriterAtCheckpoint();
+						checkpointManager = new CheckpointManager(readerProxy, writerProxy,
+								getCheckpointAlgorithm(itemCount, timeInterval), jobExecutionImpl.getExecutionId(), jobExecutionImpl
+								.getJobInstance().getInstanceId(), step.getId());
+					}
+				}
+
+				ItemStatus status = new ItemStatus();
+
+				if (rollback) {
+					rollback = false;
+				}
+
+				chunkToWrite = readAndProcess(itemCount, status);
+
+				if (status.isRollback()) {
+					itemCount = 1;
+					rollback = true;
+
+					readerProxy.close();
+					writerProxy.close();
+
+					transactionManager.rollback();
+
+					continue;
+				}
+
+				writeChunk(chunkToWrite, status);
+
+				if (status.isRollback()) {
+					itemCount = 1;
+					rollback = true;
+
+					readerProxy.close();
+					writerProxy.close();
+
+					transactionManager.rollback();
+
+					continue;
+				}
+				checkPointed = status.isCheckPointed();
+
+				// we could finish the chunk in 3 conditions: buffer is full,
+				// checkpoint, not more input
+				if (status.isCheckPointed() || status.isFinished()) {
+					// TODO: missing before checkpoint listeners
+					// 1.- check if spec list proper steps for before checkpoint
+					// 2.- ask Andy about retry
+					// 3.- when do we stop?
+
+							checkpointManager.checkpoint();
+
+							for (ChunkListenerProxy chunkProxy : chunkListeners) {
+								chunkProxy.afterChunk();
+							}
+
+							this.persistUserData();
+
+							this.chkptAlg.beginCheckpoint();
+
+							transactionManager.commit();
+
+							this.chkptAlg.endCheckpoint();
+
+							invokeCollectorIfPresent();
+
+							// exit loop when last record is written
+							if (status.isFinished()) {
+								transactionManager.begin();
+
+								readerProxy.close();
+								writerProxy.close();
+
+								transactionManager.commit();
+								// increment commitCount
+								stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();
+								break;
+							} else {
+								// increment commitCount
+								stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();
+							}
+
+				}
+
+			}
+		} catch (Exception e) {
+			caughtThrowable = e;
+			logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", e);
+			// Only try to call onError() if we have an Exception, but not an Error.
+			for (ChunkListenerProxy chunkProxy : chunkListeners) {
+				try {
+					chunkProxy.onError(e);
+				} catch (Exception e1) {
+					StringWriter sw = new StringWriter();
+					PrintWriter pw = new PrintWriter(sw);
+					e1.printStackTrace(pw);
+					logger.warning("Caught secondary exception when calling chunk listener onError() with stack trace: " + sw.toString() + "\n. Will continue to remaining chunk listeners (if any) and rethrow wrapping the primary exception.");
+				}
+			}
+		} catch (Throwable t) {
+			caughtThrowable = t;
+			logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", t);
+		} finally {
+			if (caughtThrowable != null) {
+				transactionManager.setRollbackOnly();
+				logger.warning("Caught throwable in chunk processing. Attempting to close all readers and writers.");
+				readerProxy.close();
+				writerProxy.close();
+				transactionManager.rollback();
+				logger.exiting(sourceClass, "invokeChunk");
+				throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", caughtThrowable);
+			} else {
+				logger.finest("Exiting normally");
+				logger.exiting(sourceClass, "invokeChunk");
+			}
+		}
+	}
+
+	protected void invokeCoreStep() throws BatchContainerServiceException {
+
+		this.chunk = step.getChunk();
+
+		initializeChunkArtifacts();
+		
+		invokeChunk();
+	}
+
+	private CheckpointAlgorithm getCheckpointAlgorithm(int itemCount, int timeInterval) {
+		CheckpointAlgorithm alg = null;
+
+		if (checkpointProxy.getCheckpointType() == "item") {
+			alg = new ItemCheckpointAlgorithm();
+			((ItemCheckpointAlgorithm) alg).setThresholds(itemCount, timeInterval);
+		} else { // custom chkpt alg
+			alg = (CheckpointAlgorithm) checkpointProxy;
+		}
+
+		return alg;
+	}
+
+	/*
+	 * Initialize itemreader, itemwriter, and item processor checkpoint
+	 */
+	private void initializeChunkArtifacts() {
+		String sourceMethod = "initializeChunkArtifacts";
+		if (logger.isLoggable(Level.FINE))
+			logger.entering(sourceClass, sourceMethod);
+
+		int itemCount = ChunkHelper.getItemCount(chunk);
+		int timeInterval = ChunkHelper.getTimeLimit(chunk);
+		String checkpointPolicy = ChunkHelper.getCheckpointPolicy(chunk);
+
+		ItemReader itemReader = chunk.getReader();
+		List<Property> itemReaderProps = itemReader.getProperties() == null ? null : itemReader.getProperties().getPropertyList();
+		try {
+			InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, 
+					itemReaderProps);
+
+			readerProxy = ProxyFactory.createItemReaderProxy(itemReader.getRef(), injectionRef, stepContext);
+
+			if (logger.isLoggable(Level.FINE)) {
+				logger.fine("Created ItemReaderProxy for " + itemReader.getRef());
+			}
+		} catch (ArtifactValidationException e) {
+			throw new BatchContainerServiceException("Cannot create the ItemReader [" + itemReader.getRef() + "]", e);
+		}
+
+		ItemProcessor itemProcessor = chunk.getProcessor();
+		if (itemProcessor != null){
+			List<Property> itemProcessorProps = itemProcessor.getProperties() == null ? null : itemProcessor.getProperties().getPropertyList();
+			try {
+
+				InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, 
+						itemProcessorProps);
+
+				processorProxy = ProxyFactory.createItemProcessorProxy(itemProcessor.getRef(), injectionRef, stepContext);
+				if (logger.isLoggable(Level.FINE)) {
+					logger.fine("Created ItemProcessorProxy for " + itemProcessor.getRef());
+				}
+			} catch (ArtifactValidationException e) {
+				throw new BatchContainerServiceException("Cannot create the ItemProcessor [" + itemProcessor.getRef() + "]", e);
+			}
+		}
+
+		ItemWriter itemWriter = chunk.getWriter();
+		List<Property> itemWriterProps = itemWriter.getProperties() == null ? null : itemWriter.getProperties().getPropertyList();
+		try {
+			InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, 
+					itemWriterProps);
+
+			writerProxy = ProxyFactory.createItemWriterProxy(itemWriter.getRef(), injectionRef, stepContext);
+			if (logger.isLoggable(Level.FINE)) {
+				logger.fine("Created ItemWriterProxy for " + itemWriter.getRef());
+			}
+		} catch (ArtifactValidationException e) {
+			throw new BatchContainerServiceException("Cannot create the ItemWriter [" + itemWriter.getRef() + "]", e);
+		}
+
+		try {
+			List<Property> propList = null;
+
+			if (chunk.getCheckpointAlgorithm() != null) {
+
+				propList = (chunk.getCheckpointAlgorithm().getProperties() == null) ? null : chunk.getCheckpointAlgorithm().getProperties()
+						.getPropertyList();
+			}
+
+			InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, 
+					propList);
+
+			checkpointProxy = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(step, injectionRef, stepContext);
+			if (logger.isLoggable(Level.FINE)) {
+				logger.fine("Created CheckpointAlgorithmProxy for policy [" + checkpointPolicy + "]");
+			}
+		} catch (ArtifactValidationException e) {
+			throw new BatchContainerServiceException("Cannot create the CheckpointAlgorithm for policy [" + chunk.getCheckpointPolicy()
+					+ "]", e);
+		}
+
+		InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, 
+				null);
+
+		this.chunkListeners = jobExecutionImpl.getListenerFactory().getChunkListeners(step, injectionRef, stepContext);
+		this.skipProcessListeners = jobExecutionImpl.getListenerFactory().getSkipProcessListeners(step, injectionRef, stepContext);
+		this.skipReadListeners = jobExecutionImpl.getListenerFactory().getSkipReadListeners(step, injectionRef, stepContext);
+		this.skipWriteListeners = jobExecutionImpl.getListenerFactory().getSkipWriteListeners(step, injectionRef, stepContext);
+		this.retryProcessListeners = jobExecutionImpl.getListenerFactory().getRetryProcessListeners(step, injectionRef, stepContext);
+		this.retryReadListeners = jobExecutionImpl.getListenerFactory().getRetryReadListeners(step, injectionRef, stepContext);
+		this.retryWriteListeners = jobExecutionImpl.getListenerFactory().getRetryWriteListeners(step, injectionRef, stepContext);
+		this.itemReadListeners = jobExecutionImpl.getListenerFactory().getItemReadListeners(step, injectionRef, stepContext);
+		this.itemProcessListeners = jobExecutionImpl.getListenerFactory().getItemProcessListeners(step, injectionRef, stepContext);
+		this.itemWriteListeners = jobExecutionImpl.getListenerFactory().getItemWriteListeners(step, injectionRef, stepContext);
+
+		if (checkpointProxy.getCheckpointType() == "item") {
+			chkptAlg = new ItemCheckpointAlgorithm();
+			((ItemCheckpointAlgorithm) chkptAlg).setThresholds(itemCount, timeInterval);
+		} else { // custom chkpt alg
+			chkptAlg = (CheckpointAlgorithm) checkpointProxy;
+		}
+
+		if (logger.isLoggable(Level.FINE)) {
+			logger.fine("Setting contexts for chunk artifacts");
+		}
+
+		if (logger.isLoggable(Level.FINE))
+			logger.fine("Initialize checkpoint manager with item-count=" + itemCount);
+		logger.fine("Initialize checkpoint manager with time-interval=" + timeInterval);
+
+		checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getExecutionId(), jobExecutionImpl
+				.getJobInstance().getInstanceId(), step.getId());
+
+		skipHandler = new SkipHandler(chunk, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId());
+		skipHandler.addSkipProcessListener(skipProcessListeners);
+		skipHandler.addSkipReadListener(skipReadListeners);
+		skipHandler.addSkipWriteListener(skipWriteListeners);
+
+		retryHandler = new RetryHandler(chunk, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId());
+
+		retryHandler.addRetryProcessListener(retryProcessListeners);
+		retryHandler.addRetryReadListener(retryReadListeners);
+		retryHandler.addRetryWriteListener(retryWriteListeners);
+
+		if (logger.isLoggable(Level.FINE))
+			logger.exiting(sourceClass, sourceMethod);
+	}
+
+	private void openReaderAndWriter() {
+		String sourceMethod = "openReaderAndWriter";
+
+		if (logger.isLoggable(Level.FINE))
+			logger.entering(sourceClass, sourceMethod);
+
+		_persistenceManagerService = servicesManager.getPersistenceManagerService();
+		readerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), "READER");
+		CheckpointData readerChkptData = _persistenceManagerService.getCheckpointData(readerChkptDK);
+		try {
+
+			// check for data in backing store
+			if (readerChkptData != null) {
+
+				byte[] readertoken = readerChkptData.getRestartToken();
+				ByteArrayInputStream readerChkptBA = new ByteArrayInputStream(readertoken);
+				TCCLObjectInputStream readerOIS = null;
+				try {
+					readerOIS = new TCCLObjectInputStream(readerChkptBA);
+					readerProxy.open((Serializable) readerOIS.readObject());
+					readerOIS.close();
+				} catch (Exception ex) {
+					// is this what I should be throwing here?
+							throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+				}
+			} else {
+				// no chkpt data exists in the backing store
+				readerChkptData = null;
+				readerProxy.open(null);
+			}
+		} catch (ClassCastException e) {
+			logger.warning("Expected CheckpointData but found" + readerChkptData );
+			throw new IllegalStateException("Expected CheckpointData but found" + readerChkptData );
+		}
+
+		writerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), "WRITER");
+		CheckpointData writerChkptData = _persistenceManagerService.getCheckpointData(writerChkptDK);
+
+		try {
+			// check for data in backing store
+			if (writerChkptData != null) {
+				byte[] writertoken = writerChkptData.getRestartToken();
+				ByteArrayInputStream writerChkptBA = new ByteArrayInputStream(writertoken);
+				TCCLObjectInputStream writerOIS = null;
+				try {
+					writerOIS = new TCCLObjectInputStream(writerChkptBA);
+					writerProxy.open((Serializable) writerOIS.readObject());
+					writerOIS.close();
+				} catch (Exception ex) {
+					// is this what I should be throwing here?
+							throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+				}
+			} else {
+				// no chkpt data exists in the backing store
+				writerChkptData = null;
+				writerProxy.open(null);
+			}
+		} catch (ClassCastException e) {
+			logger.warning("Expected Checkpoint but found" + writerChkptData);
+			throw new IllegalStateException("Expected Checkpoint but found" + writerChkptData);
+		}
+
+		// set up metrics
+		// stepContext.addMetric(MetricImpl.Counter.valueOf("READ_COUNT"), 0);
+		// stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_COUNT"), 0);
+		// stepContext.addMetric(MetricImpl.Counter.valueOf("READ_SKIP_COUNT"),
+		// 0);
+		// stepContext.addMetric(MetricImpl.Counter.valueOf("PROCESS_SKIP_COUNT"),
+		// 0);
+		// stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_SKIP_COUNT"),
+		// 0);
+
+		if (logger.isLoggable(Level.FINE))
+			logger.exiting(sourceClass, sourceMethod);
+	}
+
+	@Override
+	public void stop() {
+		stepContext.setBatchStatus(BatchStatus.STOPPING);
+
+		// we don't need to call stop on the chunk implementation here since a
+		// chunk always returns control to
+		// the batch container after every item.
+
+	}
+
+	boolean skipReadException(Exception e) {
+
+		try {
+			skipHandler.handleExceptionRead(e);
+		} catch (BatchContainerRuntimeException bcre) {
+			return false;
+		}
+
+		return true;
+
+	}
+
+	boolean retryReadException(Exception e) {
+
+		try {
+			retryHandler.handleExceptionRead(e);
+		} catch (BatchContainerRuntimeException bcre) {
+			return false;
+		}
+
+		return true;
+
+	}
+
+	boolean skipProcessException(Exception e, Object record) {
+
+		try {
+			skipHandler.handleExceptionWithRecordProcess(e, record);
+		} catch (BatchContainerRuntimeException bcre) {
+			return false;
+		}
+
+		return true;
+
+	}
+
+	boolean retryProcessException(Exception e, Object record) {
+
+		try {
+			retryHandler.handleExceptionProcess(e, record);
+		} catch (BatchContainerRuntimeException bcre) {
+			return false;
+		}
+
+		return true;
+
+	}
+
+	boolean skipWriteException(Exception e, List<Object> chunkToWrite) {
+
+
+
+		try {
+			skipHandler.handleExceptionWithRecordListWrite(e, chunkToWrite);
+		} catch (BatchContainerRuntimeException bcre) {
+			return false;
+		}
+
+
+		return true;
+
+	}
+
+	boolean retryWriteException(Exception e, List<Object> chunkToWrite) {
+
+		try {
+			retryHandler.handleExceptionWrite(e, chunkToWrite);
+		} catch (BatchContainerRuntimeException bcre) {
+			return false;
+		}
+
+		return true;
+
+	}
+
+	private void positionReaderAtCheckpoint() {
+		_persistenceManagerService = servicesManager.getPersistenceManagerService();
+		readerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), "READER");
+
+		CheckpointData readerData = _persistenceManagerService.getCheckpointData(readerChkptDK);
+		try {
+			// check for data in backing store
+			if (readerData != null) {
+				byte[] readertoken = readerData.getRestartToken();
+				ByteArrayInputStream readerChkptBA = new ByteArrayInputStream(readertoken);
+				TCCLObjectInputStream readerOIS = null;
+				try {
+					readerOIS = new TCCLObjectInputStream(readerChkptBA);
+					readerProxy.open((Serializable) readerOIS.readObject());
+					readerOIS.close();
+				} catch (Exception ex) {
+					// is this what I should be throwing here?
+							throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+				}
+			} else {
+				// no chkpt data exists in the backing store
+				readerData = null;
+				readerProxy.open(null);
+			}
+		} catch (ClassCastException e) {
+			throw new IllegalStateException("Expected CheckpointData but found" + readerData);
+		}
+	}
+
+	private void positionWriterAtCheckpoint() {
+		_persistenceManagerService = servicesManager.getPersistenceManagerService();
+		writerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), "WRITER");
+
+		CheckpointData writerData =  _persistenceManagerService.getCheckpointData(writerChkptDK);
+
+		try {
+			// check for data in backing store
+			if (writerData != null) {
+				byte[] writertoken = writerData.getRestartToken();
+				ByteArrayInputStream writerChkptBA = new ByteArrayInputStream(writertoken);
+				TCCLObjectInputStream writerOIS = null;
+				try {
+					writerOIS = new TCCLObjectInputStream(writerChkptBA);
+					writerProxy.open((Serializable) writerOIS.readObject());
+					writerOIS.close();
+				} catch (Exception ex) {
+					// is this what I should be throwing here?
+							throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+				}
+			} else {
+				// no chkpt data exists in the backing store
+				writerData = null;
+				writerProxy.open(null);
+			}
+		} catch (ClassCastException e) {
+			throw new IllegalStateException("Expected CheckpointData but found" + writerData);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/DecisionControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/DecisionControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/DecisionControllerImpl.java
new file mode 100755
index 0000000..0595c48
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/DecisionControllerImpl.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.util.List;
+import java.util.logging.Logger;
+
+import javax.batch.runtime.StepExecution;
+
+import com.ibm.jbatch.container.IExecutionElementController;
+import com.ibm.jbatch.container.artifact.proxy.DeciderProxy;
+import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
+import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
+import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
+import com.ibm.jbatch.container.exception.BatchContainerServiceException;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.jsl.ExecutionElement;
+import com.ibm.jbatch.container.services.IPersistenceManagerService;
+import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
+import com.ibm.jbatch.container.status.ExecutionStatus;
+import com.ibm.jbatch.container.status.ExtendedBatchStatus;
+import com.ibm.jbatch.container.validation.ArtifactValidationException;
+import com.ibm.jbatch.jsl.model.Decision;
+import com.ibm.jbatch.jsl.model.Property;
+
+public class DecisionControllerImpl implements IExecutionElementController {
+
+	private final static String sourceClass = SplitControllerImpl.class.getName();
+	private final static Logger logger = Logger.getLogger(sourceClass);
+
+	private RuntimeJobExecution jobExecution; 
+
+	private Decision decision;
+
+	private StepExecution[] previousStepExecutions = null;
+
+	private IPersistenceManagerService persistenceService = null;
+
+	public DecisionControllerImpl(RuntimeJobExecution jobExecution, Decision decision) {
+		this.jobExecution = jobExecution;
+		this.decision = decision;
+		persistenceService = ServicesManagerImpl.getInstance().getPersistenceManagerService();
+	}
+
+	@Override
+	public ExecutionStatus execute() {
+
+		String deciderId = decision.getRef();
+		List<Property> propList = (decision.getProperties() == null) ? null : decision.getProperties().getPropertyList();
+
+		DeciderProxy deciderProxy;
+
+		//Create a decider proxy and inject the associated properties
+
+		/* Set the contexts associated with this scope */
+		//job context is always in scope
+		//the parent controller will only pass one valid context to a decision controller
+		//so two of these contexts will always be null
+		InjectionReferences injectionRef = new InjectionReferences(jobExecution.getJobContext(), null, propList);
+
+		try {
+			deciderProxy = ProxyFactory.createDeciderProxy(deciderId,injectionRef );
+		} catch (ArtifactValidationException e) {
+			throw new BatchContainerServiceException("Cannot create the decider [" + deciderId + "]", e);
+		}
+
+		String exitStatus = deciderProxy.decide(this.previousStepExecutions);
+
+		logger.fine("Decider exiting and setting job-level exit status to " + exitStatus);
+
+		//Set the value returned from the decider as the job context exit status.
+		this.jobExecution.getJobContext().setExitStatus(exitStatus);
+
+		return new ExecutionStatus(ExtendedBatchStatus.NORMAL_COMPLETION, exitStatus);
+	}
+
+	protected void setPreviousStepExecutions(ExecutionElement previousExecutionElement, IExecutionElementController previousElementController) { 
+		if (previousExecutionElement == null) {
+			// only job context is available to the decider 
+		} else if (previousExecutionElement instanceof Decision) {
+			
+			throw new BatchContainerRuntimeException("A decision cannot precede another decision.");
+			
+		}
+		
+		List<Long> previousStepExecsIds = previousElementController.getLastRunStepExecutions();
+		
+		StepExecution[] stepExecArray = new StepExecution[previousStepExecsIds.size()];
+		
+		for (int i=0; i < stepExecArray.length; i++) {
+		    StepExecution stepExec = persistenceService.getStepExecutionByStepExecutionId(previousStepExecsIds.get(i));
+		    stepExecArray[i] = stepExec;
+		}
+
+		this.previousStepExecutions =  stepExecArray;
+		
+	}
+
+
+	@Override
+	public void stop() { 
+		// no-op
+	}
+
+    @Override
+    public List<Long> getLastRunStepExecutions() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionElementControllerFactory.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionElementControllerFactory.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionElementControllerFactory.java
new file mode 100755
index 0000000..ad504b7
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionElementControllerFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+import com.ibm.jbatch.container.context.impl.StepContextImpl;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.util.PartitionDataWrapper;
+import com.ibm.jbatch.jsl.model.Batchlet;
+import com.ibm.jbatch.jsl.model.Chunk;
+import com.ibm.jbatch.jsl.model.Decision;
+import com.ibm.jbatch.jsl.model.Flow;
+import com.ibm.jbatch.jsl.model.Partition;
+import com.ibm.jbatch.jsl.model.Split;
+import com.ibm.jbatch.jsl.model.Step;
+
+public class ExecutionElementControllerFactory {
+
+	private final static String CLASSNAME = ExecutionElementControllerFactory.class.getName();
+	private final static Logger logger = Logger.getLogger(CLASSNAME);
+
+	public static BaseStepControllerImpl getStepController(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId,  BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+
+		String methodName = "getStepController";
+
+		if(logger.isLoggable(Level.FINER)) { logger.logp (Level.FINER, CLASSNAME, methodName, "Get StepController for", step.getId());}
+
+		Partition partition = step.getPartition();
+		if (partition != null) {
+
+			if (partition.getMapper() != null ) {
+				if (logger.isLoggable(Level.FINER)) {
+					logger.logp(Level.FINER, CLASSNAME, methodName, "Found partitioned step with mapper" , step);
+				}
+				return new PartitionedStepControllerImpl(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+			}
+
+			if (partition.getPlan() != null) {
+				if (partition.getPlan().getPartitions() != null) {
+					if (logger.isLoggable(Level.FINER)) {
+						logger.logp(Level.FINER, CLASSNAME, methodName, "Found partitioned step with plan", step);
+					}
+					return new PartitionedStepControllerImpl(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+				}
+			}
+		}
+
+		Batchlet batchlet = step.getBatchlet();
+		if (batchlet != null) {
+			if(logger.isLoggable(Level.FINER)) {  
+				logger.finer("Found batchlet: " + batchlet + ", with ref= " + batchlet.getRef());
+			}
+			if (step.getChunk() != null) {
+				throw new IllegalArgumentException("Step contains both a batchlet and a chunk.  Aborting.");
+			}       
+			return new BatchletStepControllerImpl(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue);
+		} else {
+			Chunk chunk = step.getChunk();
+			if(logger.isLoggable(Level.FINER)) {  
+				logger.finer("Found chunk: " + chunk);
+			}
+			if (chunk == null) {
+				throw new IllegalArgumentException("Step does not contain either a batchlet or a chunk.  Aborting.");
+			}
+			return new ChunkStepControllerImpl(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue);
+		}           
+	} 
+
+	public static DecisionControllerImpl getDecisionController(RuntimeJobExecution jobExecutionImpl, Decision decision) {
+		return new DecisionControllerImpl(jobExecutionImpl, decision);
+	} 
+	
+	public static FlowControllerImpl getFlowController(RuntimeJobExecution jobExecutionImpl, Flow flow, long rootJobExecutionId) {
+		return new FlowControllerImpl(jobExecutionImpl, flow, rootJobExecutionId);
+	} 
+	
+	public static SplitControllerImpl getSplitController(RuntimeJobExecution jobExecutionImpl, Split split, long rootJobExecutionId) {
+		return new SplitControllerImpl(jobExecutionImpl, split, rootJobExecutionId);
+	}  
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionTransitioner.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionTransitioner.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionTransitioner.java
new file mode 100755
index 0000000..665f6e4
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionTransitioner.java
@@ -0,0 +1,275 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Logger;
+
+import javax.batch.runtime.BatchStatus;
+
+import com.ibm.jbatch.container.IController;
+import com.ibm.jbatch.container.IExecutionElementController;
+import com.ibm.jbatch.container.context.impl.JobContextImpl;
+import com.ibm.jbatch.container.context.impl.StepContextImpl;
+import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.jsl.ExecutionElement;
+import com.ibm.jbatch.container.jsl.IllegalTransitionException;
+import com.ibm.jbatch.container.jsl.Transition;
+import com.ibm.jbatch.container.jsl.TransitionElement;
+import com.ibm.jbatch.container.navigator.ModelNavigator;
+import com.ibm.jbatch.container.status.ExtendedBatchStatus;
+import com.ibm.jbatch.container.status.ExecutionStatus;
+import com.ibm.jbatch.container.util.PartitionDataWrapper;
+import com.ibm.jbatch.jsl.model.Decision;
+import com.ibm.jbatch.jsl.model.End;
+import com.ibm.jbatch.jsl.model.Fail;
+import com.ibm.jbatch.jsl.model.Flow;
+import com.ibm.jbatch.jsl.model.JSLJob;
+import com.ibm.jbatch.jsl.model.Split;
+import com.ibm.jbatch.jsl.model.Step;
+import com.ibm.jbatch.jsl.model.Stop;
+
+public class ExecutionTransitioner {
+
+	private final static String CLASSNAME = ExecutionTransitioner.class.getName();
+	private final static Logger logger = Logger.getLogger(CLASSNAME);
+
+	private RuntimeJobExecution jobExecution;
+	private long rootJobExecutionId;
+	private ModelNavigator<?> modelNavigator;
+	
+	// 'volatile' since it receives stop on separate thread.
+	private volatile IExecutionElementController currentStoppableElementController;
+	private IExecutionElementController previousElementController;
+	private ExecutionElement currentExecutionElement = null;
+	private ExecutionElement previousExecutionElement = null;
+
+	
+	private JobContextImpl jobContext;
+	private BlockingQueue<PartitionDataWrapper> analyzerQueue = null;
+	
+	private List<Long> stepExecIds;
+	
+	public ExecutionTransitioner(RuntimeJobExecution jobExecution, long rootJobExecutionId, ModelNavigator<?> modelNavigator) {
+		this.jobExecution = jobExecution;
+		this.rootJobExecutionId = rootJobExecutionId;
+		this.modelNavigator = modelNavigator;
+		this.jobContext = jobExecution.getJobContext(); 
+	}
+	
+	public ExecutionTransitioner(RuntimeJobExecution jobExecution, long rootJobExecutionId, ModelNavigator<JSLJob> jobNavigator, BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+		this.jobExecution = jobExecution;
+		this.rootJobExecutionId = rootJobExecutionId;
+		this.modelNavigator = jobNavigator;
+		this.jobContext = jobExecution.getJobContext(); 
+		this.analyzerQueue = analyzerQueue;
+	}
+	
+	/**
+	 * Used for job and flow.
+	 * @return
+	 */
+	public ExecutionStatus doExecutionLoop() {
+
+		final String methodName = "doExecutionLoop";
+		
+		try {
+			currentExecutionElement = modelNavigator.getFirstExecutionElement(jobExecution.getRestartOn());
+		} catch (IllegalTransitionException e) {
+			String errorMsg = "Could not transition to first execution element within job.";
+			logger.warning(errorMsg);
+			throw new IllegalArgumentException(errorMsg, e);
+		}
+
+		logger.fine("First execution element = " + currentExecutionElement.getId());
+
+		while (true) {
+
+			if (jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
+				logger.fine(methodName + " Exiting execution loop as job is now in stopping state.");
+				return new ExecutionStatus(ExtendedBatchStatus.JOB_OPERATOR_STOPPING);
+			}
+			
+			IExecutionElementController currentElementController = getNextElementController();
+			currentStoppableElementController = currentElementController;
+			
+			ExecutionStatus status = currentElementController.execute();
+
+			// Nothing special for decision or step except to get exit status.  For flow and split we want to bubble up though.
+			if ((currentExecutionElement instanceof Split) || (currentExecutionElement instanceof Flow)) {
+				// Exit status and restartOn should both be in the job context.
+				if (!status.getExtendedBatchStatus().equals(ExtendedBatchStatus.NORMAL_COMPLETION)) {
+					logger.fine("Breaking out of loop with return status = " + status.getExtendedBatchStatus().name());
+					return status;
+				}
+			} 
+
+			// Seems like this should only happen if an Error is thrown at the step level, since normally a step-level
+			// exception is caught and the fact that it was thrown capture in the ExecutionStatus
+			if (jobContext.getBatchStatus().equals(BatchStatus.FAILED)) {
+				String errorMsg = "Sub-execution returned its own BatchStatus of FAILED.  Deal with this by throwing exception to the next layer.";
+				logger.warning(errorMsg);
+				throw new BatchContainerRuntimeException(errorMsg);
+			}
+
+			// set the execution element controller to null so we don't try to call stop on it after the element has finished executing
+			currentStoppableElementController = null;
+			
+			logger.fine("Done executing element=" + currentExecutionElement.getId() + ", exitStatus=" + status.getExitStatus());
+
+			if (jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
+				logger.fine(methodName + " Exiting as job has been stopped");
+				return new ExecutionStatus(ExtendedBatchStatus.JOB_OPERATOR_STOPPING);
+			}
+
+			Transition nextTransition = null;
+			try {
+				nextTransition = modelNavigator.getNextTransition(currentExecutionElement, status);
+			} catch (IllegalTransitionException e) {
+				String errorMsg = "Problem transitioning to next execution element.";
+				logger.warning(errorMsg);
+				throw new BatchContainerRuntimeException(errorMsg, e);
+			} 
+
+			//
+			// We will find ourselves in one of four states now.  
+			// 
+			// 1. Finished transitioning after a normal execution, but nothing to do 'next'.
+			// 2. We just executed a step which through an exception, but didn't match a transition element.
+			// 3. We are going to 'next' to another execution element (and jump back to the top of this '
+			//    'while'-loop.
+			// 4. We matched a terminating transition element (<end>, <stop> or <fail).
+			//
+			
+			// 1.
+			if (nextTransition.isFinishedTransitioning()) {
+				logger.fine(methodName + "No next execution element, and no transition element found either.  Looks like we're done and ready for COMPLETED state.");
+				this.stepExecIds =  currentElementController.getLastRunStepExecutions();
+				// Consider just passing the last 'status' back, but let's unwrap the exit status and pass a new NORMAL_COMPLETION
+				// status back instead.
+				return new ExecutionStatus(ExtendedBatchStatus.NORMAL_COMPLETION, status.getExitStatus());
+			// 2.
+			} else if (nextTransition.noTransitionElementMatchedAfterException()) {
+				return new ExecutionStatus(ExtendedBatchStatus.EXCEPTION_THROWN, status.getExitStatus());
+			// 3.
+			} else if (nextTransition.getNextExecutionElement() != null) {
+				// hold on to the previous execution element for the decider
+				// we need it because we need to inject the context of the
+				// previous execution element into the decider
+				previousExecutionElement = currentExecutionElement;
+				previousElementController = currentElementController;
+				currentExecutionElement = nextTransition.getNextExecutionElement();
+			// 4.
+			} else if (nextTransition.getTransitionElement() != null) {
+				ExecutionStatus terminatingStatus = handleTerminatingTransitionElement(nextTransition.getTransitionElement());
+				logger.finer(methodName + " , Breaking out of execution loop after processing terminating transition element.");
+				return terminatingStatus;
+			} else {
+				throw new IllegalStateException("Not sure how we'd end up in this state...aborting rather than looping.");
+			}
+		}
+	}
+
+	
+	private IExecutionElementController getNextElementController() {
+		IExecutionElementController elementController =null;
+
+		if (currentExecutionElement instanceof Decision) {
+			Decision decision = (Decision)currentExecutionElement;
+			elementController = ExecutionElementControllerFactory.getDecisionController(jobExecution, decision);			
+			DecisionControllerImpl decisionController = (DecisionControllerImpl)elementController;
+			decisionController.setPreviousStepExecutions(previousExecutionElement, previousElementController);
+		} else if (currentExecutionElement instanceof Flow) {
+			Flow flow = (Flow)currentExecutionElement;
+			elementController = ExecutionElementControllerFactory.getFlowController(jobExecution, flow, rootJobExecutionId);
+		} else if (currentExecutionElement instanceof Split) {
+			Split split = (Split)currentExecutionElement;
+			elementController = ExecutionElementControllerFactory.getSplitController(jobExecution, split, rootJobExecutionId);
+		} else if (currentExecutionElement instanceof Step) {
+			Step step = (Step)currentExecutionElement;
+			StepContextImpl stepContext = new StepContextImpl(step.getId());
+			elementController = ExecutionElementControllerFactory.getStepController(jobExecution, step, stepContext, rootJobExecutionId, analyzerQueue);
+		}
+		logger.fine("Next execution element controller = " + elementController);
+		return elementController;
+	}
+			
+			
+	private ExecutionStatus handleTerminatingTransitionElement(TransitionElement transitionElement) {
+
+		ExecutionStatus retVal;
+		
+		logger.fine("Found terminating transition element (stop, end, or fail).");
+
+		if (transitionElement instanceof Stop) {
+
+			Stop stopElement = (Stop)transitionElement;
+			String restartOn = stopElement.getRestart();
+			String exitStatusFromJSL = stopElement.getExitStatus();
+			logger.fine("Next transition element is a <stop> : " + transitionElement + " with restartOn=" + restartOn + 
+					" , and JSL exit status = " + exitStatusFromJSL);
+
+			retVal = new ExecutionStatus(ExtendedBatchStatus.JSL_STOP);
+			
+			if (exitStatusFromJSL != null) {
+				jobContext.setExitStatus(exitStatusFromJSL);  
+				retVal.setExitStatus(exitStatusFromJSL);  
+			}
+			if (restartOn != null) {
+				jobContext.setRestartOn(restartOn);				
+				retVal.setRestartOn(restartOn);				
+			}
+		} else if (transitionElement instanceof End) {
+
+			End endElement = (End)transitionElement;
+			String exitStatusFromJSL = endElement.getExitStatus();
+			logger.fine("Next transition element is an <end> : " + transitionElement + 
+					" with JSL exit status = " + exitStatusFromJSL);
+			retVal = new ExecutionStatus(ExtendedBatchStatus.JSL_END);
+			if (exitStatusFromJSL != null) {
+				jobContext.setExitStatus(exitStatusFromJSL);  
+				retVal.setExitStatus(exitStatusFromJSL);  
+			}
+		} else if (transitionElement instanceof Fail) {
+
+			Fail failElement = (Fail)transitionElement;
+			String exitStatusFromJSL = failElement.getExitStatus();
+			logger.fine("Next transition element is a <fail> : " + transitionElement + 
+					" with JSL exit status = " + exitStatusFromJSL);
+			retVal = new ExecutionStatus(ExtendedBatchStatus.JSL_FAIL);
+			if (exitStatusFromJSL != null) {
+				jobContext.setExitStatus(exitStatusFromJSL);  
+				retVal.setExitStatus(exitStatusFromJSL);  
+			}
+		} else {
+			throw new IllegalStateException("Not sure how we'd get here...aborting.");
+		}
+		return retVal;
+	}
+
+	public IController getCurrentStoppableElementController() {
+		return currentStoppableElementController;
+	}
+
+    public List<Long> getStepExecIds() {
+        return stepExecIds;
+    }
+
+
+}
\ No newline at end of file