You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:38:44 UTC
[07/62] Importing JBatch Reference Implementation from IBM. We'll
fork it but this commit is to keep a track of what we forked.
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowControllerImpl.java
new file mode 100755
index 0000000..255d871
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowControllerImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.util.List;
+import java.util.logging.Logger;
+
+import javax.batch.runtime.BatchStatus;
+
+import com.ibm.jbatch.container.IController;
+import com.ibm.jbatch.container.IExecutionElementController;
+import com.ibm.jbatch.container.context.impl.JobContextImpl;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.navigator.ModelNavigator;
+import com.ibm.jbatch.container.navigator.NavigatorFactory;
+import com.ibm.jbatch.container.services.IPersistenceManagerService;
+import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
+import com.ibm.jbatch.container.status.ExtendedBatchStatus;
+import com.ibm.jbatch.container.status.ExecutionStatus;
+import com.ibm.jbatch.jsl.model.Flow;
+
+/**
+ * Executes a single {@link Flow} by running an {@link ExecutionTransitioner}
+ * over the flow's navigator, and forwards stop requests to whichever element
+ * the transitioner reports as currently stoppable.
+ */
+public class FlowControllerImpl implements IExecutionElementController {
+
+    private final static String CLASSNAME = FlowControllerImpl.class.getName();
+    private final static Logger logger = Logger.getLogger(CLASSNAME);
+
+    private final RuntimeJobExecution jobExecution;
+    private final JobContextImpl jobContext;
+
+    protected ModelNavigator<Flow> flowNavigator;
+
+    protected Flow flow;
+    private long rootJobExecutionId;
+
+    // Assigned by execute() and read by stop(); stop() is expected to arrive
+    // from a different thread, hence volatile. Stays null until execute()
+    // takes its non-STOPPING branch.
+    private volatile ExecutionTransitioner transitioner;
+
+    //
+    // The currently executing controller, this will only be set to the
+    // local variable reference when we are ready to accept stop events for
+    // this execution.
+    // NOTE(review): nothing in this class reads or writes this field any more;
+    // stop() asks the transitioner instead.
+    private volatile IController currentStoppableElementController = null;
+
+    private static IPersistenceManagerService _persistenceManagementService = ServicesManagerImpl.getInstance().getPersistenceManagerService();
+
+
+    /**
+     * @param jobExecution       the execution this flow runs under; its job context is cached
+     * @param flow               the flow model to execute
+     * @param rootJobExecutionId passed through to the {@link ExecutionTransitioner}
+     */
+    public FlowControllerImpl(RuntimeJobExecution jobExecution, Flow flow, long rootJobExecutionId) {
+        this.jobExecution = jobExecution;
+        this.jobContext = jobExecution.getJobContext();
+        this.flowNavigator = NavigatorFactory.createFlowNavigator(flow);
+        this.flow = flow;
+        this.rootJobExecutionId = rootJobExecutionId;
+    }
+
+    /**
+     * Runs the flow's execution loop unless the job is already STOPPING.
+     *
+     * @return the loop's status, or a JOB_OPERATOR_STOPPING status when the
+     *         job was already stopping and nothing was run
+     */
+    @Override
+    public ExecutionStatus execute() {
+        if (jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
+            return new ExecutionStatus(ExtendedBatchStatus.JOB_OPERATOR_STOPPING);
+        }
+        // Publish the transitioner before entering the loop so a concurrent stop() can see it.
+        final ExecutionTransitioner newTransitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, flowNavigator);
+        transitioner = newTransitioner;
+        return newTransitioner.doExecutionLoop();
+    }
+
+
+    /**
+     * Passes a stop request on to the currently stoppable element, if any.
+     * A stop that arrives before execute() has created the transitioner is a no-op
+     * (previously it failed with a NullPointerException).
+     */
+    @Override
+    public void stop() {
+        // Since this is not a top-level controller, don't try to filter based on existing status.. just pass
+        // along the stop().
+        final ExecutionTransitioner activeTransitioner = transitioner;
+        if (activeTransitioner == null) {
+            // Nothing is running yet, so there is nothing to forward the stop to.
+            return;
+        }
+        IController stoppableElementController = activeTransitioner.getCurrentStoppableElementController();
+        if (stoppableElementController != null) {
+            stoppableElementController.stop();
+        }
+    }
+
+    /**
+     * @return the step execution ids recorded by the transitioner, or an empty
+     *         list when execute() never started the execution loop
+     */
+    @Override
+    public List<Long> getLastRunStepExecutions() {
+        final ExecutionTransitioner activeTransitioner = transitioner;
+        if (activeTransitioner == null) {
+            // Fully qualified: this file does not import ArrayList.
+            return new java.util.ArrayList<Long>();
+        }
+        return activeTransitioner.getStepExecIds();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowInSplitThreadRootControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowInSplitThreadRootControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowInSplitThreadRootControllerImpl.java
new file mode 100755
index 0000000..f680586
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowInSplitThreadRootControllerImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import com.ibm.jbatch.container.jobinstance.RuntimeFlowInSplitExecution;
+import com.ibm.jbatch.container.status.ExtendedBatchStatus;
+import com.ibm.jbatch.container.status.ExecutionStatus;
+import com.ibm.jbatch.container.util.FlowInSplitBuilderConfig;
+
+/**
+ * Thread-root controller for a flow running as a sub-job of a split. On top of
+ * the inherited job handling it records the flow's execution status on the
+ * {@link RuntimeFlowInSplitExecution}.
+ */
+public class FlowInSplitThreadRootControllerImpl extends JobThreadRootControllerImpl {
+
+    private final static String CLASSNAME = FlowInSplitThreadRootControllerImpl.class.getName();
+
+    // Careful: the parent class keeps its own, differently named reference
+    // (jobExecution) to this very same object.
+    RuntimeFlowInSplitExecution flowInSplitExecution;
+
+    /**
+     * @param flowInSplitExecution the sub-job execution for this flow
+     * @param config               supplies the root job execution id handed to the parent class
+     */
+    public FlowInSplitThreadRootControllerImpl(RuntimeFlowInSplitExecution flowInSplitExecution, FlowInSplitBuilderConfig config) {
+        super(flowInSplitExecution, config.getRootJobExecutionId());
+        this.flowInSplitExecution = flowInSplitExecution;
+    }
+
+    /**
+     * Not only are we setting the status correctly at the subjob level, we are also setting it on the execution
+     * so that it is visible by the parent split.
+     *
+     * @return whatever the parent implementation returned (which may be null)
+     */
+    @Override
+    public ExecutionStatus originateExecutionOnThread() {
+        final ExecutionStatus outcome = super.originateExecutionOnThread();
+        flowInSplitExecution.setFlowStatus(outcome);
+        return outcome;
+    }
+
+    /**
+     * Marks the job FAILED via the parent class and additionally flags the
+     * flow status as EXCEPTION_THROWN.
+     */
+    @Override
+    protected void batchStatusFailedFromException() {
+        super.batchStatusFailedFromException();
+        // NOTE(review): the parent invokes this from inside originateExecutionOnThread(),
+        // i.e. before setFlowStatus() above has run - presumably the execution object
+        // already carries a non-null flow status by then; confirm.
+        flowInSplitExecution.getFlowStatus().setExtendedBatchStatus(ExtendedBatchStatus.EXCEPTION_THROWN);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobControllerImpl.java
new file mode 100755
index 0000000..75e1332
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobControllerImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+
+/**
+ * Thread-root controller for a top-level job: the job's own execution id
+ * doubles as the root job execution id.
+ */
+public class JobControllerImpl extends JobThreadRootControllerImpl {
+
+    private final static String CLASSNAME = JobControllerImpl.class.getName();
+
+    /**
+     * @param jobExecution the execution to control; its execution id is used as the root id
+     */
+    public JobControllerImpl(RuntimeJobExecution jobExecution) {
+        this(jobExecution, jobExecution.getExecutionId());
+    }
+
+    // Internal: only reachable through the public constructor above.
+    private JobControllerImpl(RuntimeJobExecution jobExecution, long rootJobExecutionId) {
+        super(jobExecution, rootJobExecutionId);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobThreadRootControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobThreadRootControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobThreadRootControllerImpl.java
new file mode 100755
index 0000000..3e30ab2
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobThreadRootControllerImpl.java
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.batch.runtime.BatchStatus;
+
+import com.ibm.jbatch.container.IController;
+import com.ibm.jbatch.container.IThreadRootController;
+import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
+import com.ibm.jbatch.container.artifact.proxy.JobListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.ListenerFactory;
+import com.ibm.jbatch.container.context.impl.JobContextImpl;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.navigator.ModelNavigator;
+import com.ibm.jbatch.container.services.IJobStatusManagerService;
+import com.ibm.jbatch.container.services.IPersistenceManagerService;
+import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
+import com.ibm.jbatch.container.status.ExtendedBatchStatus;
+import com.ibm.jbatch.container.status.ExecutionStatus;
+import com.ibm.jbatch.container.util.PartitionDataWrapper;
+import com.ibm.jbatch.jsl.model.JSLJob;
+import com.ibm.jbatch.jsl.model.JSLProperties;
+import com.ibm.jbatch.jsl.model.Property;
+
+public abstract class JobThreadRootControllerImpl implements IThreadRootController {
+
+ // Used as the source class name in logger entering/exiting calls.
+ private final static String CLASSNAME = JobThreadRootControllerImpl.class.getName();
+ private final static Logger logger = Logger.getLogger(CLASSNAME);
+
+ // The execution being driven, plus values cached from it by the constructor.
+ protected RuntimeJobExecution jobExecution;
+ protected JobContextImpl jobContext;
+ protected long rootJobExecutionId;
+ protected long jobInstanceId;
+ // Services looked up once from ServicesManagerImpl in the constructor.
+ protected IJobStatusManagerService jobStatusService;
+ protected IPersistenceManagerService persistenceService;
+ // Built by setupListeners() during construction.
+ private ListenerFactory listenerFactory = null;
+
+ // Created inside originateExecutionOnThread(); remains null until the execution loop is about to start.
+ private ExecutionTransitioner transitioner;
+ protected final ModelNavigator<JSLJob> jobNavigator;
+ // Only set by the constructors that take a queue; null otherwise.
+ private BlockingQueue<PartitionDataWrapper> analyzerQueue;
+
+ /**
+  * Caches the context, ids, services and navigator for the given execution
+  * and installs the job's listener factory.
+  *
+  * @param jobExecution       the execution to drive
+  * @param rootJobExecutionId execution id of the root job this execution belongs to
+  */
+ public JobThreadRootControllerImpl(RuntimeJobExecution jobExecution, long rootJobExecutionId) {
+     this.jobExecution = jobExecution;
+     this.rootJobExecutionId = rootJobExecutionId;
+     this.jobContext = jobExecution.getJobContext();
+     this.jobInstanceId = jobExecution.getInstanceId();
+     this.jobStatusService = ServicesManagerImpl.getInstance().getJobStatusManagerService();
+     this.persistenceService = ServicesManagerImpl.getInstance().getPersistenceManagerService();
+     this.jobNavigator = jobExecution.getJobNavigator();
+     setupListeners();
+ }
+
+ /**
+  * Same as the two-argument constructor, additionally remembering the analyzer queue
+  * that is later handed to the execution transitioner.
+  */
+ public JobThreadRootControllerImpl(RuntimeJobExecution jobExecution, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+     this(jobExecution, rootJobExecutionId);
+     this.analyzerQueue = analyzerQueue;
+ }
+
+ /*
+  * By not passing the rootJobExecutionId, we are "orphaning" the subjob execution and making it not findable from the parent.
+  * This is exactly what we want for getStepExecutions()... we don't want it to get extraneous entries for the partitions.
+  *
+  * The execution's own id is used as the root id instead.
+  */
+ public JobThreadRootControllerImpl(RuntimeJobExecution jobExecution, BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+     this(jobExecution, jobExecution.getExecutionId(), analyzerQueue);
+ }
+
+ /**
+  * Runs the job on the calling thread: marks it started, calls the beforeJob
+  * listeners, runs the execution loop, maps the loop's extended status onto
+  * the job's batch status and finally performs end-of-job processing.
+  * Nothing is started if the job is already STOPPING; end-of-job processing
+  * still runs in that case.
+  *
+  * @return the status returned by the execution loop, or null when the loop
+  *         never returned one (stop requested before start, or a Throwable
+  *         was thrown before the loop completed)
+  */
+ @Override
+ public ExecutionStatus originateExecutionOnThread() {
+     final String methodName = "executeJob";
+     logger.entering(CLASSNAME, methodName);
+
+     ExecutionStatus loopOutcome = null;
+     try {
+         // A stop() command may have arrived before we even got going.
+         final boolean stopAlreadyRequested = jobContext.getBatchStatus().equals(BatchStatus.STOPPING);
+         if (!stopAlreadyRequested) {
+
+             // About to invoke artifacts, so the job counts as 'STARTED' from here on.
+             markJobStarted();
+
+             jobListenersBeforeJob();
+
+             // The main loop that transitions between the job's execution elements.
+             transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, jobNavigator, analyzerQueue);
+             loopOutcome = transitioner.doExecutionLoop();
+
+             switch (loopOutcome.getExtendedBatchStatus()) {
+                 case JSL_STOP:
+                     jslStop();
+                     break;
+                 case JSL_FAIL:
+                 case EXCEPTION_THROWN:
+                     updateJobBatchStatus(BatchStatus.FAILED);
+                     break;
+                 default:
+                     // Every other outcome leaves the batch status untouched here.
+                     break;
+             }
+         }
+     } catch (Throwable t) {
+         // Catch and continue: the afterJob() listeners should still be called and the
+         // batch and exit status for the failure persisted in an orderly fashion.
+         logWarning("Caught throwable in main execution loop", t);
+         batchStatusFailedFromException();
+     }
+
+     endOfJob();
+
+     logger.exiting(CLASSNAME, methodName);
+     return loopOutcome;
+ }
+
+ protected void setContextProperties() {
+ JSLJob jobModel = jobExecution.getJobNavigator().getRootModelElement();
+ JSLProperties jslProps = jobModel.getProperties();
+
+ if (jslProps != null) {
+ Properties contextProps = jobContext.getProperties();
+ for (Property property : jslProps.getPropertyList()) {
+ contextProps.setProperty(property.getName(), property.getValue());
+ }
+ }
+ }
+
+ protected void jslStop() {
+ String restartOn = jobContext.getRestartOn();
+ logger.fine("Logging JSL stop(): exitStatus = " + jobContext.getExitStatus() + ", restartOn = " +restartOn );
+ batchStatusStopping();
+ jobStatusService.updateJobStatusFromJSLStop(jobInstanceId, restartOn);
+ return;
+ }
+
+ protected void markJobStarted() {
+ updateJobBatchStatus(BatchStatus.STARTED);
+ long time = System.currentTimeMillis();
+ Timestamp timestamp = new Timestamp(time);
+ jobExecution.setLastUpdateTime(timestamp);
+ jobExecution.setStartTime(timestamp);
+ persistenceService.markJobStarted(jobExecution.getExecutionId(), timestamp);
+ }
+
+ /*
+ * Follow similar pattern for end of step in BaseStepControllerImpl
+ *
+ * 1. Execute the very last artifacts (jobListener)
+ * 2. transition to final batch status
+ * 3. default ExitStatus if necessary
+ * 4. persist statuses and end time data
+ *
+ * We don't want to give up on the orderly process of 2,3,4, if we blow up
+ * in after job, so catch that and keep on going.
+ */
+ protected void endOfJob() {
+
+
+ // 1. Execute the very last artifacts (jobListener)
+ try {
+ jobListenersAfterJob();
+ } catch (Throwable t) {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ t.printStackTrace(pw);
+ logger.warning("Error invoking jobListener.afterJob(). Stack trace: " + sw.toString());
+ batchStatusFailedFromException();
+ }
+
+ // 2. transition to final batch status
+ transitionToFinalBatchStatus();
+
+ // 3. default ExitStatus if necessary
+ if (jobContext.getExitStatus() == null) {
+ logger.fine("No job-level exitStatus set, defaulting to job batch Status = " + jobContext.getBatchStatus());
+ jobContext.setExitStatus(jobContext.getBatchStatus().name());
+ }
+
+ // 4. persist statuses and end time data
+ logger.fine("Job complete for job id=" + jobExecution.getJobInstance().getJobName() + ", executionId=" + jobExecution.getExecutionId()
+ + ", batchStatus=" + jobContext.getBatchStatus() + ", exitStatus=" + jobContext.getExitStatus());
+ persistJobBatchAndExitStatus();
+
+ }
+
+ private void persistJobBatchAndExitStatus() {
+ BatchStatus batchStatus = jobContext.getBatchStatus();
+
+ // Take a current timestamp for last updated no matter what the status.
+ long time = System.currentTimeMillis();
+ Timestamp timestamp = new Timestamp(time);
+ jobExecution.setLastUpdateTime(timestamp);
+
+ // Perhaps these should be coordinated in a tran but probably better still would be
+ // rethinking the table design to let the database provide us consistently with a single update.
+ jobStatusService.updateJobBatchStatus(jobInstanceId, batchStatus);
+ jobStatusService.updateJobExecutionStatus(jobExecution.getInstanceId(), jobContext.getBatchStatus(), jobContext.getExitStatus());
+
+ if (batchStatus.equals(BatchStatus.COMPLETED) || batchStatus.equals(BatchStatus.STOPPED) ||
+ batchStatus.equals(BatchStatus.FAILED)) {
+
+ jobExecution.setEndTime(timestamp);
+ persistenceService.updateWithFinalExecutionStatusesAndTimestamps(jobExecution.getExecutionId(),
+ batchStatus, jobContext.getExitStatus(), timestamp);
+ } else {
+ throw new IllegalStateException("Not expected to encounter batchStatus of " + batchStatus +" at this point. Aborting.");
+ }
+ }
+
+ /**
+ * The only valid states at this point are STARTED or STOPPING. Shouldn't have
+ * been able to get to COMPLETED, STOPPED, or FAILED at this point in the code.
+ */
+
+ private void transitionToFinalBatchStatus() {
+ BatchStatus currentBatchStatus = jobContext.getBatchStatus();
+ if (currentBatchStatus.equals(BatchStatus.STARTED)) {
+ updateJobBatchStatus(BatchStatus.COMPLETED);
+ } else if (currentBatchStatus.equals(BatchStatus.STOPPING)) {
+ updateJobBatchStatus(BatchStatus.STOPPED);
+ } else if (currentBatchStatus.equals(BatchStatus.FAILED)) {
+ updateJobBatchStatus(BatchStatus.FAILED); // Should have already been done but maybe better for possible code refactoring to have it here.
+ } else {
+ throw new IllegalStateException("Step batch status should not be in a " + currentBatchStatus.name() + " state");
+ }
+ }
+
+ protected void updateJobBatchStatus(BatchStatus batchStatus) {
+ logger.fine("Setting job batch status to: " + batchStatus);
+ jobContext.setBatchStatus(batchStatus);
+ }
+
+
+ protected void logWarning(String msg, Throwable t) {
+ StringWriter sw = new StringWriter();
+ t.printStackTrace(new PrintWriter(sw));
+ logger.warning(msg + " with Throwable message: " + t.getMessage() + ", and stack trace: " + sw.toString());
+ }
+
+ /*
+ * The thought here is that while we don't persist all the transitions in batch status (given
+ * we plan to persist at the very end), we do persist STOPPING right away, since if we end up
+ * "stuck in STOPPING" we at least will have a record in the database.
+ */
+ protected void batchStatusStopping() {
+ updateJobBatchStatus(BatchStatus.STOPPING);
+ long time = System.currentTimeMillis();
+ Timestamp timestamp = new Timestamp(time);
+ jobExecution.setLastUpdateTime(timestamp);
+ persistenceService.updateBatchStatusOnly(jobExecution.getExecutionId(), BatchStatus.STOPPING, timestamp);
+ }
+
+
+
+ private void setupListeners() {
+ JSLJob jobModel = jobExecution.getJobNavigator().getRootModelElement();
+ InjectionReferences injectionRef = new InjectionReferences(jobContext, null, null);
+ listenerFactory = new ListenerFactory(jobModel, injectionRef);
+ jobExecution.setListenerFactory(listenerFactory);
+ }
+
+
+ /**
+  * Requests a stop of the job. Only honoured while the job is STARTING or
+  * STARTED; in any other status the request is logged and ignored.
+  *
+  * The STOPPING status is persisted first, then the stop is forwarded to the
+  * element the transitioner reports as currently stoppable, if there is one.
+  */
+ @Override
+ public void stop() {
+     final BatchStatus statusAtStop = jobContext.getBatchStatus();
+     final boolean stoppable = statusAtStop.equals(BatchStatus.STARTING) || statusAtStop.equals(BatchStatus.STARTED);
+     if (!stoppable) {
+         logger.info("Stop ignored since batch status for job is already set to: " + statusAtStop);
+         return;
+     }
+
+     batchStatusStopping();
+
+     // The transitioner is only created once the execution loop is about to start, so
+     // a stop arriving while the job is still STARTING finds none. Having recorded
+     // STOPPING is all that can be done then; dereferencing it used to throw an NPE.
+     final ExecutionTransitioner activeTransitioner = transitioner;
+     if (activeTransitioner == null) {
+         return;
+     }
+
+     final IController stoppableElementController = activeTransitioner.getCurrentStoppableElementController();
+     if (stoppableElementController != null) {
+         stoppableElementController.stop();
+     }
+ }
+
+ // Call beforeJob() on all the job listeners
+ protected void jobListenersBeforeJob() {
+ List<JobListenerProxy> jobListeners = listenerFactory.getJobListeners();
+ for (JobListenerProxy listenerProxy : jobListeners) {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Invoking beforeJob() on jobListener: " + listenerProxy.getDelegate() + " of type: " + listenerProxy.getDelegate().getClass());
+ }
+ listenerProxy.beforeJob();
+ }
+ }
+
+ // Call afterJob() on all the job listeners
+ private void jobListenersAfterJob() {
+ List<JobListenerProxy> jobListeners = listenerFactory.getJobListeners();
+ for (JobListenerProxy listenerProxy : jobListeners) {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine(" Invoking afterJob() on jobListener: " + listenerProxy.getDelegate() + " of type: " + listenerProxy.getDelegate().getClass());
+ }
+ listenerProxy.afterJob();
+ }
+ }
+
+ protected void batchStatusFailedFromException() {
+ updateJobBatchStatus(BatchStatus.FAILED);
+ }
+
+ /**
+  * @return the step execution ids recorded by the execution transitioner, or
+  *         an empty list when the execution loop was never started (for
+  *         example when the job was stopped before it began), instead of
+  *         failing with a NullPointerException
+  */
+ @Override
+ public List<Long> getLastRunStepExecutions() {
+     final ExecutionTransitioner activeTransitioner = this.transitioner;
+     if (activeTransitioner == null) {
+         // Fully qualified: this file does not import ArrayList.
+         return new java.util.ArrayList<Long>();
+     }
+     return activeTransitioner.getStepExecIds();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionThreadRootControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionThreadRootControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionThreadRootControllerImpl.java
new file mode 100755
index 0000000..08d105b
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionThreadRootControllerImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.util.PartitionsBuilderConfig;
+
+/**
+ * Thread-root controller for one partition of a partitioned step. It adds no
+ * behaviour of its own to the sub-job handling inherited from the parent class;
+ * it only wires in the analyzer queue taken from the partition configuration.
+ */
+public class PartitionThreadRootControllerImpl extends JobThreadRootControllerImpl {
+
+    private final static String CLASSNAME = PartitionThreadRootControllerImpl.class.getName();
+
+    /**
+     * @param jobExecution the sub-job execution for this partition
+     * @param config       supplies the analyzer queue passed to the parent class
+     */
+    public PartitionThreadRootControllerImpl(RuntimeJobExecution jobExecution, PartitionsBuilderConfig config) {
+        super(jobExecution, config.getAnalyzerQueue());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepBuilder.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepBuilder.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepBuilder.java
new file mode 100755
index 0000000..4a5f038
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepBuilder.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package com.ibm.jbatch.container.impl;
+
+import javax.batch.runtime.context.JobContext;
+
+import com.ibm.jbatch.container.jsl.CloneUtility;
+import com.ibm.jbatch.jsl.model.Flow;
+import com.ibm.jbatch.jsl.model.JSLJob;
+import com.ibm.jbatch.jsl.model.ObjectFactory;
+import com.ibm.jbatch.jsl.model.Partition;
+import com.ibm.jbatch.jsl.model.PartitionPlan;
+import com.ibm.jbatch.jsl.model.Split;
+import com.ibm.jbatch.jsl.model.Step;
+
+public class PartitionedStepBuilder {
+
+ // Separator placed before and between the parts of a generated subjob id, so the id can be keyed off of.
+ // NOTE(review): the original comment said "Use something permissible in NCName", but ':' is
+ // precisely the Name character that NCName excludes - presumably the intent is that ':'
+ // cannot appear in a user-supplied id; confirm.
+ public static final String JOB_ID_SEPARATOR = ":";
+ /*
+  * Builds a generated job that contains exactly one flow, to be submitted to
+  * the BatchKernel. This is how subjobs are built from the flows of a split.
+  *
+  * The subjob gets a generated id and a copy of the parent job context's
+  * properties. The flow itself is added as-is.
+  */
+ public static JSLJob buildFlowInSplitSubJob(Long parentJobExecutionId, JobContext jobContext, Split split, Flow flow) {
+
+     final JSLJob flowSubJob = new ObjectFactory().createJSLJob();
+
+     // Generated subjob id
+     flowSubJob.setId(generateSubJobId(parentJobExecutionId, split.getId(), flow.getId()));
+
+     // Hand all properties of the parent JobContext down to the flow thread
+     flowSubJob.setProperties(CloneUtility.javaPropsTojslProperties(jobContext.getProperties()));
+
+     // No deep copy needed here: each flow is already independent of all others, unlike a partition,
+     // where one step instance can be executed with different properties on multiple threads.
+     flowSubJob.getExecutionElements().add(flow);
+
+     return flowSubJob;
+ }
+
+ /*
+  * Builds a generated job that contains exactly one step, to be submitted to
+  * the BatchKernel. This is used for partitioned steps.
+  *
+  * The step placed in the subjob is a fresh copy of the given step: id,
+  * allow-start-if-complete, start limit, batchlet/chunk, properties and
+  * listeners are carried over. The next attribute and control elements are
+  * not. Of the partition element only the collector is carried over.
+  */
+ public static JSLJob buildPartitionSubJob(Long parentJobInstanceId, JobContext jobContext, Step step, int partitionInstance) {
+
+     final ObjectFactory factory = new ObjectFactory();
+     final JSLJob partitionSubJob = factory.createJSLJob();
+
+     // Generated subjob id
+     partitionSubJob.setId(generateSubJobId(parentJobInstanceId, step.getId(), partitionInstance));
+
+     // Hand all properties of the parent JobContext down to the partitioned step thread
+     partitionSubJob.setProperties(CloneUtility.javaPropsTojslProperties(jobContext.getProperties()));
+
+     // The single step of the subjob, sharing the original step's id
+     final Step stepCopy = factory.createStep();
+     stepCopy.setId(step.getId());
+
+     // Copy the step's fields, cloning the nested model elements
+     stepCopy.setAllowStartIfComplete(step.getAllowStartIfComplete());
+
+     if (step.getBatchlet() != null) {
+         stepCopy.setBatchlet(CloneUtility.cloneBatchlet(step.getBatchlet()));
+     }
+
+     if (step.getChunk() != null) {
+         stepCopy.setChunk(CloneUtility.cloneChunk(step.getChunk()));
+     }
+
+     // The next attribute and control elements are deliberately left out:
+     // transitioning should ONLY take place on the main thread.
+
+     // Partition artifacts: only a collector, when present, is carried over.
+     // NOTE(review): the original comment spoke of setting "instances to 1 as the base case",
+     // while the code sets the plan's partitions to null - confirm which is intended.
+     final Partition sourcePartition = step.getPartition();
+     if (sourcePartition != null && sourcePartition.getCollector() != null) {
+         final Partition collectorOnlyPartition = factory.createPartition();
+
+         final PartitionPlan emptyPlan = factory.createPartitionPlan();
+         emptyPlan.setPartitions(null);
+         collectorOnlyPartition.setPlan(emptyPlan);
+
+         collectorOnlyPartition.setCollector(sourcePartition.getCollector());
+         stepCopy.setPartition(collectorOnlyPartition);
+     }
+
+     stepCopy.setStartLimit(step.getStartLimit());
+     stepCopy.setProperties(CloneUtility.cloneJSLProperties(step.getProperties()));
+
+     // Listeners are cloned wholesale rather than by type (e.g. ChunkListener vs. StepListener):
+     // the type is not known at the model level, and a given artifact could implement more
+     // than one listener interface (e.g. ChunkListener AND StepListener).
+     stepCopy.setListeners(CloneUtility.cloneListeners(step.getListeners()));
+
+     partitionSubJob.getExecutionElements().add(stepCopy);
+
+     return partitionSubJob;
+ }
+
+ /**
+  * Generates the id of a subjob that runs one flow of a split.
+  *
+  * @param parentJobInstanceId
+  *            id of the parent job, used as the first part of the generated id;
+  *            must not be null
+  * @param splitId
+  *            id of the split the flow is nested in
+  * @param flowId
+  *            id of the flow
+  * @return a String of the form
+  *         :&lt;parentJobInstanceId&gt;:&lt;splitId&gt;:&lt;flowId&gt;
+  *         (note the leading separator)
+  */
+ private static String generateSubJobId(Long parentJobInstanceId, String splitId, String flowId) {
+     return JOB_ID_SEPARATOR + parentJobInstanceId.toString()
+             + JOB_ID_SEPARATOR + splitId
+             + JOB_ID_SEPARATOR + flowId;
+ }
+
+ /**
+  * Generates the id of a subjob that runs one partition of a step.
+  *
+  * @param parentJobInstanceId
+  *            id of the parent job, used as the first part of the generated id;
+  *            must not be null
+  * @param stepId
+  *            id of the partitioned step
+  * @param partitionInstance
+  *            the instance number of the partition
+  * @return a String of the form
+  *         :&lt;parentJobInstanceId&gt;:&lt;stepId&gt;:&lt;partitionInstance&gt;
+  *         (note the leading separator)
+  */
+ private static String generateSubJobId(Long parentJobInstanceId, String stepId, int partitionInstance) {
+     return JOB_ID_SEPARATOR + parentJobInstanceId.toString()
+             + JOB_ID_SEPARATOR + stepId
+             + JOB_ID_SEPARATOR + partitionInstance;
+ }
+
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepControllerImpl.java
new file mode 100755
index 0000000..1734b11
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepControllerImpl.java
@@ -0,0 +1,528 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.batch.api.partition.PartitionPlan;
+import javax.batch.api.partition.PartitionReducer.PartitionStatus;
+import javax.batch.operations.JobExecutionAlreadyCompleteException;
+import javax.batch.operations.JobExecutionNotMostRecentException;
+import javax.batch.operations.JobRestartException;
+import javax.batch.operations.JobStartException;
+import javax.batch.runtime.BatchStatus;
+
+import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
+import com.ibm.jbatch.container.artifact.proxy.PartitionAnalyzerProxy;
+import com.ibm.jbatch.container.artifact.proxy.PartitionMapperProxy;
+import com.ibm.jbatch.container.artifact.proxy.PartitionReducerProxy;
+import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
+import com.ibm.jbatch.container.artifact.proxy.StepListenerProxy;
+import com.ibm.jbatch.container.context.impl.StepContextImpl;
+import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
+import com.ibm.jbatch.container.exception.BatchContainerServiceException;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.jsl.CloneUtility;
+import com.ibm.jbatch.container.util.BatchPartitionPlan;
+import com.ibm.jbatch.container.util.BatchPartitionWorkUnit;
+import com.ibm.jbatch.container.util.BatchWorkUnit;
+import com.ibm.jbatch.container.util.PartitionDataWrapper;
+import com.ibm.jbatch.container.util.PartitionDataWrapper.PartitionEventType;
+import com.ibm.jbatch.container.util.PartitionsBuilderConfig;
+import com.ibm.jbatch.container.validation.ArtifactValidationException;
+import com.ibm.jbatch.jsl.model.Analyzer;
+import com.ibm.jbatch.jsl.model.JSLJob;
+import com.ibm.jbatch.jsl.model.JSLProperties;
+import com.ibm.jbatch.jsl.model.PartitionMapper;
+import com.ibm.jbatch.jsl.model.PartitionReducer;
+import com.ibm.jbatch.jsl.model.Property;
+import com.ibm.jbatch.jsl.model.Step;
+
+public class PartitionedStepControllerImpl extends BaseStepControllerImpl {
+
+ private final static String sourceClass = PartitionedStepControllerImpl.class.getName();
+ private final static Logger logger = Logger.getLogger(sourceClass);
+
+ private static final int DEFAULT_PARTITION_INSTANCES = 1;
+ private static final int DEFAULT_THREADS = 0; //0 means default to number of instances
+
+ private PartitionPlan plan = null; // assigned in invokeCoreStep() from generatePartitionPlan()
+
+ private int partitions = DEFAULT_PARTITION_INSTANCES; // overwritten with plan.getPartitions() by generatePartitionPlan()
+ private int threads = DEFAULT_THREADS; // overwritten with plan.getThreads() by generatePartitionPlan()
+
+ private Properties[] partitionProperties = null; // overwritten with plan.getPartitionProperties() by generatePartitionPlan()
+
+ private volatile List<BatchPartitionWorkUnit> parallelBatchWorkUnits; // null until buildSubJobBatchWorkUnits() assigns it; stop() null-checks it
+
+ private PartitionReducerProxy partitionReducerProxy = null; // created in setupStepArtifacts() only when the step declares a reducer
+
+ // Set in executeAndWaitForCompletion() to (partitions - number of work units built for this execution).
+ int numPreviouslyCompleted = 0;
+
+ private PartitionAnalyzerProxy analyzerProxy = null; // created in setupStepArtifacts() only when the step declares an analyzer
+
+ final List<JSLJob> subJobs = new ArrayList<JSLJob>(); // also the monitor that stop() and buildSubJobBatchWorkUnits() synchronize on
+ protected List<StepListenerProxy> stepListeners = null;
+
+ List<BatchPartitionWorkUnit> completedWork = new ArrayList<BatchPartitionWorkUnit>(); // filled from completedWorkQueue, inspected by checkCompletedWork()
+
+ BlockingQueue<BatchPartitionWorkUnit> completedWorkQueue = null; // created in invokeCoreStep()
+
+ protected PartitionedStepControllerImpl(final RuntimeJobExecution jobExecutionImpl, final Step step, StepContextImpl stepContext, long rootJobExecutionId) { // all arguments are handed unchanged to the superclass constructor
+ super(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+ }
+
+ /**
+  * Moves this step to STOPPING and asks the batch kernel to stop every
+  * partition work unit that has been built so far.
+  *
+  * @throws IllegalStateException wrapping the first failure reported while stopping a sub-job
+  */
+ @Override
+ public void stop() {
+     updateBatchStatus(BatchStatus.STOPPING);
+
+     // subJobs is the monitor shared with buildSubJobBatchWorkUnits(), so the list of
+     // work units cannot be assigned while we inspect it here.
+     synchronized (subJobs) {
+         if (parallelBatchWorkUnits == null) {
+             // Stop was requested before any sub step was built: nothing to stop.
+             return;
+         }
+
+         for (BatchWorkUnit workUnit : parallelBatchWorkUnits) {
+             try {
+                 batchKernel.stopJob(workUnit.getJobExecutionImpl().getExecutionId());
+             } catch (Exception stopFailure) {
+                 // TODO - Is this what we want to know.
+                 // Blow up if it happens to force the issue.
+                 throw new IllegalStateException(stopFailure);
+             }
+         }
+     }
+ }
+
+ /**
+  * Builds the partition plan for this step and caches its partition count, thread
+  * count and per-partition properties on this controller.
+  * <p>
+  * A partition mapper, when declared, takes precedence over a static plan element.
+  *
+  * @return the plan to execute; never null
+  * @throws IllegalArgumentException if the step declares neither a mapper nor a plan, or
+  *         if the static plan's partitions/threads attributes are not valid numbers
+  * @throws BatchContainerServiceException if the partition mapper artifact cannot be created
+  * @throws BatchContainerRuntimeException if a static properties element targets a
+  *         partition index outside the plan
+  */
+ private PartitionPlan generatePartitionPlan() {
+     final PartitionMapper partitionMapper = step.getPartition().getMapper();
+
+     final PartitionPlan newPlan;
+     if (partitionMapper != null) { //from partition mapper
+         newPlan = buildPlanFromMapper(partitionMapper);
+     } else if (step.getPartition().getPlan() != null) { //from static partition element in jsl
+         newPlan = buildPlanFromStaticPlan();
+     } else {
+         // Previously this fell through and failed with a NullPointerException below.
+         throw new IllegalArgumentException("Neither a partition mapper nor a partition plan is defined in stepId: " + step.getId());
+     }
+
+     // Set the other instance variables for convenience.
+     this.partitions = newPlan.getPartitions();
+     this.threads = newPlan.getThreads();
+     this.partitionProperties = newPlan.getPartitionProperties();
+
+     return newPlan;
+ }
+
+ /** Asks the user's partition mapper for a plan and reconciles it with the partition count persisted by a previous run. */
+ private PartitionPlan buildPlanFromMapper(final PartitionMapper partitionMapper) {
+     //from persisted plan from previous run (null when there was none)
+     final Integer previousNumPartitions = stepStatus.getNumPartitions();
+
+     final List<Property> propertyList = partitionMapper.getProperties() == null ? null
+             : partitionMapper.getProperties().getPropertyList();
+
+     // Set all the contexts associated with this controller.
+     // Some of them may be null
+     final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext,
+             propertyList);
+
+     final PartitionMapperProxy partitionMapperProxy;
+     try {
+         partitionMapperProxy = ProxyFactory.createPartitionMapperProxy(
+                 partitionMapper.getRef(), injectionRef, stepContext);
+     } catch (final ArtifactValidationException e) {
+         throw new BatchContainerServiceException(
+                 "Cannot create the PartitionMapper ["
+                         + partitionMapper.getRef() + "]", e);
+     }
+
+     final PartitionPlan mapperPlan = partitionMapperProxy.mapPartitions();
+
+     //Set up the new partition plan
+     final PartitionPlan mappedPlan = new BatchPartitionPlan();
+     mappedPlan.setPartitionsOverride(mapperPlan.getPartitionsOverride());
+
+     //When true is specified, the partition count from the current run
+     //is used and all results from past partitions are discarded.
+     if (mapperPlan.getPartitionsOverride() || previousNumPartitions == null) {
+         mappedPlan.setPartitions(mapperPlan.getPartitions());
+     } else {
+         mappedPlan.setPartitions(previousNumPartitions);
+     }
+
+     // A thread count of 0 means "one thread per partition".
+     if (mapperPlan.getThreads() == 0) {
+         mappedPlan.setThreads(mappedPlan.getPartitions());
+     } else {
+         mappedPlan.setThreads(mapperPlan.getThreads());
+     }
+
+     mappedPlan.setPartitionProperties(mapperPlan.getPartitionProperties());
+
+     if (logger.isLoggable(Level.FINE)) {
+         logger.fine("Partition plan defined by partition mapper: " + mappedPlan);
+     }
+
+     return mappedPlan;
+ }
+
+ /** Builds a plan from the static plan element of the JSL, applying the defaults for absent attributes. */
+ private PartitionPlan buildPlanFromStaticPlan() {
+     final String partitionsAttr = step.getPartition().getPlan().getPartitions();
+
+     // An absent partitions attribute means a single partition.
+     int numPartitions = DEFAULT_PARTITION_INSTANCES;
+     if (partitionsAttr != null) {
+         try {
+             numPartitions = Integer.parseInt(partitionsAttr);
+         } catch (final NumberFormatException e) {
+             throw new IllegalArgumentException("Could not parse partition instances value in stepId: " + step.getId()
+                     + ", with instances=" + partitionsAttr, e);
+         }
+         // Validate before allocating the array below, otherwise a negative value
+         // surfaces as a NegativeArraySizeException instead of this message.
+         if (numPartitions < 1) {
+             throw new IllegalArgumentException("Partition instances value must be 1 or greater in stepId: " + step.getId()
+                     + ", with instances=" + partitionsAttr);
+         }
+     }
+     final Properties[] partitionProps = new Properties[numPartitions];
+
+     //default to number of partitions if threads isn't set
+     int numThreads = numPartitions;
+     final String threadsAttr = step.getPartition().getPlan().getThreads();
+     if (threadsAttr != null) {
+         try {
+             numThreads = Integer.parseInt(threadsAttr);
+         } catch (final NumberFormatException e) {
+             throw new IllegalArgumentException("Could not parse partition threads value in stepId: " + step.getId()
+                     + ", with threads=" + threadsAttr, e);
+         }
+         if (numThreads < 0) {
+             throw new IllegalArgumentException("Threads value must be 0 or greater in stepId: " + step.getId()
+                     + ", with threads=" + threadsAttr);
+         }
+         if (numThreads == 0) {
+             numThreads = numPartitions;
+         }
+     }
+
+     final List<JSLProperties> jslProperties = step.getPartition().getPlan().getProperties();
+     if (jslProperties != null) {
+         for (JSLProperties props : jslProperties) {
+             final int targetPartition = Integer.parseInt(props.getPartition());
+
+             try {
+                 partitionProps[targetPartition] = CloneUtility.jslPropertiesToJavaProperties(props);
+             } catch (ArrayIndexOutOfBoundsException e) {
+                 throw new BatchContainerRuntimeException("There are only " + numPartitions + " partition instances, but there are "
+                         + jslProperties.size()
+                         + " partition properties lists defined. Remember that partition indexing is 0 based like Java arrays.", e);
+             }
+         }
+     }
+
+     final PartitionPlan staticPlan = new BatchPartitionPlan();
+     staticPlan.setPartitions(numPartitions);
+     staticPlan.setThreads(numThreads);
+     staticPlan.setPartitionProperties(partitionProps);
+     staticPlan.setPartitionsOverride(false); //FIXME what is the default for a static plan??
+     return staticPlan;
+ }
+
+
+ /**
+  * Runs the partitioned step: generates and persists the partition plan, builds one
+  * work unit per partition still to run, executes them and evaluates the results.
+  */
+ @Override
+ protected void invokeCoreStep() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
+
+     this.plan = this.generatePartitionPlan();
+
+     //persist the partition plan so on restart we have the same plan to reuse
+     stepStatus.setNumPartitions(plan.getPartitions());
+
+     /* When true is specified, the partition count from the current run
+      * is used and all results from past partitions are discarded. Any
+      * resource cleanup or back out of work done in the previous run is the
+      * responsibility of the application. The PartitionReducer artifact's
+      * rollbackPartitionedStep method is invoked during restart before any
+      * partitions begin processing to provide a cleanup hook.
+      */
+     // NOTE(review): this rollback hook is not limited to restarts; it also fires on a
+     // first start when the plan requests the override - confirm that is intended.
+     if (plan.getPartitionsOverride() && this.partitionReducerProxy != null) {
+         this.partitionReducerProxy.rollbackPartitionedStep();
+     }
+
+     if (logger.isLoggable(Level.FINE)) {
+         // Arrays.toString prints the per-partition properties instead of the array identity.
+         logger.fine("Number of partitions in step: " + partitions + " in step " + step.getId() + "; Subjob properties defined by partition mapper: " + java.util.Arrays.toString(partitionProperties));
+     }
+
+     //Set up a blocking queue to pick up collector data from a partitioned thread
+     if (this.analyzerProxy != null) {
+         this.analyzerStatusQueue = new LinkedBlockingQueue<PartitionDataWrapper>();
+     }
+     this.completedWorkQueue = new LinkedBlockingQueue<BatchPartitionWorkUnit>();
+
+     // Build all sub jobs from partitioned step
+     buildSubJobBatchWorkUnits();
+
+     // kick off the threads
+     executeAndWaitForCompletion();
+
+     // Deal with the results.
+     checkCompletedWork();
+ }
+ private void buildSubJobBatchWorkUnits() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
+ synchronized (subJobs) {
+ //check if we've already issued a stop
+ if (jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)){
+ logger.fine("Step already in STOPPING state, exiting from buildSubJobBatchWorkUnits() before beginning execution");
+ return;
+ }
+
+ for (int instance = 0; instance < partitions; instance++) {
+ subJobs.add(PartitionedStepBuilder.buildPartitionSubJob(jobExecutionImpl.getInstanceId(),jobExecutionImpl.getJobContext(), step, instance));
+ }
+
+ PartitionsBuilderConfig config = new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, completedWorkQueue, jobExecutionImpl.getExecutionId());
+ // Then build all the subjobs but do not start them yet
+ if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
+ parallelBatchWorkUnits = batchKernel.buildOnRestartParallelPartitions(config);
+ } else {
+ parallelBatchWorkUnits = batchKernel.buildNewParallelPartitions(config);
+ }
+
+ // NOTE: At this point I might not have as many work units as I had partitions, since some may have already completed.
+ }
+ }
+
+ private void executeAndWaitForCompletion() throws JobRestartException {
+
+ if (jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)){
+ logger.fine("Step already in STOPPING state, exiting from executeAndWaitForCompletion() before beginning execution");
+ return;
+ }
+
+ int numTotalForThisExecution = parallelBatchWorkUnits.size();
+ this.numPreviouslyCompleted = partitions - numTotalForThisExecution;
+ int numCurrentCompleted = 0;
+ int numCurrentSubmitted = 0;
+
+ logger.fine("Calculated that " + numPreviouslyCompleted + " partitions are already complete out of total # = "
+ + partitions + ", with # remaining =" + numTotalForThisExecution);
+
+ //Start up to to the max num we are allowed from the num threads attribute
+ for (int i=0; i < this.threads && i < numTotalForThisExecution; i++, numCurrentSubmitted++) {
+ if (stepStatus.getStartCount() > 1 && !!!plan.getPartitionsOverride()) {
+ batchKernel.restartGeneratedJob(parallelBatchWorkUnits.get(i));
+ } else {
+ batchKernel.startGeneratedJob(parallelBatchWorkUnits.get(i));
+ }
+ }
+
+ boolean readyToSubmitAnother = false;
+ while (true) {
+ logger.finer("Begin main loop in waitForQueueCompletion(), readyToSubmitAnother = " + readyToSubmitAnother);
+ try {
+ if (analyzerProxy != null) {
+ logger.fine("Found analyzer, proceeding on analyzerQueue path");
+ PartitionDataWrapper dataWrapper = analyzerStatusQueue.take();
+ if (PartitionEventType.ANALYZE_COLLECTOR_DATA.equals(dataWrapper.getEventType())) {
+ logger.finer("Analyze collector data: " + dataWrapper.getCollectorData());
+ analyzerProxy.analyzeCollectorData(dataWrapper.getCollectorData());
+ continue; // without being ready to submit another
+ } else if (PartitionEventType.ANALYZE_STATUS.equals(dataWrapper.getEventType())) {
+ analyzerProxy.analyzeStatus(dataWrapper.getBatchstatus(), dataWrapper.getExitStatus());
+ logger.fine("Analyze status called for completed partition: batchStatus= " + dataWrapper.getBatchstatus() + ", exitStatus = " + dataWrapper.getExitStatus());
+ completedWork.add(completedWorkQueue.take()); // Shouldn't be a a long wait.
+ readyToSubmitAnother = true;
+ } else {
+ logger.warning("Invalid partition state");
+ throw new IllegalStateException("Invalid partition state");
+ }
+ } else {
+ logger.fine("No analyzer, proceeding on completedWorkQueue path");
+ // block until at least one thread has finished to
+ // submit more batch work. hold on to the finished work to look at later
+ completedWork.add(completedWorkQueue.take());
+ readyToSubmitAnother = true;
+ }
+ } catch (InterruptedException e) {
+ logger.severe("Caught exc"+ e);
+ throw new BatchContainerRuntimeException(e);
+ }
+
+ if (readyToSubmitAnother) {
+ numCurrentCompleted++;
+ logger.fine("Ready to submit another (if there is another left to submit); numCurrentCompleted = " + numCurrentCompleted);
+ if (numCurrentCompleted < numTotalForThisExecution) {
+ if (numCurrentSubmitted < numTotalForThisExecution) {
+ logger.fine("Submitting # " + numCurrentSubmitted + " out of " + numTotalForThisExecution + " total for this execution");
+ if (stepStatus.getStartCount() > 1) {
+ batchKernel.startGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
+ } else {
+ batchKernel.restartGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
+ }
+ readyToSubmitAnother = false;
+ }
+ } else {
+ logger.fine("Finished... breaking out of loop");
+ break;
+ }
+ } else {
+ logger.fine("Not ready to submit another."); // Must have just done a collector
+ }
+ }
+ }
+
+ private void checkCompletedWork() {
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Check completed work list.");
+ }
+
+ /**
+ * check the batch status of each subJob after it's done to see if we need to issue a rollback
+ * start rollback if any have stopped or failed
+ */
+ boolean rollback = false;
+ boolean partitionFailed = false;
+
+ for (final BatchWorkUnit subJob : completedWork) {
+ BatchStatus batchStatus = subJob.getJobExecutionImpl().getJobContext().getBatchStatus();
+ if (batchStatus.equals(BatchStatus.FAILED)) {
+ logger.fine("Subjob " + subJob.getJobExecutionImpl().getExecutionId() + " ended with status '" + batchStatus + "'; Starting logical transaction rollback.");
+
+ rollback = true;
+ partitionFailed = true;
+
+ //Keep track of the failing status and throw an exception to propagate after the rest of the partitions are complete
+ stepContext.setBatchStatus(BatchStatus.FAILED);
+ }
+ }
+
+ //If rollback is false we never issued a rollback so we can issue a logicalTXSynchronizationBeforeCompletion
+ //NOTE: this will get issued even in a subjob fails or stops if no logicalTXSynchronizationRollback method is provied
+ //We are assuming that not providing a rollback was intentional
+ if (rollback == true) {
+ if (this.partitionReducerProxy != null) {
+ this.partitionReducerProxy.rollbackPartitionedStep();
+ }
+ if (partitionFailed) {
+ throw new BatchContainerRuntimeException("One or more partitions failed");
+ }
+ } else {
+ if (this.partitionReducerProxy != null) {
+ this.partitionReducerProxy.beforePartitionedStepCompletion();
+ }
+ }
+ }
+
+ /**
+  * Creates the artifacts this controller drives: the step listeners and, when the
+  * step's partition element declares them, the partition analyzer and reducer proxies.
+  *
+  * @throws BatchContainerServiceException if the analyzer or reducer artifact cannot be created
+  */
+ @Override
+ protected void setupStepArtifacts() {
+
+     InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, null);
+     this.stepListeners = jobExecutionImpl.getListenerFactory().getStepListeners(step, injectionRef, stepContext);
+
+     Analyzer analyzer = step.getPartition().getAnalyzer();
+
+     if (analyzer != null) {
+         final List<Property> propList = analyzer.getProperties() == null ? null : analyzer.getProperties()
+                 .getPropertyList();
+
+         injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
+
+         try {
+             analyzerProxy = ProxyFactory.createPartitionAnalyzerProxy(analyzer.getRef(), injectionRef, stepContext);
+         } catch (final ArtifactValidationException e) {
+             throw new BatchContainerServiceException("Cannot create the analyzer [" + analyzer.getRef() + "]", e);
+         }
+     }
+
+     PartitionReducer partitionReducer = step.getPartition().getReducer();
+
+     if (partitionReducer != null) {
+
+         final List<Property> propList = partitionReducer.getProperties() == null ? null : partitionReducer.getProperties()
+                 .getPropertyList();
+
+         injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
+
+         try {
+             this.partitionReducerProxy = ProxyFactory.createPartitionReducerProxy(partitionReducer.getRef(), injectionRef, stepContext);
+         } catch (final ArtifactValidationException e) {
+             // The message used to say "analyzer" here, which pointed at the wrong artifact.
+             throw new BatchContainerServiceException("Cannot create the partition reducer [" + partitionReducer.getRef() + "]",
+                     e);
+         }
+     }
+
+ }
+
+ @Override
+ protected void invokePreStepArtifacts() {
+
+ if (stepListeners != null) {
+ for (StepListenerProxy listenerProxy : stepListeners) {
+ // Call beforeStep on all the step listeners
+ listenerProxy.beforeStep();
+ }
+ }
+
+ // Invoke the reducer before all parallel steps start (must occur
+ // before mapper as well)
+ if (this.partitionReducerProxy != null) {
+ this.partitionReducerProxy.beginPartitionedStep();
+ }
+
+ }
+
+ @Override
+ protected void invokePostStepArtifacts() {
+ // Invoke the reducer after all parallel steps are done
+ if (this.partitionReducerProxy != null) {
+
+ if ((BatchStatus.COMPLETED).equals(stepContext.getBatchStatus())) {
+ this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionStatus.COMMIT);
+ }else {
+ this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionStatus.ROLLBACK);
+ }
+
+ }
+
+ // Called in spec'd order, e.g. Sec. 11.7
+ if (stepListeners != null) {
+ for (StepListenerProxy listenerProxy : stepListeners) {
+ // Call afterStep on all the step listeners
+ listenerProxy.afterStep();
+ }
+ }
+ }
+
+ @Override
+ protected void sendStatusFromPartitionToAnalyzerIfPresent() {
+ // Intentionally empty. Since we're already on the main thread, there will never
+ // be anything to do on this thread. It's only on the partitioned
+ // threads that there is something to send back.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/RetryHandler.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/RetryHandler.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/RetryHandler.java
new file mode 100755
index 0000000..e0906a6
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/RetryHandler.java
@@ -0,0 +1,430 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+import com.ibm.jbatch.container.artifact.proxy.RetryProcessListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.RetryReadListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.RetryWriteListenerProxy;
+import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
+import com.ibm.jbatch.jsl.model.Chunk;
+import com.ibm.jbatch.jsl.model.ExceptionClassFilter;
+
+public class RetryHandler {
+
+ /**
+ *
+ * Logic for handling retryable records.
+ *
+ * A RetryHandler object is attached to every BDS that inherits from AbstractBatchDataStream.
+ *
+ */
+
+ private static final String className = RetryHandler.class.getName();
+ private static Logger logger = Logger.getLogger(RetryHandler.class.getPackage().getName()); // named after the package, not the class
+
+ public static final String RETRY_COUNT = "retry-limit"; // used in the error message when the limit cannot be parsed
+ public static final String RETRY_INCLUDE_EX = "include class";
+ public static final String RETRY_EXCLUDE_EX = "exclude class";
+
+ private static final int RETRY_NONE = 0;
+ private static final int RETRY_READ = 1;
+ private static final int RETRY_PROCESS = 2;
+ private static final int RETRY_WRITE = 3;
+
+ private int retryType = RETRY_NONE; // set by the handleException* methods to the phase that was last retried
+
+ /*private RetryProcessListenerProxy _retryProcessListener = null;
+ private RetryReadListenerProxy _retryReadListener = null;
+ private RetryWriteListenerProxy _retryWriteListener = null;*/
+
+ List<RetryProcessListenerProxy> _retryProcessListeners = null; // null means no listeners to notify
+ List<RetryReadListenerProxy> _retryReadListeners = null; // null means no listeners to notify
+ List<RetryWriteListenerProxy> _retryWriteListeners = null; // null means no listeners to notify
+
+ private long _jobId = 0;
+ private String _stepId = null;
+ private Set<String> _retryNoRBIncludeExceptions = null; // class names; populated by initialize()
+ private Set<String> _retryNoRBExcludeExceptions = null; // class names; populated by initialize()
+ private Set<String> _retryIncludeExceptions = null; // class names; populated by initialize()
+ private Set<String> _retryExcludeExceptions = null; // class names; populated by initialize()
+ private int _retryLimit = Integer.MIN_VALUE; // MIN_VALUE marks "no limit configured"; see isRetryLimitReached()
+ private long _retryCount = 0; // incremented on every retried exception
+ private Exception _retryException = null; // last exception that was accepted for retry
+
+ public RetryHandler(Chunk chunk, long l, String stepId) // l is stored as the job id
+ {
+ _jobId = l;
+ _stepId = stepId;
+
+ initialize(chunk); // reads the retry limit and the exception class filters from the chunk
+ }
+
+
+ /**
+ * Sets the user-defined RetryProcessListener proxies that are notified when a
+ * process exception is retried. Despite the "add" name, the given list replaces
+ * any previously set list; it is stored by reference, not copied.
+ *
+ * @param retryProcessListeners listeners to notify; may be null, in which case
+ * handleExceptionProcess skips the notification
+ */
+ public void addRetryProcessListener(List<RetryProcessListenerProxy> retryProcessListeners)
+ {
+ _retryProcessListeners = retryProcessListeners;
+ }
+
+ /**
+ * Sets the user-defined RetryReadListener proxies that are notified when a
+ * read exception is retried. Despite the "add" name, the given list replaces
+ * any previously set list; it is stored by reference, not copied.
+ *
+ * @param retryReadListeners listeners to notify; may be null, in which case
+ * handleExceptionRead skips the notification
+ */
+ public void addRetryReadListener(List<RetryReadListenerProxy> retryReadListeners)
+ {
+ _retryReadListeners = retryReadListeners;
+ }
+
+ /**
+ * Sets the user-defined RetryWriteListener proxies that are notified when a
+ * write exception is retried. Despite the "add" name, the given list replaces
+ * any previously set list; it is stored by reference, not copied.
+ *
+ * @param retryWriteListeners listeners to notify; may be null, in which case
+ * handleExceptionWrite skips the notification
+ */
+ public void addRetryWriteListener(List<RetryWriteListenerProxy> retryWriteListeners)
+ {
+ _retryWriteListeners = retryWriteListeners;
+ }
+
+
+ /**
+  * Reads the retry limit and the retryable / no-rollback exception class filters
+  * from the chunk definition into this handler.
+  *
+  * @param chunk the chunk whose retry configuration is read
+  * @throws IllegalArgumentException if the retry limit is negative
+  * @throws RuntimeException if the retry limit is not a valid integer
+  */
+ private void initialize(Chunk chunk)
+ {
+     final String mName = "initialize";
+
+     if (logger.isLoggable(Level.FINER)) {
+         logger.entering(className, mName);
+     }
+
+     try
+     {
+         // A missing retry-limit leaves _retryLimit at its "not configured" marker.
+         if (chunk.getRetryLimit() != null) {
+             _retryLimit = Integer.parseInt(chunk.getRetryLimit());
+             if (_retryLimit < 0) {
+                 throw new IllegalArgumentException("The retry-limit attribute on a chunk cannot be a negative value");
+             }
+         }
+     }
+     catch (NumberFormatException nfe)
+     {
+         throw new RuntimeException("NumberFormatException reading " + RETRY_COUNT, nfe);
+     }
+
+     // Read the include/exclude exceptions.
+     _retryIncludeExceptions = new HashSet<String>();
+     _retryExcludeExceptions = new HashSet<String>();
+     _retryNoRBIncludeExceptions = new HashSet<String>();
+     _retryNoRBExcludeExceptions = new HashSet<String>();
+
+     if (chunk.getRetryableExceptionClasses() != null) {
+         if (chunk.getRetryableExceptionClasses().getIncludeList() != null) {
+             List<ExceptionClassFilter.Include> includes = chunk.getRetryableExceptionClasses().getIncludeList();
+             for (ExceptionClassFilter.Include include : includes) {
+                 _retryIncludeExceptions.add(include.getClazz().trim());
+                 logger.finer("RETRYHANDLE: include: " + include.getClazz().trim());
+             }
+
+             if (_retryIncludeExceptions.size() == 0) {
+                 logger.finer("RETRYHANDLE: include element not present");
+             }
+         }
+         if (chunk.getRetryableExceptionClasses().getExcludeList() != null) {
+             List<ExceptionClassFilter.Exclude> excludes = chunk.getRetryableExceptionClasses().getExcludeList();
+             for (ExceptionClassFilter.Exclude exclude : excludes) {
+                 _retryExcludeExceptions.add(exclude.getClazz().trim());
+                 logger.finer("RETRYHANDLE: exclude: " + exclude.getClazz().trim());
+             }
+
+             if (_retryExcludeExceptions.size() == 0) {
+                 logger.finer("RETRYHANDLE: exclude element not present");
+             }
+         }
+     }
+
+     if (chunk.getNoRollbackExceptionClasses() != null) {
+         if (chunk.getNoRollbackExceptionClasses().getIncludeList() != null) {
+             List<ExceptionClassFilter.Include> includes = chunk.getNoRollbackExceptionClasses().getIncludeList();
+             for (ExceptionClassFilter.Include include : includes) {
+                 _retryNoRBIncludeExceptions.add(include.getClazz().trim());
+                 logger.finer("RETRYHANDLE: include: " + include.getClazz().trim());
+             }
+
+             if (_retryNoRBIncludeExceptions.size() == 0) {
+                 logger.finer("RETRYHANDLE: include element not present");
+             }
+         }
+         if (chunk.getNoRollbackExceptionClasses().getExcludeList() != null) {
+             List<ExceptionClassFilter.Exclude> excludes = chunk.getNoRollbackExceptionClasses().getExcludeList();
+             for (ExceptionClassFilter.Exclude exclude : excludes) {
+                 _retryNoRBExcludeExceptions.add(exclude.getClazz().trim());
+                 logger.finer("RETRYHANDLE: exclude: " + exclude.getClazz().trim());
+             }
+
+             if (_retryNoRBExcludeExceptions.size() == 0) {
+                 logger.finer("RETRYHANDLE: exclude element not present");
+             }
+         }
+     }
+
+     if (logger.isLoggable(Level.FINE)) {
+         // Log the sets that were actually built; the previous summary printed locals that were always null.
+         logger.logp(Level.FINE, className, mName, "added include exception " + _retryIncludeExceptions + "; added exclude exception " + _retryExcludeExceptions);
+         logger.logp(Level.FINE, className, mName, "added include no rollback exception " + _retryNoRBIncludeExceptions
+                 + "; added exclude no rollback exception " + _retryNoRBExcludeExceptions);
+     }
+
+     if (logger.isLoggable(Level.FINER)) {
+         logger.exiting(className, mName, this.toString());
+     }
+ }
+
+ public boolean isRollbackException(Exception e) // true unless e matches the no-rollback exception filter (see isNoRollbackException)
+ {
+ return !isNoRollbackException(e);
+ }
+ /**
+ * Handle exception from a read failure.
+ */
+ public void handleExceptionRead(Exception e)
+ {
+ final String mName = "handleExceptionRead";
+
+ logger.finer("RETRYHANDLE: in retryhandler handle exception on a read:" + e.toString());
+
+ if(logger.isLoggable(Level.FINER))
+ logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
+
+ if (!isRetryLimitReached() && isRetryable(e))
+ {
+ retryType = RETRY_READ;
+ _retryException = e;
+ // Retry it. Log it. Call the RetryListener.
+ ++_retryCount;
+ logRetry(e);
+
+ if (_retryReadListeners != null) {
+ for (RetryReadListenerProxy retryReadListenerProxy : _retryReadListeners) {
+ retryReadListenerProxy.onRetryReadException(e);
+ }
+ }
+ }
+ else
+ {
+ // No retry. Throw it back.
+ if(logger.isLoggable(Level.FINER))
+ logger.logp(Level.FINE, className, mName, "No retry. Rethrow", e);
+ throw new BatchContainerRuntimeException(e);
+ }
+
+ if(logger.isLoggable(Level.FINER))
+ logger.exiting(className, mName, e);
+ }
+
+ /**
+ * Handle exception from a process failure.
+ */
+ public void handleExceptionProcess(Exception e, Object w)
+ {
+ final String mName = "handleExceptionProcess";
+
+ if(logger.isLoggable(Level.FINER))
+ logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
+
+ if (!isRetryLimitReached() && isRetryable(e))
+ {
+ retryType = RETRY_PROCESS;
+ _retryException = e;
+ // Retry it. Log it. Call the RetryListener.
+ ++_retryCount;
+ logRetry(e);
+
+ if (_retryProcessListeners != null) {
+ for (RetryProcessListenerProxy retryProcessListenerProxy : _retryProcessListeners) {
+ retryProcessListenerProxy.onRetryProcessException(w, e);
+ }
+ }
+ }
+ else
+ {
+ // No retry. Throw it back.
+ if(logger.isLoggable(Level.FINER))
+ logger.logp(Level.FINE, className, mName, "No retry. Rethrow ", e);
+ throw new BatchContainerRuntimeException(e);
+ }
+ }
+
+ /**
+ * Handle exception from a write failure.
+ */
+ public void handleExceptionWrite(Exception e, List<Object> w)
+ {
+ final String mName = "handleExceptionWrite";
+
+ if(logger.isLoggable(Level.FINER))
+ logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
+
+ if (!isRetryLimitReached() && isRetryable(e))
+ {
+ // Retry it. Log it. Call the RetryListener.
+ retryType = RETRY_WRITE;
+ _retryException = e;
+ ++_retryCount;
+ logRetry(e);
+
+ if (_retryWriteListeners != null) {
+ for (RetryWriteListenerProxy retryWriteListenerProxy : _retryWriteListeners) {
+ retryWriteListenerProxy.onRetryWriteException(w, e);
+ }
+ }
+ }
+ else
+ {
+ // No retry. Throw it back.
+ if(logger.isLoggable(Level.FINER))
+ logger.logp(Level.FINE, className, mName, "No retry. Rethrow ", e);
+ throw new BatchContainerRuntimeException(e);
+ }
+ }
+
+
+ /**
+  * Tells whether the given exception may be retried: it must match the retryable
+  * include list and must not match the retryable exclude list.
+  */
+ private boolean isRetryable(Exception e)
+ {
+     final String mName = "isRetryable";
+     final String thrownClassName = e.getClass().getName();
+
+     // The exclude list is only consulted for exceptions that matched the include list.
+     boolean retryable = false;
+     if (containsException(_retryIncludeExceptions, e)) {
+         retryable = !containsException(_retryExcludeExceptions, e);
+     }
+
+     if (logger.isLoggable(Level.FINE)) {
+         logger.logp(Level.FINE, className, mName, mName + ": " + retryable + ": " + thrownClassName);
+     }
+
+     return retryable;
+ }
+
+ /**
+  * Tells whether the given exception is exempt from rollback: it must match the
+  * no-rollback include list and must not match the no-rollback exclude list.
+  */
+ private boolean isNoRollbackException(Exception e)
+ {
+     final String mName = "isNoRollbackException";
+     final String thrownClassName = e.getClass().getName();
+
+     // The exclude list is only consulted for exceptions that matched the include list.
+     boolean noRollback = false;
+     if (containsException(_retryNoRBIncludeExceptions, e)) {
+         noRollback = !containsException(_retryNoRBExcludeExceptions, e);
+     }
+
+     if (logger.isLoggable(Level.FINE)) {
+         logger.logp(Level.FINE, className, mName, mName + ": " + noRollback + ": " + thrownClassName);
+     }
+
+     return noRollback;
+ }
+
+ /**
+ * Check whether given exception is in the specified exception list
+ */
+ private boolean containsException(Set<String> retryList, Exception e)
+ {
+ final String mName = "containsException";
+ boolean retVal = false;
+
+ for ( Iterator it = retryList.iterator(); it.hasNext(); ) {
+ String exClassName = (String) it.next();
+
+ try {
+ if (retVal = Thread.currentThread().getContextClassLoader().loadClass(exClassName).isInstance(e))
+ break;
+ } catch (ClassNotFoundException cnf) {
+ logger.logp(Level.FINE, className, mName, cnf.getLocalizedMessage());
+ }
+ }
+
+ if(logger.isLoggable(Level.FINE))
+ logger.logp(Level.FINE, className, mName, mName + ": " + retVal );
+
+ return retVal;
+ }
+
+ /**
+  * Check if the retry limit has been reached.
+  *
+  * Note: if no retry limit was configured on the chunk, retries are unlimited and
+  * this method always returns false.
+  *
+  * @return true when a limit is configured and the retry count has reached it
+  */
+ private boolean isRetryLimitReached()
+ {
+     // Integer.MIN_VALUE marks a limit that was never defined: unlimited retries.
+     if (_retryLimit == Integer.MIN_VALUE) {
+         return false;
+     }
+
+     return (_retryCount >= _retryLimit);
+ }
+
+
+ /**
+  * Logs, at FINE level, that a record is being retried after the given exception.
+  * The original formatted-message lookup was disabled, which left this method
+  * building values it never used and logging nothing.
+  */
+ private void logRetry(Exception e)
+ {
+     if (logger.isLoggable(Level.FINE)) {
+         logger.logp(Level.FINE, className, "logRetry", "Retrying after exception in job " + _jobId + ", step " + _stepId
+                 + ": " + e.getClass().getName() + ": " + e.getMessage());
+     }
+ }
+
+ public Exception getException() // last exception accepted for retry, or null if none was retried yet
+ {
+ return _retryException;
+ }
+
+ public long getRetryCount() // number of retries recorded so far (or the value last given to setRetryCount)
+ {
+ return _retryCount;
+ }
+
+ public void setRetryCount(long retryCount) // overwrites the running retry count; no validation against the limit
+ {
+ final String mName = "setRetryCount";
+
+ _retryCount = retryCount;
+
+ if(logger.isLoggable(Level.FINE))
+ logger.logp(Level.FINE, className, mName, "setRetryCount: " + _retryCount);
+ }
+
+ /**
+  * Returns the default Object description followed by the current retry count and limit.
+  */
+ @Override
+ public String toString()
+ {
+     return "RetryHandler{" + super.toString() + "}count:limit=" + _retryCount + ":" + _retryLimit;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SingleThreadedStepControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SingleThreadedStepControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SingleThreadedStepControllerImpl.java
new file mode 100755
index 0000000..1c81e7f
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SingleThreadedStepControllerImpl.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Logger;
+
+
+import com.ibm.jbatch.container.IController;
+import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
+import com.ibm.jbatch.container.artifact.proxy.PartitionCollectorProxy;
+import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
+import com.ibm.jbatch.container.artifact.proxy.StepListenerProxy;
+import com.ibm.jbatch.container.context.impl.StepContextImpl;
+import com.ibm.jbatch.container.exception.BatchContainerServiceException;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.util.PartitionDataWrapper;
+import com.ibm.jbatch.container.util.PartitionDataWrapper.PartitionEventType;
+import com.ibm.jbatch.container.validation.ArtifactValidationException;
+import com.ibm.jbatch.jsl.model.Collector;
+import com.ibm.jbatch.jsl.model.Property;
+import com.ibm.jbatch.jsl.model.Step;
+
+/**
+ * When a partitioned step is run, this controller will only be used for the partition threads,
+ * NOT the top-level main thread that the step executes upon.
+ *
+ * When a non-partitioned step is run this controller will be used as well (and there will be no
+ * separate main thread with controller).
+ */
+public abstract class SingleThreadedStepControllerImpl extends BaseStepControllerImpl implements IController {
+
+    private static final String sourceClass = SingleThreadedStepControllerImpl.class.getName();
+    private static final Logger logger = Logger.getLogger(sourceClass);
+
+    // Collector only used from partition threads, not main thread
+    protected PartitionCollectorProxy collectorProxy = null;
+
+    // Step listeners resolved by setupStepArtifacts()
+    List<StepListenerProxy> stepListeners = null;
+
+    /**
+     * Hands every argument to the BaseStepControllerImpl constructor unchanged.
+     */
+    protected SingleThreadedStepControllerImpl(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
+        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+    }
+
+    /**
+     * Resolves the step listeners and, when the step is partitioned and declares a collector,
+     * creates the proxy for that collector.
+     *
+     * @throws BatchContainerServiceException if the collector proxy cannot be created
+     */
+    protected void setupStepArtifacts() {
+        // Listeners receive the job and step contexts, without any property list
+        final InjectionReferences listenerInjection = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, null);
+        this.stepListeners = jobExecutionImpl.getListenerFactory().getStepListeners(step, listenerInjection, stepContext);
+
+        // Collectors are only set up when running a partitioned step
+        if (step.getPartition() == null) {
+            return;
+        }
+
+        final Collector collector = step.getPartition().getCollector();
+        if (collector == null) {
+            return;
+        }
+
+        // The collector may have been declared without any properties
+        List<Property> collectorProps = null;
+        if (collector.getProperties() != null) {
+            collectorProps = collector.getProperties().getPropertyList();
+        }
+
+        // Inject the job and step contexts, plus the collector's own properties (possibly null),
+        // into the collector artifact
+        final InjectionReferences collectorInjection = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, collectorProps);
+
+        try {
+            this.collectorProxy = ProxyFactory.createPartitionCollectorProxy(collector.getRef(), collectorInjection, this.stepContext);
+        } catch (ArtifactValidationException e) {
+            throw new BatchContainerServiceException("Cannot create the collector [" + collector.getRef() + "]", e);
+        }
+    }
+
+    /**
+     * Calls beforeStep() on each step listener, except in the partitioned case.
+     */
+    @Override
+    protected void invokePreStepArtifacts() {
+        // On a partition thread beforeStep() is skipped: it has already been called on the
+        // main thread, as the spec says.
+        if (!shouldNotifyStepListeners()) {
+            return;
+        }
+
+        for (StepListenerProxy listener : stepListeners) {
+            listener.beforeStep();
+        }
+    }
+
+    /**
+     * Calls afterStep() on each step listener, except in the partitioned case.
+     */
+    @Override
+    protected void invokePostStepArtifacts() {
+        // Same guard as for beforeStep(): on a partition thread the listeners are not driven
+        // from here (presumably afterStep() is likewise handled on the main thread - confirm).
+        if (!shouldNotifyStepListeners()) {
+            return;
+        }
+
+        for (StepListenerProxy listener : stepListeners) {
+            listener.afterStep();
+        }
+    }
+
+    // True when listeners have been resolved and the job execution has no partition instance.
+    private boolean shouldNotifyStepListeners() {
+        return (stepListeners != null) && (this.jobExecutionImpl.getPartitionInstance() == null);
+    }
+
+    /**
+     * Asks the collector, when one was created, for its partition data and forwards that data
+     * to the analyzer queue.
+     */
+    protected void invokeCollectorIfPresent() {
+        if (collectorProxy == null) {
+            return;
+        }
+
+        final Serializable collected = collectorProxy.collectPartitionData();
+        logger.finer("Got partition data: " + collected + ", from collector: " + collectorProxy);
+        sendCollectorDataToAnalyzerIfPresent(collected);
+    }
+
+    /**
+     * Wraps the collector data in an ANALYZE_COLLECTOR_DATA event and adds it to the analyzer queue.
+     *
+     * Useless to have collector without analyzer but let's check so we don't hang or blow up.
+     *
+     * @param data the data returned by the collector
+     */
+    protected void sendCollectorDataToAnalyzerIfPresent(Serializable data) {
+        if (analyzerStatusQueue == null) {
+            logger.fine("Analyzer not configured.");
+            return;
+        }
+
+        logger.finer("Sending collector partition data: " + data + " to analyzer queue: " + analyzerStatusQueue);
+        final PartitionDataWrapper collectorEvent = new PartitionDataWrapper();
+        collectorEvent.setCollectorData(data);
+        collectorEvent.setEventType(PartitionEventType.ANALYZE_COLLECTOR_DATA);
+        analyzerStatusQueue.add(collectorEvent);
+    }
+
+    /**
+     * Wraps the step's batch status and exit status in an ANALYZE_STATUS event and adds it to
+     * the analyzer queue; only logs when no queue is configured.
+     */
+    @Override
+    protected void sendStatusFromPartitionToAnalyzerIfPresent() {
+        if (analyzerStatusQueue == null) {
+            logger.fine("Analyzer not configured.");
+            return;
+        }
+
+        logger.fine("Send status from partition for analyzeStatus with batchStatus = " + stepStatus.getBatchStatus() + ", exitStatus = " + stepStatus.getExitStatus());
+        final PartitionDataWrapper statusEvent = new PartitionDataWrapper();
+        statusEvent.setBatchStatus(stepStatus.getBatchStatus());
+        statusEvent.setExitStatus(stepStatus.getExitStatus());
+        statusEvent.setEventType(PartitionEventType.ANALYZE_STATUS);
+        analyzerStatusQueue.add(statusEvent);
+    }
+}