You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:38:44 UTC

[07/62] Importing JBatch Reference Implementation from IBM. We'll fork it but this commit is to keep a track of what we forked.

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowControllerImpl.java
new file mode 100755
index 0000000..255d871
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowControllerImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.util.List;
+import java.util.logging.Logger;
+
+import javax.batch.runtime.BatchStatus;
+
+import com.ibm.jbatch.container.IController;
+import com.ibm.jbatch.container.IExecutionElementController;
+import com.ibm.jbatch.container.context.impl.JobContextImpl;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.navigator.ModelNavigator;
+import com.ibm.jbatch.container.navigator.NavigatorFactory;
+import com.ibm.jbatch.container.services.IPersistenceManagerService;
+import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
+import com.ibm.jbatch.container.status.ExtendedBatchStatus;
+import com.ibm.jbatch.container.status.ExecutionStatus;
+import com.ibm.jbatch.jsl.model.Flow;
+
+/**
+ * Execution-element controller for a single {@link Flow}.  Execution is delegated to an
+ * {@link ExecutionTransitioner} built over the flow's navigator.
+ */
+public class FlowControllerImpl implements IExecutionElementController {
+
+	private final static String CLASSNAME = FlowControllerImpl.class.getName();
+	private final static Logger logger = Logger.getLogger(CLASSNAME);
+
+	private final RuntimeJobExecution jobExecution;
+	private final JobContextImpl jobContext;
+
+	protected ModelNavigator<Flow> flowNavigator;
+
+	protected Flow flow;
+	private long rootJobExecutionId;
+
+	// Only created inside execute(); it is still null before execute() runs and remains null
+	// when execute() returns early because the job is already STOPPING.
+	private ExecutionTransitioner transitioner;
+
+	//
+	// The currently executing controller, this will only be set to the
+	// local variable reference when we are ready to accept stop events for
+	// this execution.
+	private volatile IController currentStoppableElementController = null;
+
+	private static IPersistenceManagerService _persistenceManagementService = ServicesManagerImpl.getInstance().getPersistenceManagerService();
+
+
+	/**
+	 * @param jobExecution the execution this flow runs within; its job context is used to check for a pending stop
+	 * @param flow the flow model element to execute
+	 * @param rootJobExecutionId passed through to the transitioner created in {@link #execute()}
+	 */
+	public FlowControllerImpl(RuntimeJobExecution jobExecution, Flow flow, long rootJobExecutionId) {
+		this.jobExecution = jobExecution;
+		this.jobContext = jobExecution.getJobContext();
+		this.flowNavigator = NavigatorFactory.createFlowNavigator(flow);
+		this.flow = flow;
+		this.rootJobExecutionId = rootJobExecutionId;
+	}
+
+	/**
+	 * Runs the flow's execution loop unless the job is already STOPPING.
+	 *
+	 * @return the status produced by the transitioner's execution loop, or a
+	 *         JOB_OPERATOR_STOPPING status if a stop was already requested
+	 */
+	@Override
+	public ExecutionStatus execute() {
+		if (!jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
+			transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, flowNavigator);
+			return transitioner.doExecutionLoop();
+		} else {
+			return new ExecutionStatus(ExtendedBatchStatus.JOB_OPERATOR_STOPPING);
+		}
+	}
+
+
+	/**
+	 * Passes a stop request along to whichever element the transitioner currently reports as
+	 * stoppable.  Does nothing if the execution loop has not been started.
+	 */
+	@Override
+	public void stop() {
+		// Since this is not a top-level controller, don't try to filter based on existing status.. just pass
+		// along the stop().
+		ExecutionTransitioner currentTransitioner = transitioner;
+		if (currentTransitioner == null) {
+			// execute() has not built the transitioner, so there is nothing running to stop.
+			return;
+		}
+		IController stoppableElementController = currentTransitioner.getCurrentStoppableElementController();
+		if (stoppableElementController != null) {
+			stoppableElementController.stop();
+		}
+	}
+
+	/**
+	 * @return the step execution ids reported by the transitioner, or an empty list if the
+	 *         execution loop was never started
+	 */
+	@Override
+	public List<Long> getLastRunStepExecutions() {
+		ExecutionTransitioner currentTransitioner = transitioner;
+		if (currentTransitioner == null) {
+			// No loop ran, hence no step executions; hand back a fresh list the caller may modify.
+			return new java.util.ArrayList<Long>();
+		}
+		return currentTransitioner.getStepExecIds();
+	}
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowInSplitThreadRootControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowInSplitThreadRootControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowInSplitThreadRootControllerImpl.java
new file mode 100755
index 0000000..f680586
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowInSplitThreadRootControllerImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import com.ibm.jbatch.container.jobinstance.RuntimeFlowInSplitExecution;
+import com.ibm.jbatch.container.status.ExtendedBatchStatus;
+import com.ibm.jbatch.container.status.ExecutionStatus;
+import com.ibm.jbatch.container.util.FlowInSplitBuilderConfig;
+
+/**
+ * Thread-root controller for a flow running as a subjob inside a split.  In addition to the
+ * superclass processing it records the resulting status on the {@link RuntimeFlowInSplitExecution}.
+ */
+public class FlowInSplitThreadRootControllerImpl extends JobThreadRootControllerImpl {
+
+	private final static String CLASSNAME = FlowInSplitThreadRootControllerImpl.class.getName();
+
+	// Careful, we have a separately named reference to the same object in the parent class
+	RuntimeFlowInSplitExecution flowInSplitExecution;
+
+	/**
+	 * @param flowInSplitExecution the execution for this flow-in-split subjob
+	 * @param config supplies the root job execution id handed to the superclass
+	 */
+	public FlowInSplitThreadRootControllerImpl(RuntimeFlowInSplitExecution flowInSplitExecution, FlowInSplitBuilderConfig config) {
+		super(flowInSplitExecution, config.getRootJobExecutionId());
+		this.flowInSplitExecution = flowInSplitExecution;
+	}
+
+	/**
+	 * Not only are we setting the status correctly at the subjob level, we are also setting it on the execution
+	 * so that it is visible by the parent split.
+	 *
+	 * @return the status returned by the superclass, which is null when the superclass never ran
+	 *         (or failed before completing) its execution loop
+	 */
+	@Override
+	public ExecutionStatus originateExecutionOnThread() {
+		ExecutionStatus status = super.originateExecutionOnThread();
+		// A null status means the superclass produced no result; in that case keep any status that
+		// batchStatusFailedFromException() already recorded instead of wiping it out with null.
+		if (status != null) {
+			flowInSplitExecution.setFlowStatus(status);
+		}
+		return status;
+	}
+
+	/**
+	 * Marks the job FAILED and records EXCEPTION_THROWN as the flow status.
+	 */
+	@Override
+	protected void batchStatusFailedFromException() {
+		super.batchStatusFailedFromException();
+		// The superclass invokes this hook from within originateExecutionOnThread(), i.e. before the
+		// override above has stored a flow status, so the status may not exist yet.
+		// NOTE(review): assumes RuntimeFlowInSplitExecution does not pre-populate its flow status - confirm.
+		ExecutionStatus flowStatus = flowInSplitExecution.getFlowStatus();
+		if (flowStatus == null) {
+			flowInSplitExecution.setFlowStatus(new ExecutionStatus(ExtendedBatchStatus.EXCEPTION_THROWN));
+		} else {
+			flowStatus.setExtendedBatchStatus(ExtendedBatchStatus.EXCEPTION_THROWN);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobControllerImpl.java
new file mode 100755
index 0000000..75e1332
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobControllerImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+
+/**
+ * Thread-root controller that uses the job execution's own execution id as the
+ * root job execution id.
+ */
+public class JobControllerImpl extends JobThreadRootControllerImpl {
+
+	private static final String CLASSNAME = JobControllerImpl.class.getName();
+
+	/**
+	 * @param jobExecution the execution to control; its execution id doubles as the root job execution id
+	 */
+	public JobControllerImpl(RuntimeJobExecution jobExecution) {
+		this(jobExecution, jobExecution.getExecutionId());
+	}
+
+	// Kept private: callers always go through the single-argument constructor above.
+	private JobControllerImpl(RuntimeJobExecution jobExecution, long rootJobExecutionId) {
+		super(jobExecution, rootJobExecutionId);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobThreadRootControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobThreadRootControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobThreadRootControllerImpl.java
new file mode 100755
index 0000000..3e30ab2
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobThreadRootControllerImpl.java
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.batch.runtime.BatchStatus;
+
+import com.ibm.jbatch.container.IController;
+import com.ibm.jbatch.container.IThreadRootController;
+import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
+import com.ibm.jbatch.container.artifact.proxy.JobListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.ListenerFactory;
+import com.ibm.jbatch.container.context.impl.JobContextImpl;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.navigator.ModelNavigator;
+import com.ibm.jbatch.container.services.IJobStatusManagerService;
+import com.ibm.jbatch.container.services.IPersistenceManagerService;
+import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
+import com.ibm.jbatch.container.status.ExtendedBatchStatus;
+import com.ibm.jbatch.container.status.ExecutionStatus;
+import com.ibm.jbatch.container.util.PartitionDataWrapper;
+import com.ibm.jbatch.jsl.model.JSLJob;
+import com.ibm.jbatch.jsl.model.JSLProperties;
+import com.ibm.jbatch.jsl.model.Property;
+
+public abstract class JobThreadRootControllerImpl implements IThreadRootController {
+
+	private final static String CLASSNAME = JobThreadRootControllerImpl.class.getName();
+	private final static Logger logger = Logger.getLogger(CLASSNAME);
+
+	protected RuntimeJobExecution jobExecution;
+	protected JobContextImpl jobContext;
+	protected long rootJobExecutionId;
+	protected long jobInstanceId;
+	protected IJobStatusManagerService jobStatusService;
+	protected IPersistenceManagerService persistenceService;
+	private ListenerFactory listenerFactory = null;
+
+	private ExecutionTransitioner transitioner; 
+	protected final ModelNavigator<JSLJob> jobNavigator;
+	private BlockingQueue<PartitionDataWrapper> analyzerQueue;
+
+	/**
+	 * Captures the execution, its job context, ids and navigator, looks up the job status and
+	 * persistence services, and sets up the job-level listener factory.
+	 *
+	 * @param jobExecution the execution driven by this controller
+	 * @param rootJobExecutionId handed to the transitioner when the execution loop starts
+	 */
+	public JobThreadRootControllerImpl(RuntimeJobExecution jobExecution, long rootJobExecutionId) {
+		this.jobExecution = jobExecution;
+		this.jobContext = jobExecution.getJobContext();
+		this.rootJobExecutionId = rootJobExecutionId;
+		this.jobInstanceId = jobExecution.getInstanceId();
+
+		this.jobStatusService = ServicesManagerImpl.getInstance().getJobStatusManagerService();
+		this.persistenceService = ServicesManagerImpl.getInstance().getPersistenceManagerService();
+
+		this.jobNavigator = jobExecution.getJobNavigator();
+		setupListeners();
+	}
+
+	/**
+	 * Same as the two-argument constructor, additionally remembering the analyzer queue that is
+	 * passed on to the transitioner.
+	 */
+	public JobThreadRootControllerImpl(RuntimeJobExecution jobExecution, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+		this(jobExecution, rootJobExecutionId);
+		this.analyzerQueue = analyzerQueue;
+	}
+
+	/*
+	 * No rootJobExecutionId is supplied here; the execution's own id is used instead, which "orphans"
+	 * the subjob execution so that it is not findable from the parent.
+	 * This is exactly what we want for getStepExecutions()... we don't want it to get extraneous entries for the partitions.
+	 */
+	public JobThreadRootControllerImpl(RuntimeJobExecution jobExecution, BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+		this(jobExecution, jobExecution.getExecutionId(), analyzerQueue);
+	}
+
+	@Override
+	public ExecutionStatus originateExecutionOnThread() {
+		String methodName = "executeJob";
+		logger.entering(CLASSNAME, methodName);
+
+		ExecutionStatus retVal = null;
+		try {
+			// Check if we've already gotten the stop() command.
+			if (!jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) { 
+
+				// Now that we're ready to start invoking artifacts, set the status to 'STARTED'
+				markJobStarted();
+
+				jobListenersBeforeJob();
+
+				// --------------------
+				// The BIG loop transitioning 
+				// within the job !!!
+				// --------------------
+				transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, jobNavigator, analyzerQueue);
+				retVal = transitioner.doExecutionLoop();
+				ExtendedBatchStatus extBatchStatus = retVal.getExtendedBatchStatus();
+				switch (extBatchStatus)  {
+					case JSL_STOP : 		jslStop();
+											break;
+					case JSL_FAIL : 		updateJobBatchStatus(BatchStatus.FAILED);
+											break;
+					case EXCEPTION_THROWN : updateJobBatchStatus(BatchStatus.FAILED);
+											break;
+				}
+			}
+		} catch (Throwable t) {
+			// We still want to try to call the afterJob() listener and persist the batch and exit
+			// status for the failure in an orderly fashion.  So catch and continue.
+			logWarning("Caught throwable in main execution loop", t);
+			batchStatusFailedFromException();
+		}
+
+		endOfJob();
+
+		logger.exiting(CLASSNAME, methodName);
+		return retVal;
+	}
+
+	/**
+	 * Copies every property declared on the root job model element into the job context's
+	 * properties.  Does nothing when the job declares no properties element.
+	 */
+	protected void setContextProperties() {
+		JSLProperties declared = jobExecution.getJobNavigator().getRootModelElement().getProperties();
+		if (declared == null) {
+			return;
+		}
+
+		Properties target = jobContext.getProperties();
+		for (Property entry : declared.getPropertyList()) {
+			target.setProperty(entry.getName(), entry.getValue());
+		}
+	}
+
+	protected void jslStop() {
+		String restartOn = jobContext.getRestartOn();	
+		logger.fine("Logging JSL stop(): exitStatus = " + jobContext.getExitStatus() + ", restartOn = " +restartOn );
+		batchStatusStopping();
+		jobStatusService.updateJobStatusFromJSLStop(jobInstanceId, restartOn);
+		return;
+	}
+
+	protected void markJobStarted() {
+		updateJobBatchStatus(BatchStatus.STARTED);
+		long time = System.currentTimeMillis();
+		Timestamp timestamp = new Timestamp(time);
+		jobExecution.setLastUpdateTime(timestamp);
+		jobExecution.setStartTime(timestamp);
+		persistenceService.markJobStarted(jobExecution.getExecutionId(), timestamp);
+	}
+
+	/*
+	 *  Follow similar pattern for end of step in BaseStepControllerImpl
+	 *  
+	 *  1. Execute the very last artifacts (jobListener)
+	 *  2. transition to final batch status
+	 *  3. default ExitStatus if necessary
+	 *  4. persist statuses and end time data
+	 *  
+	 *  We don't want to give up on the orderly process of 2,3,4, if we blow up 
+	 *  in after job, so catch that and keep on going.
+	 */
+	protected void endOfJob() {
+
+
+		// 1. Execute the very last artifacts (jobListener)
+		try {
+			jobListenersAfterJob();
+		} catch (Throwable t) {
+			StringWriter sw = new StringWriter();
+			PrintWriter pw = new PrintWriter(sw);
+			t.printStackTrace(pw);
+			logger.warning("Error invoking jobListener.afterJob(). Stack trace: " + sw.toString());
+			batchStatusFailedFromException();
+		}
+
+		// 2. transition to final batch status
+		transitionToFinalBatchStatus();
+
+		// 3. default ExitStatus if necessary
+		if (jobContext.getExitStatus() == null) {
+			logger.fine("No job-level exitStatus set, defaulting to job batch Status = " + jobContext.getBatchStatus());
+			jobContext.setExitStatus(jobContext.getBatchStatus().name());
+		}
+
+		// 4. persist statuses and end time data
+		logger.fine("Job complete for job id=" + jobExecution.getJobInstance().getJobName() + ", executionId=" + jobExecution.getExecutionId() 
+				+ ", batchStatus=" + jobContext.getBatchStatus() + ", exitStatus=" + jobContext.getExitStatus());
+		persistJobBatchAndExitStatus();
+
+	}
+
+	private void persistJobBatchAndExitStatus() {
+		BatchStatus batchStatus = jobContext.getBatchStatus();
+
+		// Take a current timestamp for last updated no matter what the status.
+		long time = System.currentTimeMillis();
+		Timestamp timestamp = new Timestamp(time);
+		jobExecution.setLastUpdateTime(timestamp);
+
+		// Perhaps these should be coordinated in a tran but probably better still would be
+		// rethinking the table design to let the database provide us consistently with a single update.
+		jobStatusService.updateJobBatchStatus(jobInstanceId, batchStatus);
+		jobStatusService.updateJobExecutionStatus(jobExecution.getInstanceId(), jobContext.getBatchStatus(), jobContext.getExitStatus());
+
+		if (batchStatus.equals(BatchStatus.COMPLETED) || batchStatus.equals(BatchStatus.STOPPED) ||  
+				batchStatus.equals(BatchStatus.FAILED)) {
+
+			jobExecution.setEndTime(timestamp);
+			persistenceService.updateWithFinalExecutionStatusesAndTimestamps(jobExecution.getExecutionId(), 
+					batchStatus, jobContext.getExitStatus(), timestamp);
+		} else {
+			throw new IllegalStateException("Not expected to encounter batchStatus of " + batchStatus +" at this point.  Aborting.");
+		}
+	}
+
+	/**
+	 * Moves the job from its in-flight batch status to the matching final one:
+	 * STARTED becomes COMPLETED, STOPPING becomes STOPPED, and FAILED stays FAILED.
+	 * COMPLETED and STOPPED should not be reachable before this point in the code.
+	 *
+	 * @throws IllegalStateException if the current batch status is none of STARTED, STOPPING or FAILED
+	 */
+	private void transitionToFinalBatchStatus() {
+		BatchStatus currentBatchStatus = jobContext.getBatchStatus();
+		switch (currentBatchStatus) {
+			case STARTED:
+				updateJobBatchStatus(BatchStatus.COMPLETED);
+				break;
+			case STOPPING:
+				updateJobBatchStatus(BatchStatus.STOPPED);
+				break;
+			case FAILED:
+				// Should have already been done but maybe better for possible code refactoring to have it here.
+				updateJobBatchStatus(BatchStatus.FAILED);
+				break;
+			default:
+				// This is the job-level controller, so report the job (not step) status.
+				throw new IllegalStateException("Job batch status should not be in a " + currentBatchStatus.name() + " state");
+		}
+	}
+
+	/**
+	 * Sets the batch status on the job context (in memory only) and logs the change at FINE.
+	 *
+	 * @param batchStatus the new job batch status
+	 */
+	protected void updateJobBatchStatus(BatchStatus batchStatus) {
+		String trace = "Setting job batch status to: " + batchStatus;
+		logger.fine(trace);
+		jobContext.setBatchStatus(batchStatus);
+	}
+
+
+	/**
+	 * Logs a warning made up of the given message, the throwable's message and its full stack trace.
+	 *
+	 * @param msg text that leads the log entry
+	 * @param t the throwable whose message and stack trace are appended
+	 */
+	protected void logWarning(String msg, Throwable t) {
+		StringWriter stackTrace = new StringWriter();
+		PrintWriter traceOut = new PrintWriter(stackTrace);
+		t.printStackTrace(traceOut);
+
+		String entry = msg + " with Throwable message: " + t.getMessage() + ", and stack trace: " + stackTrace.toString();
+		logger.warning(entry);
+	}
+
+	/*
+	 * The thought here is that while we don't persist all the transitions in batch status (given
+	 * we plan to persist at the very end), we do persist STOPPING right away, since if we end up
+	 * "stuck in STOPPING" we at least will have a record in the database.
+	 */
+	protected void batchStatusStopping() {
+		updateJobBatchStatus(BatchStatus.STOPPING);
+		long time = System.currentTimeMillis();
+		Timestamp timestamp = new Timestamp(time);
+		jobExecution.setLastUpdateTime(timestamp);
+		persistenceService.updateBatchStatusOnly(jobExecution.getExecutionId(), BatchStatus.STOPPING, timestamp);
+	}
+
+
+
+	/**
+	 * Builds the listener factory for the root job model element, keeps it on this controller
+	 * and registers it on the job execution.
+	 */
+	private void setupListeners() {
+		JSLJob rootJob = jobExecution.getJobNavigator().getRootModelElement();
+		// Only the job context is supplied; the other two injection references are left null.
+		InjectionReferences jobLevelRefs = new InjectionReferences(jobContext, null, null);
+
+		ListenerFactory factory = new ListenerFactory(rootJob, jobLevelRefs);
+		this.listenerFactory = factory;
+		jobExecution.setListenerFactory(factory);
+	}
+
+
+	/**
+	 * Requests a stop of this job execution.  The request is honored only while the job is
+	 * STARTING or STARTED: the status moves to STOPPING (and is persisted) and the stop is
+	 * forwarded to the currently stoppable element, if there is one.  In any other state the
+	 * request is logged and ignored.
+	 */
+	@Override
+	public void stop() {
+		BatchStatus currentStatus = jobContext.getBatchStatus();
+		if (currentStatus.equals(BatchStatus.STARTING) || currentStatus.equals(BatchStatus.STARTED)) {
+
+			batchStatusStopping();
+
+			// The transitioner is created by originateExecutionOnThread() only after the job has been
+			// marked started and the beforeJob() listeners have run, so a stop can arrive before it
+			// exists.  In that case setting STOPPING is enough: the execution thread checks for it
+			// before entering the loop.
+			ExecutionTransitioner currentTransitioner = transitioner;
+			if (currentTransitioner != null) {
+				IController stoppableElementController = currentTransitioner.getCurrentStoppableElementController();
+				if (stoppableElementController != null) {
+					stoppableElementController.stop();
+				}
+			}
+		} else {
+			logger.info("Stop ignored since batch status for job is already set to: " + currentStatus);
+		}
+	}
+
+	/**
+	 * Invokes beforeJob() on each job listener returned by the listener factory, in list order.
+	 */
+	protected void jobListenersBeforeJob() {
+		for (JobListenerProxy proxy : listenerFactory.getJobListeners()) {
+			boolean traceEnabled = logger.isLoggable(Level.FINE);
+			if (traceEnabled) {
+				String trace = "Invoking beforeJob() on jobListener: " + proxy.getDelegate() + " of type: " + proxy.getDelegate().getClass();
+				logger.fine(trace);
+			}
+			proxy.beforeJob();
+		}
+	}
+
+	/**
+	 * Invokes afterJob() on each job listener returned by the listener factory, in list order.
+	 */
+	private void jobListenersAfterJob() {
+		for (JobListenerProxy proxy : listenerFactory.getJobListeners()) {
+			boolean traceEnabled = logger.isLoggable(Level.FINE);
+			if (traceEnabled) {
+				String trace = " Invoking afterJob() on jobListener: " + proxy.getDelegate() + " of type: " + proxy.getDelegate().getClass();
+				logger.fine(trace);
+			}
+			proxy.afterJob();
+		}
+	}
+
+	protected void batchStatusFailedFromException() {
+		updateJobBatchStatus(BatchStatus.FAILED);
+	}
+	
+    @Override
+    public List<Long> getLastRunStepExecutions() {
+        
+        return this.transitioner.getStepExecIds();
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionThreadRootControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionThreadRootControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionThreadRootControllerImpl.java
new file mode 100755
index 0000000..08d105b
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionThreadRootControllerImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.util.PartitionsBuilderConfig;
+
+/**
+ * Thread-root controller for a partition subjob.  For now the partition needs nothing beyond
+ * what the subjob superclass already does; this class only wires in the analyzer queue taken
+ * from the partition configuration.
+ */
+public class PartitionThreadRootControllerImpl extends JobThreadRootControllerImpl {
+
+	private static final String CLASSNAME = PartitionThreadRootControllerImpl.class.getName();
+
+	/**
+	 * @param jobExecution the partition subjob execution
+	 * @param config supplies the analyzer queue handed to the superclass
+	 */
+	public PartitionThreadRootControllerImpl(RuntimeJobExecution jobExecution, PartitionsBuilderConfig config) {
+		super(jobExecution, config.getAnalyzerQueue());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepBuilder.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepBuilder.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepBuilder.java
new file mode 100755
index 0000000..4a5f038
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepBuilder.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package com.ibm.jbatch.container.impl;
+
+import javax.batch.runtime.context.JobContext;
+
+import com.ibm.jbatch.container.jsl.CloneUtility;
+import com.ibm.jbatch.jsl.model.Flow;
+import com.ibm.jbatch.jsl.model.JSLJob;
+import com.ibm.jbatch.jsl.model.ObjectFactory;
+import com.ibm.jbatch.jsl.model.Partition;
+import com.ibm.jbatch.jsl.model.PartitionPlan;
+import com.ibm.jbatch.jsl.model.Split;
+import com.ibm.jbatch.jsl.model.Step;
+
+public class PartitionedStepBuilder {
+
+	public static final String JOB_ID_SEPARATOR = ":";  // Use something permissible in NCName to allow us to key off of.
+    /**
+     * Builds a generated job holding exactly one flow, for submission to the
+     * BatchKernel. This is how subjobs are built from splits.
+     *
+     * @param parentJobExecutionId id of the parent, used as the first part of the generated subjob id
+     * @param jobContext parent job context whose properties are copied onto the subjob
+     * @param split the split the flow is nested in
+     * @param flow the flow to run; it is added to the subjob as-is, not cloned
+     * @return the generated subjob
+     */
+    public static JSLJob buildFlowInSplitSubJob(Long parentJobExecutionId, JobContext jobContext, Split split, Flow flow) {
+
+        JSLJob subJob = new ObjectFactory().createJSLJob();
+
+        // Generated subjob id
+        subJob.setId(generateSubJobId(parentJobExecutionId, split.getId(), flow.getId()));
+
+        // Flow threads get a copy of every property on the parent JobContext
+        subJob.setProperties(CloneUtility.javaPropsTojslProperties(jobContext.getProperties()));
+
+        // No deep copy of the flow is needed: each flow is already independent of all others, unlike
+        // a partition, where one step instance can be executed with different properties on multiple threads.
+        subJob.getExecutionElements().add(flow);
+
+        return subJob;
+    }
+	
+    /**
+     * Builds a generated job holding exactly one step, for submission to the
+     * BatchKernel. This is used for partitioned steps.
+     *
+     * @param parentJobInstanceId id of the parent, used as the first part of the generated subjob id
+     * @param jobContext parent job context whose properties are copied onto the subjob
+     * @param step the partitioned step to copy
+     * @param partitionInstance the partition number, used as the last part of the generated subjob id
+     * @return the generated subjob containing a copy of the step
+     */
+    public static JSLJob buildPartitionSubJob(Long parentJobInstanceId, JobContext jobContext, Step step, int partitionInstance) {
+
+        final ObjectFactory jslFactory = new ObjectFactory();
+        final JSLJob subJob = jslFactory.createJSLJob();
+
+        // Generated subjob id
+        subJob.setId(generateSubJobId(parentJobInstanceId, step.getId(), partitionInstance));
+
+        // Partitioned step threads get a copy of every property on the parent JobContext
+        subJob.setProperties(CloneUtility.javaPropsTojslProperties(jobContext.getProperties()));
+
+        // The subjob's single step: same id as the original, fields copied over one by one
+        final Step stepCopy = jslFactory.createStep();
+        stepCopy.setId(step.getId());
+        stepCopy.setAllowStartIfComplete(step.getAllowStartIfComplete());
+
+        if (step.getBatchlet() != null) {
+            stepCopy.setBatchlet(CloneUtility.cloneBatchlet(step.getBatchlet()));
+        }
+
+        if (step.getChunk() != null) {
+            stepCopy.setChunk(CloneUtility.cloneChunk(step.getChunk()));
+        }
+
+        // The next attribute and control elements are deliberately NOT copied.  Transitioning
+        // should ONLY take place on the main thread.
+
+        // Of the partition artifacts only the collector is carried over, attached to a new
+        // partition whose plan has its partitions value set to null.
+        final Partition original = step.getPartition();
+        if (original != null && original.getCollector() != null) {
+            PartitionPlan plan = jslFactory.createPartitionPlan();
+            plan.setPartitions(null);
+
+            Partition base = jslFactory.createPartition();
+            base.setPlan(plan);
+            base.setCollector(original.getCollector());
+            stepCopy.setPartition(base);
+        }
+
+        stepCopy.setStartLimit(step.getStartLimit());
+        stepCopy.setProperties(CloneUtility.cloneJSLProperties(step.getProperties()));
+
+        // Listeners are cloned wholesale rather than by type (e.g. ChunkListener vs. StepListener):
+        // the type is not known at the model level, and a given artifact could implement more
+        // than one listener interface (e.g. ChunkListener AND StepListener).
+        // NOTE(review): an earlier comment in this method said step listeners should only be
+        // called on the parent thread, yet all listeners are cloned here - confirm the intent.
+        stepCopy.setListeners(CloneUtility.cloneListeners(step.getListeners()));
+
+        subJob.getExecutionElements().add(stepCopy);
+
+        return subJob;
+    }
+
+    /**
+     * Generates the id of a flow-in-split subjob.
+     *
+     * @param parentJobInstanceId
+     *            the id of the parent job; must not be null
+     * @param splitId
+     *            the id of the split the flow is nested in
+     * @param flowId
+     *            the id of the flow
+     * @return a String of the form
+     *         :&lt;parentJobInstanceId&gt;:&lt;splitId&gt;:&lt;flowId&gt;
+     *         (note the leading separator)
+     */
+    private static String generateSubJobId(Long parentJobInstanceId, String splitId, String flowId) {
+        return JOB_ID_SEPARATOR + parentJobInstanceId.toString()
+                + JOB_ID_SEPARATOR + splitId
+                + JOB_ID_SEPARATOR + flowId;
+    }
+    
+    /**
+     * Generates the id of a partition subjob.
+     *
+     * @param parentJobInstanceId
+     *            the id of the parent job; must not be null
+     * @param stepId
+     *            the id of the partitioned step
+     * @param partitionInstance
+     *            the instance number of the partition
+     * @return a String of the form
+     *         :&lt;parentJobInstanceId&gt;:&lt;stepId&gt;:&lt;partitionInstance&gt;
+     *         (note the leading separator)
+     */
+    private static String generateSubJobId(Long parentJobInstanceId, String stepId, int partitionInstance) {
+        return JOB_ID_SEPARATOR + parentJobInstanceId.toString()
+                + JOB_ID_SEPARATOR + stepId
+                + JOB_ID_SEPARATOR + partitionInstance;
+    }
+    
+    
+
+    
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepControllerImpl.java
new file mode 100755
index 0000000..1734b11
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepControllerImpl.java
@@ -0,0 +1,528 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.batch.api.partition.PartitionPlan;
+import javax.batch.api.partition.PartitionReducer.PartitionStatus;
+import javax.batch.operations.JobExecutionAlreadyCompleteException;
+import javax.batch.operations.JobExecutionNotMostRecentException;
+import javax.batch.operations.JobRestartException;
+import javax.batch.operations.JobStartException;
+import javax.batch.runtime.BatchStatus;
+
+import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
+import com.ibm.jbatch.container.artifact.proxy.PartitionAnalyzerProxy;
+import com.ibm.jbatch.container.artifact.proxy.PartitionMapperProxy;
+import com.ibm.jbatch.container.artifact.proxy.PartitionReducerProxy;
+import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
+import com.ibm.jbatch.container.artifact.proxy.StepListenerProxy;
+import com.ibm.jbatch.container.context.impl.StepContextImpl;
+import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
+import com.ibm.jbatch.container.exception.BatchContainerServiceException;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.jsl.CloneUtility;
+import com.ibm.jbatch.container.util.BatchPartitionPlan;
+import com.ibm.jbatch.container.util.BatchPartitionWorkUnit;
+import com.ibm.jbatch.container.util.BatchWorkUnit;
+import com.ibm.jbatch.container.util.PartitionDataWrapper;
+import com.ibm.jbatch.container.util.PartitionDataWrapper.PartitionEventType;
+import com.ibm.jbatch.container.util.PartitionsBuilderConfig;
+import com.ibm.jbatch.container.validation.ArtifactValidationException;
+import com.ibm.jbatch.jsl.model.Analyzer;
+import com.ibm.jbatch.jsl.model.JSLJob;
+import com.ibm.jbatch.jsl.model.JSLProperties;
+import com.ibm.jbatch.jsl.model.PartitionMapper;
+import com.ibm.jbatch.jsl.model.PartitionReducer;
+import com.ibm.jbatch.jsl.model.Property;
+import com.ibm.jbatch.jsl.model.Step;
+
+public class PartitionedStepControllerImpl extends BaseStepControllerImpl {
+
+	// Fully qualified class name, used as the logger name.
+	private final static String sourceClass = PartitionedStepControllerImpl.class.getName();
+	private final static Logger logger = Logger.getLogger(sourceClass);
+
+	// Initial values of the partitions/threads fields until generatePartitionPlan() overwrites them.
+	private static final int DEFAULT_PARTITION_INSTANCES = 1;
+	private static final int DEFAULT_THREADS = 0; //0 means default to number of instances
+
+	// Plan returned by generatePartitionPlan(); assigned in invokeCoreStep().
+	private PartitionPlan plan = null;
+
+	// Copies of the plan's partition count and thread count, set by generatePartitionPlan().
+	private int partitions = DEFAULT_PARTITION_INSTANCES;
+	private int threads = DEFAULT_THREADS;
+
+	// Copy of the plan's per-partition properties, set by generatePartitionPlan().
+	private Properties[] partitionProperties = null;
+
+	// Work units built in buildSubJobBatchWorkUnits(); stays null until then, which stop() checks for.
+	private volatile List<BatchPartitionWorkUnit> parallelBatchWorkUnits;
+
+	// Created in setupStepArtifacts() when the step's partition element declares a reducer; otherwise null.
+	private PartitionReducerProxy partitionReducerProxy = null;
+
+	// On invocation this will be re-primed to reflect already-completed partitions from a previous execution.
+	// (Computed in executeAndWaitForCompletion() as partitions minus the number of work units built.)
+	int numPreviouslyCompleted = 0;
+
+	// Created in setupStepArtifacts() when the step's partition element declares an analyzer; otherwise null.
+	private PartitionAnalyzerProxy analyzerProxy = null;
+
+	// Generated sub jobs, one per partition. The list also serves as the monitor
+	// that stop() and buildSubJobBatchWorkUnits() synchronize on.
+	final List<JSLJob> subJobs = new ArrayList<JSLJob>();
+	// Step listeners obtained in setupStepArtifacts().
+	protected List<StepListenerProxy> stepListeners = null;
+
+	// Work units taken off completedWorkQueue; inspected by checkCompletedWork().
+	List<BatchPartitionWorkUnit> completedWork = new ArrayList<BatchPartitionWorkUnit>();
+	
+	// Queue from which executeAndWaitForCompletion() takes finished work units; created in invokeCoreStep().
+	BlockingQueue<BatchPartitionWorkUnit> completedWorkQueue = null;
+
+	/**
+	 * Creates the controller by passing all arguments straight to the base step controller.
+	 *
+	 * @param jobExecutionImpl the job execution, forwarded to the superclass
+	 * @param step the step model element, forwarded to the superclass
+	 * @param stepContext the step context, forwarded to the superclass
+	 * @param rootJobExecutionId forwarded to the superclass
+	 */
+	protected PartitionedStepControllerImpl(final RuntimeJobExecution jobExecutionImpl, final Step step, StepContextImpl stepContext, long rootJobExecutionId) {
+		super(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+	}
+
+	/**
+	 * Marks this step as STOPPING and asks the batch kernel to stop every
+	 * partition work unit that has been built so far.
+	 *
+	 * @throws IllegalStateException wrapping the first failure reported while stopping a work unit
+	 */
+	@Override
+	public void stop() {
+
+		updateBatchStatus(BatchStatus.STOPPING);
+
+		synchronized (subJobs) {
+
+			// A stop request can arrive before any partition work units
+			// have been built; in that case there is nothing more to do.
+			if (parallelBatchWorkUnits == null) {
+				return;
+			}
+
+			for (BatchWorkUnit workUnit : parallelBatchWorkUnits) {
+				try {
+					batchKernel.stopJob(workUnit.getJobExecutionImpl().getExecutionId());
+				} catch (Exception stopFailure) {
+					// TODO - Is this what we want to know.
+					// Fail loudly for now so that the problem cannot go unnoticed.
+					throw new IllegalStateException(stopFailure);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Builds the partition plan for this step and caches its partition count,
+	 * thread count and per-partition properties in the matching instance fields.
+	 *
+	 * A partition mapper, when declared, takes precedence over a static plan
+	 * element in the JSL.
+	 *
+	 * @return the plan to use for this execution, never null
+	 * @throws IllegalArgumentException if the step declares neither a mapper nor a plan,
+	 *         or if a static plan carries an invalid partitions/threads value
+	 * @throws BatchContainerServiceException if the partition mapper artifact cannot be created
+	 * @throws BatchContainerRuntimeException if a static plan targets properties at a partition index that does not exist
+	 */
+	private PartitionPlan generatePartitionPlan() {
+
+		final PartitionMapper partitionMapper = step.getPartition().getMapper();
+		final PartitionPlan newPlan;
+
+		if (partitionMapper != null) { //from partition mapper
+			newPlan = buildPlanFromMapper(partitionMapper);
+		} else if (step.getPartition().getPlan() != null) { //from static partition element in jsl
+			newPlan = buildPlanFromStaticJsl();
+		} else {
+			// Without this check the method would fail below with a bare NullPointerException.
+			throw new IllegalArgumentException("Neither a partition mapper nor a partition plan is defined in stepId: "
+					+ step.getId());
+		}
+
+		// Set the other instance variables for convenience.
+		this.partitions = newPlan.getPartitions();
+		this.threads = newPlan.getThreads();
+		this.partitionProperties = newPlan.getPartitionProperties();
+
+		return newPlan;
+	}
+
+	/**
+	 * Invokes the partition mapper artifact and derives the plan to run from what it returns.
+	 */
+	private PartitionPlan buildPlanFromMapper(final PartitionMapper partitionMapper) {
+
+		// Partition count persisted by a previous run of this step; null when there is none.
+		final Integer previousNumPartitions = stepStatus.getNumPartitions();
+
+		final List<Property> propertyList = partitionMapper.getProperties() == null ? null
+				: partitionMapper.getProperties().getPropertyList();
+
+		// Set all the contexts associated with this controller.
+		// Some of them may be null
+		final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext,
+				propertyList);
+
+		final PartitionMapperProxy partitionMapperProxy;
+		try {
+			partitionMapperProxy = ProxyFactory.createPartitionMapperProxy(
+					partitionMapper.getRef(), injectionRef, stepContext);
+		} catch (final ArtifactValidationException e) {
+			throw new BatchContainerServiceException(
+					"Cannot create the PartitionMapper ["
+							+ partitionMapper.getRef() + "]", e);
+		}
+
+		final PartitionPlan mapperPlan = partitionMapperProxy.mapPartitions();
+
+		//Set up the new partition plan
+		final PartitionPlan mappedPlan = new BatchPartitionPlan();
+		mappedPlan.setPartitionsOverride(mapperPlan.getPartitionsOverride());
+
+		//When true is specified, the partition count from the current run
+		//is used and all results from past partitions are discarded.
+		if (mapperPlan.getPartitionsOverride() || previousNumPartitions == null) {
+			mappedPlan.setPartitions(mapperPlan.getPartitions());
+		} else {
+			mappedPlan.setPartitions(previousNumPartitions);
+		}
+
+		// A thread count of 0 means "one thread per partition".
+		if (mapperPlan.getThreads() == 0) {
+			mappedPlan.setThreads(mappedPlan.getPartitions());
+		} else {
+			mappedPlan.setThreads(mapperPlan.getThreads());
+		}
+
+		mappedPlan.setPartitionProperties(mapperPlan.getPartitionProperties());
+
+		if (logger.isLoggable(Level.FINE)) {
+			logger.fine("Partition plan defined by partition mapper: " + mappedPlan);
+		}
+
+		return mappedPlan;
+	}
+
+	/**
+	 * Builds the plan from the static plan element of the step's partition definition.
+	 */
+	private PartitionPlan buildPlanFromStaticJsl() {
+
+		final String partitionsAttr = step.getPartition().getPlan().getPartitions();
+
+		// Fall back to the default instead of carrying an unusable sentinel value
+		// (and a null properties array) when the attribute is absent.
+		int numPartitions = DEFAULT_PARTITION_INSTANCES;
+		if (partitionsAttr != null) {
+			try {
+				numPartitions = Integer.parseInt(partitionsAttr);
+			} catch (final NumberFormatException e) {
+				throw new IllegalArgumentException("Could not parse partition instances value in stepId: " + step.getId()
+						+ ", with instances=" + partitionsAttr, e);
+			}
+			// Validate before allocating: a negative array size would otherwise
+			// surface as a NegativeArraySizeException.
+			if (numPartitions < 1) {
+				throw new IllegalArgumentException("Partition instances value must be 1 or greater in stepId: " + step.getId()
+						+ ", with instances=" + partitionsAttr);
+			}
+		}
+		final Properties[] partitionProps = new Properties[numPartitions];
+
+		final String threadsAttr = step.getPartition().getPlan().getThreads();
+
+		//default to number of partitions if threads isn't set
+		int numThreads = numPartitions;
+		if (threadsAttr != null) {
+			try {
+				numThreads = Integer.parseInt(threadsAttr);
+			} catch (final NumberFormatException e) {
+				throw new IllegalArgumentException("Could not parse partition threads value in stepId: " + step.getId()
+						+ ", with threads=" + threadsAttr, e);
+			}
+			if (numThreads < 0) {
+				throw new IllegalArgumentException("Threads value must be 0 or greater in stepId: " + step.getId()
+						+ ", with threads=" + threadsAttr);
+			}
+			if (numThreads == 0) {
+				numThreads = numPartitions;
+			}
+		}
+
+		final List<JSLProperties> jslProperties = step.getPartition().getPlan().getProperties();
+		if (jslProperties != null) {
+			for (JSLProperties props : jslProperties) {
+				final int targetPartition = Integer.parseInt(props.getPartition());
+
+				// Explicit bounds check rather than catching ArrayIndexOutOfBoundsException.
+				if (targetPartition < 0 || targetPartition >= partitionProps.length) {
+					throw new BatchContainerRuntimeException("There are only " + numPartitions + " partition instances, but there are "
+							+ jslProperties.size()
+							+ " partition properties lists defined. Remember that partition indexing is 0 based like Java arrays.");
+				}
+				partitionProps[targetPartition] = CloneUtility.jslPropertiesToJavaProperties(props);
+			}
+		}
+
+		final PartitionPlan staticPlan = new BatchPartitionPlan();
+		staticPlan.setPartitions(numPartitions);
+		staticPlan.setThreads(numThreads);
+		staticPlan.setPartitionProperties(partitionProps);
+		staticPlan.setPartitionsOverride(false); //FIXME what is the default for a static plan??
+
+		return staticPlan;
+	}
+
+
+	/**
+	 * Runs the partitioned step: generates the plan, records its partition
+	 * count on the step status, builds the partition work units, runs them
+	 * and finally evaluates their results.
+	 */
+	@Override
+	protected void invokeCoreStep() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
+
+		this.plan = this.generatePartitionPlan();
+
+		// Persist the partition count so that a restart can reuse the same plan.
+		stepStatus.setNumPartitions(plan.getPartitions());
+
+		/* With partitionsOverride set, the partition count from the current
+		 * run is used and all results from past partitions are discarded.
+		 * Cleaning up or backing out work done in a previous run is left to
+		 * the application; the reducer's rollbackPartitionedStep method is
+		 * called here, before any partition starts, as the hook for that.
+		 */
+		if (plan.getPartitionsOverride() && this.partitionReducerProxy != null) {
+			this.partitionReducerProxy.rollbackPartitionedStep();
+		}
+
+		logger.fine("Number of partitions in step: " + partitions + " in step " + step.getId() + "; Subjob properties defined by partition mapper: " + partitionProperties);
+
+		// The analyzer queue is only needed when an analyzer is configured;
+		// the completed-work queue is always required.
+		if (this.analyzerProxy != null) {
+			this.analyzerStatusQueue =  new LinkedBlockingQueue<PartitionDataWrapper>();
+		}
+		this.completedWorkQueue = new LinkedBlockingQueue<BatchPartitionWorkUnit>();
+
+		// 1. Build all sub jobs from the partitioned step.
+		buildSubJobBatchWorkUnits();
+
+		// 2. Kick off the threads and wait for them.
+		executeAndWaitForCompletion();
+
+		// 3. Deal with the results.
+		checkCompletedWork();
+	}
+	private void buildSubJobBatchWorkUnits() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException  {
+		synchronized (subJobs) {		
+			//check if we've already issued a stop
+			if (jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)){
+				logger.fine("Step already in STOPPING state, exiting from buildSubJobBatchWorkUnits() before beginning execution");
+				return;
+			}
+
+			for (int instance = 0; instance < partitions; instance++) {
+				subJobs.add(PartitionedStepBuilder.buildPartitionSubJob(jobExecutionImpl.getInstanceId(),jobExecutionImpl.getJobContext(), step, instance));
+			}
+
+			PartitionsBuilderConfig config = new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, completedWorkQueue, jobExecutionImpl.getExecutionId());
+			// Then build all the subjobs but do not start them yet
+			if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
+				parallelBatchWorkUnits = batchKernel.buildOnRestartParallelPartitions(config);
+			} else {
+				parallelBatchWorkUnits = batchKernel.buildNewParallelPartitions(config);
+			}
+
+			// NOTE:  At this point I might not have as many work units as I had partitions, since some may have already completed.
+		}
+	}
+
+	private void executeAndWaitForCompletion() throws JobRestartException {
+		
+		if (jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)){
+			logger.fine("Step already in STOPPING state, exiting from executeAndWaitForCompletion() before beginning execution");
+			return;
+		}
+		
+		int numTotalForThisExecution = parallelBatchWorkUnits.size();
+		this.numPreviouslyCompleted = partitions - numTotalForThisExecution; 
+		int numCurrentCompleted = 0;
+		int numCurrentSubmitted = 0;
+
+		logger.fine("Calculated that " + numPreviouslyCompleted + " partitions are already complete out of total # = " 
+				+ partitions + ", with # remaining =" + numTotalForThisExecution);
+
+		//Start up to to the max num we are allowed from the num threads attribute
+		for (int i=0; i < this.threads && i < numTotalForThisExecution; i++, numCurrentSubmitted++) {
+			if (stepStatus.getStartCount() > 1 && !!!plan.getPartitionsOverride()) {
+				batchKernel.restartGeneratedJob(parallelBatchWorkUnits.get(i));
+			} else {
+				batchKernel.startGeneratedJob(parallelBatchWorkUnits.get(i));
+			}
+		}
+
+		boolean readyToSubmitAnother = false;
+		while (true) {
+			logger.finer("Begin main loop in waitForQueueCompletion(), readyToSubmitAnother = " + readyToSubmitAnother);
+			try {
+				if (analyzerProxy != null) {
+					logger.fine("Found analyzer, proceeding on analyzerQueue path");
+					PartitionDataWrapper dataWrapper = analyzerStatusQueue.take();
+					if (PartitionEventType.ANALYZE_COLLECTOR_DATA.equals(dataWrapper.getEventType())) {
+						logger.finer("Analyze collector data: " + dataWrapper.getCollectorData());
+						analyzerProxy.analyzeCollectorData(dataWrapper.getCollectorData());
+						continue; // without being ready to submit another
+					} else if (PartitionEventType.ANALYZE_STATUS.equals(dataWrapper.getEventType())) {
+						analyzerProxy.analyzeStatus(dataWrapper.getBatchstatus(), dataWrapper.getExitStatus());
+						logger.fine("Analyze status called for completed partition: batchStatus= " + dataWrapper.getBatchstatus() + ", exitStatus = " + dataWrapper.getExitStatus());
+						completedWork.add(completedWorkQueue.take());  // Shouldn't be a a long wait.
+						readyToSubmitAnother = true;
+					} else {
+						logger.warning("Invalid partition state");
+						throw new IllegalStateException("Invalid partition state");
+					}
+				} else {
+					logger.fine("No analyzer, proceeding on completedWorkQueue path");
+					// block until at least one thread has finished to
+					// submit more batch work. hold on to the finished work to look at later
+					completedWork.add(completedWorkQueue.take());
+					readyToSubmitAnother = true;
+				}
+			} catch (InterruptedException e) {
+				logger.severe("Caught exc"+ e);
+				throw new BatchContainerRuntimeException(e);
+			}
+
+			if (readyToSubmitAnother) {
+				numCurrentCompleted++;
+				logger.fine("Ready to submit another (if there is another left to submit); numCurrentCompleted = " + numCurrentCompleted);
+				if (numCurrentCompleted < numTotalForThisExecution) {
+					if (numCurrentSubmitted < numTotalForThisExecution) {
+						logger.fine("Submitting # " + numCurrentSubmitted + " out of " + numTotalForThisExecution + " total for this execution");
+						if (stepStatus.getStartCount() > 1) {
+							batchKernel.startGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
+						} else {
+							batchKernel.restartGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
+						}
+						readyToSubmitAnother = false;
+					}
+				} else {
+					logger.fine("Finished... breaking out of loop");
+					break;
+				}
+			} else {
+				logger.fine("Not ready to submit another."); // Must have just done a collector
+			}
+		}
+	}        
+
+	private void checkCompletedWork() {
+
+		if (logger.isLoggable(Level.FINE)) {
+			logger.fine("Check completed work list.");
+		}
+
+		/**
+		 * check the batch status of each subJob after it's done to see if we need to issue a rollback
+		 * start rollback if any have stopped or failed
+		 */
+		boolean rollback = false;
+		boolean partitionFailed = false;
+		
+		for (final BatchWorkUnit subJob : completedWork) {
+			BatchStatus batchStatus = subJob.getJobExecutionImpl().getJobContext().getBatchStatus();
+			if (batchStatus.equals(BatchStatus.FAILED)) {
+				logger.fine("Subjob " + subJob.getJobExecutionImpl().getExecutionId() + " ended with status '" + batchStatus + "'; Starting logical transaction rollback.");
+
+				rollback = true;
+				partitionFailed = true;
+
+				//Keep track of the failing status and throw an exception to propagate after the rest of the partitions are complete
+				stepContext.setBatchStatus(BatchStatus.FAILED);
+			} 
+		}
+
+		//If rollback is false we never issued a rollback so we can issue a logicalTXSynchronizationBeforeCompletion
+		//NOTE: this will get issued even in a subjob fails or stops if no logicalTXSynchronizationRollback method is provied
+		//We are assuming that not providing a rollback was intentional
+		if (rollback == true) {
+			if (this.partitionReducerProxy != null) {
+				this.partitionReducerProxy.rollbackPartitionedStep();
+			}
+			if (partitionFailed) {
+				throw new BatchContainerRuntimeException("One or more partitions failed");
+			}
+		} else {
+			if (this.partitionReducerProxy != null) {
+				this.partitionReducerProxy.beforePartitionedStepCompletion();
+			}
+		}
+	}
+
+	/**
+	 * Creates the artifacts this controller drives: the step listeners and,
+	 * when the step's partition element declares them, the partition analyzer
+	 * and the partition reducer proxies.
+	 *
+	 * @throws BatchContainerServiceException if the analyzer or the reducer artifact cannot be created
+	 */
+	@Override
+	protected void setupStepArtifacts() {
+
+		// Step listeners are requested without artifact-level properties.
+		InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, null);
+		this.stepListeners = jobExecutionImpl.getListenerFactory().getStepListeners(step, injectionRef, stepContext);
+
+		Analyzer analyzer = step.getPartition().getAnalyzer();
+
+		if (analyzer != null) {
+			final List<Property> propList = analyzer.getProperties() == null ? null : analyzer.getProperties()
+					.getPropertyList();
+
+			injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
+
+			try {
+				analyzerProxy = ProxyFactory.createPartitionAnalyzerProxy(analyzer.getRef(), injectionRef, stepContext);
+			} catch (final ArtifactValidationException e) {
+				throw new BatchContainerServiceException("Cannot create the analyzer [" + analyzer.getRef() + "]", e);
+			}
+		} 
+
+		PartitionReducer partitionReducer = step.getPartition().getReducer();
+
+		if (partitionReducer != null) {
+
+			final List<Property> propList = partitionReducer.getProperties() == null ? null : partitionReducer.getProperties()
+					.getPropertyList();
+
+			injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
+
+			try {
+				this.partitionReducerProxy = ProxyFactory.createPartitionReducerProxy(partitionReducer.getRef(), injectionRef, stepContext);
+			} catch (final ArtifactValidationException e) {
+				// Name the reducer here: the message used to blame the analyzer.
+				throw new BatchContainerServiceException("Cannot create the partition reducer [" + partitionReducer.getRef() + "]",
+						e);
+			}
+		}
+
+	}
+
+	@Override
+	protected void invokePreStepArtifacts() {
+
+		if (stepListeners != null) {
+			for (StepListenerProxy listenerProxy : stepListeners) {
+				// Call beforeStep on all the step listeners
+				listenerProxy.beforeStep();
+			}
+		}
+
+		// Invoke the reducer before all parallel steps start (must occur
+		// before mapper as well)
+		if (this.partitionReducerProxy != null) {
+			this.partitionReducerProxy.beginPartitionedStep();
+		}
+
+	}
+
+	@Override
+	protected void invokePostStepArtifacts() {
+		// Invoke the reducer after all parallel steps are done
+		if (this.partitionReducerProxy != null) {
+
+			if ((BatchStatus.COMPLETED).equals(stepContext.getBatchStatus())) {
+				this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionStatus.COMMIT);
+			}else {
+				this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionStatus.ROLLBACK); 
+			}
+
+		}
+
+		// Called in spec'd order, e.g. Sec. 11.7
+		if (stepListeners != null) {
+			for (StepListenerProxy listenerProxy : stepListeners) {
+				// Call afterStep on all the step listeners
+				listenerProxy.afterStep();
+			}
+		}
+	}
+
+	/**
+	 * Intentionally a no-op in this controller.
+	 */
+	@Override
+	protected void sendStatusFromPartitionToAnalyzerIfPresent() {
+		// Since we're already on the main thread, there will never
+		// be anything to do on this thread.  It's only on the partitioned
+		// threads that there is something to send back.
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/RetryHandler.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/RetryHandler.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/RetryHandler.java
new file mode 100755
index 0000000..e0906a6
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/RetryHandler.java
@@ -0,0 +1,430 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+import com.ibm.jbatch.container.artifact.proxy.RetryProcessListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.RetryReadListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.RetryWriteListenerProxy;
+import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
+import com.ibm.jbatch.jsl.model.Chunk;
+import com.ibm.jbatch.jsl.model.ExceptionClassFilter;
+
+public class RetryHandler {
+
+	/**
+	 *
+	 * Logic for handling retryable records.
+	 *
+	 * A RetryHandler object is attached to every BDS that inherits from AbstractBatchDataStream.
+	 *
+	 */
+
+	// Class name passed as the source class to the logp/entering/exiting calls.
+	private static final String className = RetryHandler.class.getName();
+	// Logger named after this class's package.
+	private static Logger logger = Logger.getLogger(RetryHandler.class.getPackage().getName());
+
+	// Labels for the retry configuration; RETRY_COUNT appears in the
+	// error raised by initialize() when the retry limit cannot be parsed.
+	public static final String RETRY_COUNT      = "retry-limit";
+	public static final String RETRY_INCLUDE_EX = "include class";
+	public static final String RETRY_EXCLUDE_EX = "exclude class";
+
+	// Phase in which the most recent retry was triggered.
+	private static final int RETRY_NONE = 0;
+	private static final int RETRY_READ = 1;
+	private static final int RETRY_PROCESS = 2;
+	private static final int RETRY_WRITE = 3;
+
+	// Set by the handleException* methods to one of the RETRY_* values above.
+	private int retryType = RETRY_NONE;
+
+	/*private RetryProcessListenerProxy _retryProcessListener = null;
+	  private RetryReadListenerProxy _retryReadListener = null;
+	  private RetryWriteListenerProxy _retryWriteListener = null;*/
+
+	// Listeners notified when a retry happens; each list stays null until the matching add* method is called.
+	List<RetryProcessListenerProxy> _retryProcessListeners = null;
+	List<RetryReadListenerProxy> _retryReadListeners = null;
+	List<RetryWriteListenerProxy> _retryWriteListeners = null;
+
+	// Identifiers supplied to the constructor.
+	private long _jobId = 0;
+	private String _stepId = null;
+	// Exception class names read from the chunk definition by initialize():
+	// the no-rollback include/exclude filters and the retryable include/exclude filters.
+	private Set<String> _retryNoRBIncludeExceptions = null;
+	private Set<String> _retryNoRBExcludeExceptions = null;
+	private Set<String> _retryIncludeExceptions = null;
+	private Set<String> _retryExcludeExceptions = null;
+	// Stays Integer.MIN_VALUE when the chunk defines no retry limit.
+	private int _retryLimit = Integer.MIN_VALUE;
+	// Incremented by the handleException* methods each time a retry is granted.
+	private long _retryCount = 0;
+	// The exception that triggered the most recent retry.
+	private Exception _retryException = null;
+
+	/**
+	 * Creates a handler and loads its retry configuration from the chunk.
+	 *
+	 * @param chunk the chunk definition whose retry settings are read
+	 * @param l stored as the job id
+	 * @param stepId stored as the step id
+	 */
+	public RetryHandler(Chunk chunk, long l, String stepId) {
+		this._jobId = l;
+		this._stepId = stepId;
+
+		this.initialize(chunk);
+	}
+
+
+	/**
+	 * Registers the user-defined retry process listeners. The given list
+	 * replaces any list registered earlier.
+	 *
+	 * @param retryProcessListeners listeners to notify on a process retry
+	 */
+	public void addRetryProcessListener(List<RetryProcessListenerProxy> retryProcessListeners) {
+		this._retryProcessListeners = retryProcessListeners;
+	}
+
+	/**
+	 * Registers the user-defined retry read listeners. The given list
+	 * replaces any list registered earlier.
+	 *
+	 * @param retryReadListeners listeners to notify on a read retry
+	 */
+	public void addRetryReadListener(List<RetryReadListenerProxy> retryReadListeners) {
+		this._retryReadListeners = retryReadListeners;
+	}
+
+	/**
+	 * Registers the user-defined retry write listeners. The given list
+	 * replaces any list registered earlier.
+	 *
+	 * @param retryWriteListeners listeners to notify on a write retry
+	 */
+	public void addRetryWriteListener(List<RetryWriteListenerProxy> retryWriteListeners) {
+		this._retryWriteListeners = retryWriteListeners;
+	}
+
+
+	/**
+	 * Reads the retry configuration from the chunk definition: the retry
+	 * limit plus the include/exclude class names of the retryable and the
+	 * no-rollback exception filters.
+	 *
+	 * @param chunk the chunk definition to read from
+	 * @throws IllegalArgumentException if the retry limit is negative
+	 * @throws RuntimeException if the retry limit is not a valid integer
+	 */
+	private void initialize(Chunk chunk)
+	{
+		final String mName = "initialize";
+
+		if (logger.isLoggable(Level.FINER)) {
+			logger.entering(className, mName);
+		}
+
+		try
+		{
+			if (chunk.getRetryLimit() != null) {
+				_retryLimit = Integer.parseInt(chunk.getRetryLimit());
+				if (_retryLimit < 0) {
+					throw new IllegalArgumentException("The retry-limit attribute on a chunk cannot be a negative value");
+				}
+			}
+		}
+		catch (NumberFormatException nfe)
+		{
+			throw new RuntimeException("NumberFormatException reading " + RETRY_COUNT, nfe);
+		}
+
+		// Read the include/exclude exceptions.
+		_retryIncludeExceptions = new HashSet<String>();
+		_retryExcludeExceptions = new HashSet<String>();
+		_retryNoRBIncludeExceptions = new HashSet<String>();
+		_retryNoRBExcludeExceptions = new HashSet<String>();
+
+		if (chunk.getRetryableExceptionClasses() != null) {
+			if (chunk.getRetryableExceptionClasses().getIncludeList() != null) {
+				List<ExceptionClassFilter.Include> includes = chunk.getRetryableExceptionClasses().getIncludeList();
+				for (ExceptionClassFilter.Include include : includes) {
+					_retryIncludeExceptions.add(include.getClazz().trim());
+					logger.finer("RETRYHANDLE: include: " + include.getClazz().trim());
+				}
+
+				if (_retryIncludeExceptions.size() == 0) {
+					logger.finer("RETRYHANDLE: include element not present");
+				}
+			}
+			if (chunk.getRetryableExceptionClasses().getExcludeList() != null) {
+				List<ExceptionClassFilter.Exclude> excludes = chunk.getRetryableExceptionClasses().getExcludeList();
+				for (ExceptionClassFilter.Exclude exclude : excludes) {
+					_retryExcludeExceptions.add(exclude.getClazz().trim());
+					// Trace with this handler's own prefix so the line can be attributed correctly.
+					logger.finer("RETRYHANDLE: exclude: " + exclude.getClazz().trim());
+				}
+
+				if (_retryExcludeExceptions.size() == 0) {
+					logger.finer("RETRYHANDLE: exclude element not present");
+				}
+			}
+		}
+
+		if (chunk.getNoRollbackExceptionClasses() != null) {
+			if (chunk.getNoRollbackExceptionClasses().getIncludeList() != null) {
+				List<ExceptionClassFilter.Include> includes = chunk.getNoRollbackExceptionClasses().getIncludeList();
+				for (ExceptionClassFilter.Include include : includes) {
+					_retryNoRBIncludeExceptions.add(include.getClazz().trim());
+					logger.finer("RETRYHANDLE: no-rollback include: " + include.getClazz().trim());
+				}
+
+				if (_retryNoRBIncludeExceptions.size() == 0) {
+					logger.finer("RETRYHANDLE: no-rollback include element not present");
+				}
+			}
+			if (chunk.getNoRollbackExceptionClasses().getExcludeList() != null) {
+				List<ExceptionClassFilter.Exclude> excludes = chunk.getNoRollbackExceptionClasses().getExcludeList();
+				for (ExceptionClassFilter.Exclude exclude : excludes) {
+					_retryNoRBExcludeExceptions.add(exclude.getClazz().trim());
+					logger.finer("RETRYHANDLE: no-rollback exclude: " + exclude.getClazz().trim());
+				}
+
+				if (_retryNoRBExcludeExceptions.size() == 0) {
+					logger.finer("RETRYHANDLE: no-rollback exclude element not present");
+				}
+			}
+		}
+
+		if (logger.isLoggable(Level.FINE)) {
+			// Report the sets that were actually populated above.
+			logger.logp(Level.FINE, className, mName, "added include exception " + _retryIncludeExceptions
+					+ "; added exclude exception " + _retryExcludeExceptions);
+			logger.logp(Level.FINE, className, mName, "added include no rollback exception " + _retryNoRBIncludeExceptions
+					+ "; added exclude no rollback exception " + _retryNoRBExcludeExceptions);
+		}
+
+		if (logger.isLoggable(Level.FINER)) {
+			logger.exiting(className, mName, this.toString());
+		}
+	}
+	  
+	  /**
+	   * Tells whether the given exception requires a rollback, i.e. whether
+	   * it is not classified as a no-rollback exception.
+	   */
+	  public boolean isRollbackException(Exception e) {
+		  final boolean noRollback = isNoRollbackException(e);
+		  return !noRollback;
+	  }
+	  /**
+	   * Handle exception from a read failure.
+	   */
+	  public void handleExceptionRead(Exception e)
+	  {
+	    final String mName = "handleExceptionRead";
+	    
+	    logger.finer("RETRYHANDLE: in retryhandler handle exception on a read:" + e.toString());
+
+	    if(logger.isLoggable(Level.FINER)) 
+	      logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
+	    
+	    if (!isRetryLimitReached() && isRetryable(e))
+	    {
+	       retryType = RETRY_READ;
+	       _retryException = e;
+	      // Retry it.  Log it.  Call the RetryListener.
+	      ++_retryCount;
+	      logRetry(e);
+
+	      if (_retryReadListeners != null) {
+	    	  for (RetryReadListenerProxy retryReadListenerProxy : _retryReadListeners) {
+	    		  retryReadListenerProxy.onRetryReadException(e);
+				}
+	      }
+	    }
+	    else
+	    {
+	      // No retry.  Throw it back.
+	      if(logger.isLoggable(Level.FINER)) 
+	        logger.logp(Level.FINE, className, mName, "No retry.  Rethrow", e);
+	      	throw new BatchContainerRuntimeException(e);
+	    }
+
+	    if(logger.isLoggable(Level.FINER)) 
+	      logger.exiting(className, mName, e);
+	  }
+
+	  /**
+	   * Handle exception from a process failure.
+	   * <p>
+	   * If the retry limit has not been reached and the exception is retryable,
+	   * the retry is recorded (retry type, exception, incremented count) and every
+	   * registered retry-process listener is notified. Otherwise the exception is
+	   * rethrown wrapped in a BatchContainerRuntimeException.
+	   *
+	   * @param e the exception raised while processing
+	   * @param w object handed to each retry-process listener together with the
+	   *          exception (presumably the item being processed)
+	   * @throws BatchContainerRuntimeException if the exception is not retried
+	   */
+	  public void handleExceptionProcess(Exception e, Object w)
+	  {
+	    final String mName = "handleExceptionProcess";
+
+	    // Guard on the level the record is emitted at (FINE), so that a logger
+	    // configured for FINE does not silently drop it.
+	    if (logger.isLoggable(Level.FINE)) {
+	      logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
+	    }
+
+	    if (!isRetryLimitReached() && isRetryable(e))
+	    {
+	      // Retry it.  Log it.  Call the RetryListener.
+	      retryType = RETRY_PROCESS;
+	      _retryException = e;
+	      ++_retryCount;
+	      logRetry(e);
+
+	      if (_retryProcessListeners != null) {
+	        for (RetryProcessListenerProxy retryProcessListenerProxy : _retryProcessListeners) {
+	          retryProcessListenerProxy.onRetryProcessException(w, e);
+	        }
+	      }
+	    }
+	    else
+	    {
+	      // No retry.  Throw it back.
+	      if (logger.isLoggable(Level.FINE)) {
+	        logger.logp(Level.FINE, className, mName, "No retry.  Rethrow ", e);
+	      }
+	      throw new BatchContainerRuntimeException(e);
+	    }
+	  }
+	  
+	  /**
+	   * Handle exception from a write failure.
+	   * <p>
+	   * If the retry limit has not been reached and the exception is retryable,
+	   * the retry is recorded (retry type, exception, incremented count) and every
+	   * registered retry-write listener is notified. Otherwise the exception is
+	   * rethrown wrapped in a BatchContainerRuntimeException.
+	   *
+	   * @param e the exception raised by the write
+	   * @param w list handed to each retry-write listener together with the
+	   *          exception (presumably the items being written)
+	   * @throws BatchContainerRuntimeException if the exception is not retried
+	   */
+	  public void handleExceptionWrite(Exception e, List<Object> w)
+	  {
+	    final String mName = "handleExceptionWrite";
+
+	    // Guard on the level the record is emitted at (FINE), so that a logger
+	    // configured for FINE does not silently drop it.
+	    if (logger.isLoggable(Level.FINE)) {
+	      logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
+	    }
+
+	    if (!isRetryLimitReached() && isRetryable(e))
+	    {
+	      // Retry it.  Log it.  Call the RetryListener.
+	      retryType = RETRY_WRITE;
+	      _retryException = e;
+	      ++_retryCount;
+	      logRetry(e);
+
+	      if (_retryWriteListeners != null) {
+	        for (RetryWriteListenerProxy retryWriteListenerProxy : _retryWriteListeners) {
+	          retryWriteListenerProxy.onRetryWriteException(w, e);
+	        }
+	      }
+	    }
+	    else
+	    {
+	      // No retry.  Throw it back.
+	      if (logger.isLoggable(Level.FINE)) {
+	        logger.logp(Level.FINE, className, mName, "No retry.  Rethrow ", e);
+	      }
+	      throw new BatchContainerRuntimeException(e);
+	    }
+	  }
+
+
+	  /**
+	   * Decides whether the given exception may be retried: it has to match the
+	   * retry include list and must not match the retry exclude list.
+	   *
+	   * @param e the exception to classify
+	   * @return {@code true} if the exception is included and not excluded
+	   */
+	  private boolean isRetryable(Exception e)
+	  {
+	    final String mName = "isRetryable";
+	    final String exClassName = e.getClass().getName();
+
+	    // The exclude list is only consulted once the include list has matched.
+	    boolean retVal = false;
+	    if (containsException(_retryIncludeExceptions, e)) {
+	      retVal = !containsException(_retryExcludeExceptions, e);
+	    }
+
+	    if (logger.isLoggable(Level.FINE)) {
+	      logger.logp(Level.FINE, className, mName, mName + ": " + retVal + ": " + exClassName);
+	    }
+
+	    return retVal;
+	  }
+	
+	  /**
+	   * Decides whether the given exception can be retried without a rollback:
+	   * it has to match the no-rollback include list and must not match the
+	   * no-rollback exclude list.
+	   *
+	   * @param e the exception to classify
+	   * @return {@code true} if the exception is included and not excluded
+	   */
+	  private boolean isNoRollbackException(Exception e)
+	  {
+	    final String mName = "isNoRollbackException";
+	    final String exClassName = e.getClass().getName();
+
+	    // The exclude list is only consulted once the include list has matched.
+	    boolean retVal = false;
+	    if (containsException(_retryNoRBIncludeExceptions, e)) {
+	      retVal = !containsException(_retryNoRBExcludeExceptions, e);
+	    }
+
+	    if (logger.isLoggable(Level.FINE)) {
+	      logger.logp(Level.FINE, className, mName, mName + ": " + retVal + ": " + exClassName);
+	    }
+
+	    return retVal;
+	  }
+	  
+	  /**
+	   * Check whether the given exception is an instance of any class named in
+	   * the specified exception list.
+	   * <p>
+	   * Each name is loaded through the current thread's context class loader.
+	   * Names that cannot be loaded are logged at FINE and skipped. The search
+	   * stops at the first match.
+	   *
+	   * @param retryList fully qualified exception class names; a {@code null}
+	   *                  list is treated as empty
+	   * @param e         the exception to look up
+	   * @return {@code true} if {@code e} is an instance of at least one listed class
+	   */
+	  private boolean containsException(Set<String> retryList, Exception e)
+	  {
+	    final String mName = "containsException";
+	    boolean retVal = false;
+
+	    // Defensive: a list that was never populated cannot contain a match.
+	    if (retryList != null) {
+	      // The context class loader does not change while iterating; look it up once.
+	      final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
+
+	      for (String exClassName : retryList) {
+	        try {
+	          if (contextLoader.loadClass(exClassName).isInstance(e)) {
+	            retVal = true;
+	            break;
+	          }
+	        } catch (ClassNotFoundException cnf) {
+	          // An unloadable name simply cannot match; keep checking the rest.
+	          logger.logp(Level.FINE, className, mName, cnf.getLocalizedMessage());
+	        }
+	      }
+	    }
+
+	    if (logger.isLoggable(Level.FINE)) {
+	      logger.logp(Level.FINE, className, mName, mName + ": " + retVal);
+	    }
+
+	    return retVal;
+	  }
+
+	  /**
+	   * Check if the retry limit has been reached.
+	   *
+	   * Note: if retry handling isn't enabled (i.e. not configured in xJCL), then this method 
+	   *       will always return TRUE.
+	   */
+	  private boolean isRetryLimitReached()
+	  {
+        // Unlimited retries if it is never defined
+        if (_retryLimit == Integer.MIN_VALUE) {
+            return false;
+        }
+	      
+	    return (_retryCount >= _retryLimit);
+	  }
+
+	  
+	  private void logRetry(Exception e)
+	  {
+	    String key = "record.retried.norollback.by.batch.container";
+	    Object[] details = { _jobId, _stepId, e.getClass().getName() + ": " + e.getMessage() };
+	    //String message = LoggerUtil.getFormattedMessage(key, details, true);
+	    //logger.info(message);	
+		}
+
+	  /**
+	   * Returns the exception held in {@code _retryException}; the
+	   * handleException* methods store the exception they retry there.
+	   *
+	   * @return the stored retry exception
+	   */
+	  public Exception getException()
+	  {
+	    return this._retryException;
+	  }
+	  
+	  /**
+	   * Returns the current retry count.
+	   *
+	   * @return the value of the retry counter
+	   */
+	  public long getRetryCount()
+	  {
+	    return this._retryCount;
+	  }
+
+	  /**
+	   * Overwrites the retry counter and traces the new value at FINE.
+	   *
+	   * @param retryCount the value to store as the retry count
+	   */
+	  public void setRetryCount(long retryCount)
+	  {
+	    this._retryCount = retryCount;
+
+	    if (!logger.isLoggable(Level.FINE)) {
+	      return;
+	    }
+
+	    final String mName = "setRetryCount";
+	    logger.logp(Level.FINE, className, mName, "setRetryCount: " + _retryCount);
+	  }
+
+	  /**
+	   * Renders the handler as its superclass description followed by the
+	   * current retry count and the retry limit.
+	   */
+	  @Override
+	  public String toString()
+	  {
+	    final StringBuilder text = new StringBuilder("RetryHandler{");
+	    text.append(super.toString());
+	    text.append("}count:limit=").append(_retryCount);
+	    text.append(":").append(_retryLimit);
+	    return text.toString();
+	  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SingleThreadedStepControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SingleThreadedStepControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SingleThreadedStepControllerImpl.java
new file mode 100755
index 0000000..1c81e7f
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SingleThreadedStepControllerImpl.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Logger;
+
+
+import com.ibm.jbatch.container.IController;
+import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
+import com.ibm.jbatch.container.artifact.proxy.PartitionCollectorProxy;
+import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
+import com.ibm.jbatch.container.artifact.proxy.StepListenerProxy;
+import com.ibm.jbatch.container.context.impl.StepContextImpl;
+import com.ibm.jbatch.container.exception.BatchContainerServiceException;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.util.PartitionDataWrapper;
+import com.ibm.jbatch.container.util.PartitionDataWrapper.PartitionEventType;
+import com.ibm.jbatch.container.validation.ArtifactValidationException;
+import com.ibm.jbatch.jsl.model.Collector;
+import com.ibm.jbatch.jsl.model.Property;
+import com.ibm.jbatch.jsl.model.Step;
+
+/**
+ * Step controller that runs a step on a single thread.
+ *
+ * For a partitioned step this controller is only used on the partition threads,
+ * NOT on the top-level main thread that the step executes upon.
+ *
+ * For a non-partitioned step this controller is used as well (and there is no
+ * separate main thread with its own controller).
+ */
+public abstract class SingleThreadedStepControllerImpl extends BaseStepControllerImpl implements IController {
+
+	private static final String sourceClass = SingleThreadedStepControllerImpl.class.getName();
+	private static final Logger logger = Logger.getLogger(sourceClass);
+
+	// Collector only used from partition threads, not main thread
+	protected PartitionCollectorProxy collectorProxy = null;
+
+	// Step listeners resolved by setupStepArtifacts()
+	List<StepListenerProxy> stepListeners = null;
+
+	protected SingleThreadedStepControllerImpl(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
+		super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+	}
+
+	/**
+	 * Resolves the step listeners and, when the step is partitioned and declares
+	 * a collector, creates the partition collector proxy.
+	 *
+	 * @throws BatchContainerServiceException if the collector proxy cannot be created
+	 */
+	protected void setupStepArtifacts() {
+		// Listeners receive the job and step contexts but no property list.
+		InjectionReferences listenerRefs = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, null);
+		this.stepListeners = jobExecutionImpl.getListenerFactory().getStepListeners(step, listenerRefs, stepContext);
+
+		// A collector is only set up for a partitioned step that declares one.
+		if (step.getPartition() == null) {
+			return;
+		}
+		Collector collector = step.getPartition().getCollector();
+		if (collector == null) {
+			return;
+		}
+
+		List<Property> collectorProps = null;
+		if (collector.getProperties() != null) {
+			collectorProps = collector.getProperties().getPropertyList();
+		}
+
+		// Inject the job and step contexts plus the collector's own properties;
+		// some of these may be null.
+		InjectionReferences collectorRefs = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, collectorProps);
+
+		try {
+			this.collectorProxy = ProxyFactory.createPartitionCollectorProxy(collector.getRef(), collectorRefs, this.stepContext);
+		} catch (ArtifactValidationException e) {
+			throw new BatchContainerServiceException("Cannot create the collector [" + collector.getRef() + "]", e);
+		}
+	}
+
+	/**
+	 * Calls beforeStep() on every step listener, except on a partition thread.
+	 */
+	@Override
+	protected void invokePreStepArtifacts() {
+		// Don't call beforeStep() in the partitioned case: we are on a partition thread and
+		// beforeStep() has already been called on the main thread as the spec says.
+		if (stepListeners == null || this.jobExecutionImpl.getPartitionInstance() != null) {
+			return;
+		}
+		for (StepListenerProxy listenerProxy : stepListeners) {
+			listenerProxy.beforeStep();
+		}
+	}
+
+	/**
+	 * Calls afterStep() on every step listener, except on a partition thread.
+	 */
+	@Override
+	protected void invokePostStepArtifacts() {
+		// Don't call afterStep() in the partitioned case: the same partition-instance check
+		// as for beforeStep() applies, since we are on a partition thread.
+		if (stepListeners == null || this.jobExecutionImpl.getPartitionInstance() != null) {
+			return;
+		}
+		for (StepListenerProxy listenerProxy : stepListeners) {
+			listenerProxy.afterStep();
+		}
+	}
+
+	/**
+	 * Collects partition data from the collector, if one was created, and passes
+	 * it on towards the analyzer.
+	 */
+	protected void invokeCollectorIfPresent() {
+		if (collectorProxy == null) {
+			return;
+		}
+		Serializable data = collectorProxy.collectPartitionData();
+		logger.finer("Got partition data: " + data + ", from collector: " + collectorProxy);
+		sendCollectorDataToAnalyzerIfPresent(data);
+	}
+
+	/**
+	 * Queues the collector data for the analyzer when an analyzer queue exists.
+	 * Useless to have a collector without an analyzer, but the check keeps us
+	 * from hanging or blowing up.
+	 *
+	 * @param data the partition data obtained from the collector
+	 */
+	protected void sendCollectorDataToAnalyzerIfPresent(Serializable data) {
+		if (analyzerStatusQueue == null) {
+			logger.fine("Analyzer not configured.");
+			return;
+		}
+		logger.finer("Sending collector partition data: " + data + " to analyzer queue: " + analyzerStatusQueue);
+		PartitionDataWrapper wrapper = new PartitionDataWrapper();
+		wrapper.setCollectorData(data);
+		wrapper.setEventType(PartitionEventType.ANALYZE_COLLECTOR_DATA);
+		analyzerStatusQueue.add(wrapper);
+	}
+
+	/**
+	 * Queues the step's batch and exit status for the analyzer when an analyzer
+	 * queue exists. Useless to have a collector without an analyzer, but the
+	 * check keeps us from hanging or blowing up.
+	 */
+	@Override
+	protected void sendStatusFromPartitionToAnalyzerIfPresent() {
+		if (analyzerStatusQueue == null) {
+			logger.fine("Analyzer not configured.");
+			return;
+		}
+		logger.fine("Send status from partition for analyzeStatus with batchStatus = " + stepStatus.getBatchStatus() + ", exitStatus = " + stepStatus.getExitStatus());
+		PartitionDataWrapper wrapper = new PartitionDataWrapper();
+		wrapper.setBatchStatus(stepStatus.getBatchStatus());
+		wrapper.setExitStatus(stepStatus.getExitStatus());
+		wrapper.setEventType(PartitionEventType.ANALYZE_STATUS);
+		analyzerStatusQueue.add(wrapper);
+	}
+}