You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:39:29 UTC
[52/62] importing batchee from github - a fork from the IBM RI
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepBuilder.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepBuilder.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepBuilder.java
deleted file mode 100755
index 4a5f038..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepBuilder.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package com.ibm.jbatch.container.impl;
-
-import javax.batch.runtime.context.JobContext;
-
-import com.ibm.jbatch.container.jsl.CloneUtility;
-import com.ibm.jbatch.jsl.model.Flow;
-import com.ibm.jbatch.jsl.model.JSLJob;
-import com.ibm.jbatch.jsl.model.ObjectFactory;
-import com.ibm.jbatch.jsl.model.Partition;
-import com.ibm.jbatch.jsl.model.PartitionPlan;
-import com.ibm.jbatch.jsl.model.Split;
-import com.ibm.jbatch.jsl.model.Step;
-
-public class PartitionedStepBuilder {
-
- public static final String JOB_ID_SEPARATOR = ":"; // Use something permissible in NCName to allow us to key off of.
- /*
- * Build a generated job with only one flow in it to submit to the
- * BatchKernel. This is used to build subjobs from splits.
- *
- */
- public static JSLJob buildFlowInSplitSubJob(Long parentJobExecutionId, JobContext jobContext, Split split, Flow flow) {
-
- ObjectFactory jslFactory = new ObjectFactory();
- JSLJob subJob = jslFactory.createJSLJob();
-
- // Set the generated subjob id
- String subJobId = generateSubJobId(parentJobExecutionId, split.getId(), flow.getId());
- subJob.setId(subJobId);
-
-
- //Copy all properties from parent JobContext to flow threads
- subJob.setProperties(CloneUtility.javaPropsTojslProperties(jobContext.getProperties()));
-
-
- //We don't need to do a deep copy here since each flow is already independent of all others, unlike in a partition
- //where one step instance can be executed with different properties on multiple threads.
-
- subJob.getExecutionElements().add(flow);
-
- return subJob;
- }
-
- /*
- * Build a generated job with only one step in it to submit to the
- * BatchKernel. This is used for partitioned steps.
- *
- */
- public static JSLJob buildPartitionSubJob(Long parentJobInstanceId, JobContext jobContext, Step step, int partitionInstance) {
-
- ObjectFactory jslFactory = new ObjectFactory();
- JSLJob subJob = jslFactory.createJSLJob();
-
-
- // Set the generated subjob id
- String subJobId = generateSubJobId(parentJobInstanceId, step.getId(), partitionInstance);
- subJob.setId(subJobId);
-
-
- //Copy all properties from parent JobContext to partitioned step threads
- subJob.setProperties(CloneUtility.javaPropsTojslProperties(jobContext.getProperties()));
-
- // Add one step to job
- Step newStep = jslFactory.createStep();
-
- //set id
- newStep.setId(step.getId());
-
-
- /***
- * deep copy all fields in a step
- */
- newStep.setAllowStartIfComplete(step.getAllowStartIfComplete());
-
- if (step.getBatchlet() != null){
- newStep.setBatchlet(CloneUtility.cloneBatchlet(step.getBatchlet()));
- }
-
- if (step.getChunk() != null) {
- newStep.setChunk(CloneUtility.cloneChunk(step.getChunk()));
- }
-
- // Do not copy next attribute and control elements. Transitioning should ONLY
- // take place on the main thread.
-
- //Do not add step listeners, only call them on parent thread.
-
- //Add partition artifacts and set instances to 1 as the base case
- Partition partition = step.getPartition();
- if (partition != null) {
- if (partition.getCollector() != null) {
-
- Partition basePartition = jslFactory.createPartition();
-
- PartitionPlan partitionPlan = jslFactory.createPartitionPlan();
- partitionPlan.setPartitions(null);
- basePartition.setPlan(partitionPlan);
-
- basePartition.setCollector(partition.getCollector());
- newStep.setPartition(basePartition);
-
- }
- }
-
- newStep.setStartLimit(step.getStartLimit());
- newStep.setProperties(CloneUtility.cloneJSLProperties(step.getProperties()));
-
- // Don't try to only clone based on type (e.g. ChunkListener vs. StepListener).
- // We don't know the type at the model level, and a given artifact could implement more
- // than one listener interface (e.g. ChunkListener AND StepListener).
- newStep.setListeners(CloneUtility.cloneListeners(step.getListeners()));
-
- //Add Step properties, need to be careful here to remember the right precedence
-
- subJob.getExecutionElements().add(newStep);
-
-
- return subJob;
- }
-
- /**
- * @param parentJobInstanceId
- * the execution id of the parent job
- * @param splitId this is the split id where the flows are nested
- * @param flowId
- * this is the id of the partitioned control element, it can be a
- * step id or flow id
- * @param partitionInstance
- * the instance number of the partitioned element
- * @return a String of the form
- * <parentJobExecutionId>:<parentId>:<splitId>:<flowId>
- */
- private static String generateSubJobId(Long parentJobInstanceId, String splitId, String flowId) {
-
- StringBuilder strBuilder = new StringBuilder(JOB_ID_SEPARATOR);
- strBuilder.append(parentJobInstanceId.toString());
- strBuilder.append(JOB_ID_SEPARATOR);
- strBuilder.append(splitId);
- strBuilder.append(JOB_ID_SEPARATOR);
- strBuilder.append(flowId);
-
- return strBuilder.toString();
- }
-
- /**
- * @param parentJobInstanceId
- * the execution id of the parent job
- * @param stepId
- * this is the id of the partitioned control element, it can be a
- * step id or flow id
- * @param partitionInstance
- * the instance number of the partitioned element
- * @return a String of the form
- * <parentJobExecutionId>:<parentId>:<partitionInstance>
- */
- private static String generateSubJobId(Long parentJobInstanceId, String stepId, int partitionInstance) {
-
- StringBuilder strBuilder = new StringBuilder(JOB_ID_SEPARATOR);
- strBuilder.append(parentJobInstanceId.toString());
- strBuilder.append(JOB_ID_SEPARATOR);
- strBuilder.append(stepId);
- strBuilder.append(JOB_ID_SEPARATOR);
- strBuilder.append(partitionInstance);
-
- return strBuilder.toString();
- }
-
-
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepControllerImpl.java
deleted file mode 100755
index 1734b11..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionedStepControllerImpl.java
+++ /dev/null
@@ -1,528 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.batch.api.partition.PartitionPlan;
-import javax.batch.api.partition.PartitionReducer.PartitionStatus;
-import javax.batch.operations.JobExecutionAlreadyCompleteException;
-import javax.batch.operations.JobExecutionNotMostRecentException;
-import javax.batch.operations.JobRestartException;
-import javax.batch.operations.JobStartException;
-import javax.batch.runtime.BatchStatus;
-
-import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
-import com.ibm.jbatch.container.artifact.proxy.PartitionAnalyzerProxy;
-import com.ibm.jbatch.container.artifact.proxy.PartitionMapperProxy;
-import com.ibm.jbatch.container.artifact.proxy.PartitionReducerProxy;
-import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
-import com.ibm.jbatch.container.artifact.proxy.StepListenerProxy;
-import com.ibm.jbatch.container.context.impl.StepContextImpl;
-import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
-import com.ibm.jbatch.container.exception.BatchContainerServiceException;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.jsl.CloneUtility;
-import com.ibm.jbatch.container.util.BatchPartitionPlan;
-import com.ibm.jbatch.container.util.BatchPartitionWorkUnit;
-import com.ibm.jbatch.container.util.BatchWorkUnit;
-import com.ibm.jbatch.container.util.PartitionDataWrapper;
-import com.ibm.jbatch.container.util.PartitionDataWrapper.PartitionEventType;
-import com.ibm.jbatch.container.util.PartitionsBuilderConfig;
-import com.ibm.jbatch.container.validation.ArtifactValidationException;
-import com.ibm.jbatch.jsl.model.Analyzer;
-import com.ibm.jbatch.jsl.model.JSLJob;
-import com.ibm.jbatch.jsl.model.JSLProperties;
-import com.ibm.jbatch.jsl.model.PartitionMapper;
-import com.ibm.jbatch.jsl.model.PartitionReducer;
-import com.ibm.jbatch.jsl.model.Property;
-import com.ibm.jbatch.jsl.model.Step;
-
-public class PartitionedStepControllerImpl extends BaseStepControllerImpl {
-
- private final static String sourceClass = PartitionedStepControllerImpl.class.getName();
- private final static Logger logger = Logger.getLogger(sourceClass);
-
- private static final int DEFAULT_PARTITION_INSTANCES = 1;
- private static final int DEFAULT_THREADS = 0; //0 means default to number of instances
-
- private PartitionPlan plan = null;
-
- private int partitions = DEFAULT_PARTITION_INSTANCES;
- private int threads = DEFAULT_THREADS;
-
- private Properties[] partitionProperties = null;
-
- private volatile List<BatchPartitionWorkUnit> parallelBatchWorkUnits;
-
- private PartitionReducerProxy partitionReducerProxy = null;
-
- // On invocation this will be re-primed to reflect already-completed partitions from a previous execution.
- int numPreviouslyCompleted = 0;
-
- private PartitionAnalyzerProxy analyzerProxy = null;
-
- final List<JSLJob> subJobs = new ArrayList<JSLJob>();
- protected List<StepListenerProxy> stepListeners = null;
-
- List<BatchPartitionWorkUnit> completedWork = new ArrayList<BatchPartitionWorkUnit>();
-
- BlockingQueue<BatchPartitionWorkUnit> completedWorkQueue = null;
-
- protected PartitionedStepControllerImpl(final RuntimeJobExecution jobExecutionImpl, final Step step, StepContextImpl stepContext, long rootJobExecutionId) {
- super(jobExecutionImpl, step, stepContext, rootJobExecutionId);
- }
-
- @Override
- public void stop() {
-
- updateBatchStatus(BatchStatus.STOPPING);
-
- // It's possible we may try to stop a partitioned step before any
- // sub steps have been started.
- synchronized (subJobs) {
-
- if (parallelBatchWorkUnits != null) {
- for (BatchWorkUnit subJob : parallelBatchWorkUnits) {
- try {
- batchKernel.stopJob(subJob.getJobExecutionImpl().getExecutionId());
- } catch (Exception e) {
- // TODO - Is this what we want to know.
- // Blow up if it happens to force the issue.
- throw new IllegalStateException(e);
- }
- }
- }
- }
- }
-
- private PartitionPlan generatePartitionPlan() {
- // Determine the number of partitions
-
-
- PartitionPlan plan = null;
- Integer previousNumPartitions = null;
- final PartitionMapper partitionMapper = step.getPartition().getMapper();
-
- //from persisted plan from previous run
- if (stepStatus.getNumPartitions() != null) {
- previousNumPartitions = stepStatus.getNumPartitions();
- }
-
- if (partitionMapper != null) { //from partition mapper
-
- PartitionMapperProxy partitionMapperProxy;
-
- final List<Property> propertyList = partitionMapper.getProperties() == null ? null
- : partitionMapper.getProperties().getPropertyList();
-
- // Set all the contexts associated with this controller.
- // Some of them may be null
- InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext,
- propertyList);
-
- try {
- partitionMapperProxy = ProxyFactory.createPartitionMapperProxy(
- partitionMapper.getRef(), injectionRef, stepContext);
- } catch (final ArtifactValidationException e) {
- throw new BatchContainerServiceException(
- "Cannot create the PartitionMapper ["
- + partitionMapper.getRef() + "]", e);
- }
-
-
- PartitionPlan mapperPlan = partitionMapperProxy.mapPartitions();
-
- //Set up the new partition plan
- plan = new BatchPartitionPlan();
- plan.setPartitionsOverride(mapperPlan.getPartitionsOverride());
-
- //When true is specified, the partition count from the current run
- //is used and all results from past partitions are discarded.
- if (mapperPlan.getPartitionsOverride() || previousNumPartitions == null){
- plan.setPartitions(mapperPlan.getPartitions());
- } else {
- plan.setPartitions(previousNumPartitions);
- }
-
- if (mapperPlan.getThreads() == 0) {
- plan.setThreads(plan.getPartitions());
- } else {
- plan.setThreads(mapperPlan.getThreads());
- }
-
- plan.setPartitionProperties(mapperPlan.getPartitionProperties());
-
- if (logger.isLoggable(Level.FINE)) {
- logger.fine("Partition plan defined by partition mapper: " + plan);
- }
-
- } else if (step.getPartition().getPlan() != null) { //from static partition element in jsl
-
-
- String partitionsAttr = step.getPartition().getPlan().getPartitions();
- String threadsAttr = null;
-
- int numPartitions = Integer.MIN_VALUE;
- int numThreads;
- Properties[] partitionProps = null;
-
- if (partitionsAttr != null) {
- try {
- numPartitions = Integer.parseInt(partitionsAttr);
- } catch (final NumberFormatException e) {
- throw new IllegalArgumentException("Could not parse partition instances value in stepId: " + step.getId()
- + ", with instances=" + partitionsAttr, e);
- }
- partitionProps = new Properties[numPartitions];
- if (numPartitions < 1) {
- throw new IllegalArgumentException("Partition instances value must be 1 or greater in stepId: " + step.getId()
- + ", with instances=" + partitionsAttr);
- }
- }
-
- threadsAttr = step.getPartition().getPlan().getThreads();
- if (threadsAttr != null) {
- try {
- numThreads = Integer.parseInt(threadsAttr);
- if (numThreads == 0) {
- numThreads = numPartitions;
- }
- } catch (final NumberFormatException e) {
- throw new IllegalArgumentException("Could not parse partition threads value in stepId: " + step.getId()
- + ", with threads=" + threadsAttr, e);
- }
- if (numThreads < 0) {
- throw new IllegalArgumentException("Threads value must be 0 or greater in stepId: " + step.getId()
- + ", with threads=" + threadsAttr);
-
- }
- } else { //default to number of partitions if threads isn't set
- numThreads = numPartitions;
- }
-
-
- if (step.getPartition().getPlan().getProperties() != null) {
-
- List<JSLProperties> jslProperties = step.getPartition().getPlan().getProperties();
- for (JSLProperties props : jslProperties) {
- int targetPartition = Integer.parseInt(props.getPartition());
-
- try {
- partitionProps[targetPartition] = CloneUtility.jslPropertiesToJavaProperties(props);
- } catch (ArrayIndexOutOfBoundsException e) {
- throw new BatchContainerRuntimeException("There are only " + numPartitions + " partition instances, but there are "
- + jslProperties.size()
- + " partition properties lists defined. Remember that partition indexing is 0 based like Java arrays.", e);
- }
- }
- }
- plan = new BatchPartitionPlan();
- plan.setPartitions(numPartitions);
- plan.setThreads(numThreads);
- plan.setPartitionProperties(partitionProps);
- plan.setPartitionsOverride(false); //FIXME what is the default for a static plan??
- }
-
-
- // Set the other instance variables for convenience.
- this.partitions = plan.getPartitions();
- this.threads = plan.getThreads();
- this.partitionProperties = plan.getPartitionProperties();
-
- return plan;
- }
-
-
- @Override
- protected void invokeCoreStep() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
-
- this.plan = this.generatePartitionPlan();
-
- //persist the partition plan so on restart we have the same plan to reuse
- stepStatus.setNumPartitions(plan.getPartitions());
-
- /* When true is specified, the partition count from the current run
- * is used and all results from past partitions are discarded. Any
- * resource cleanup or back out of work done in the previous run is the
- * responsibility of the application. The PartitionReducer artifact's
- * rollbackPartitionedStep method is invoked during restart before any
- * partitions begin processing to provide a cleanup hook.
- */
- if (plan.getPartitionsOverride()) {
- if (this.partitionReducerProxy != null) {
- this.partitionReducerProxy.rollbackPartitionedStep();
- }
- }
-
- logger.fine("Number of partitions in step: " + partitions + " in step " + step.getId() + "; Subjob properties defined by partition mapper: " + partitionProperties);
-
- //Set up a blocking queue to pick up collector data from a partitioned thread
- if (this.analyzerProxy != null) {
- this.analyzerStatusQueue = new LinkedBlockingQueue<PartitionDataWrapper>();
- }
- this.completedWorkQueue = new LinkedBlockingQueue<BatchPartitionWorkUnit>();
-
- // Build all sub jobs from partitioned step
- buildSubJobBatchWorkUnits();
-
- // kick off the threads
- executeAndWaitForCompletion();
-
- // Deal with the results.
- checkCompletedWork();
- }
- private void buildSubJobBatchWorkUnits() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
- synchronized (subJobs) {
- //check if we've already issued a stop
- if (jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)){
- logger.fine("Step already in STOPPING state, exiting from buildSubJobBatchWorkUnits() before beginning execution");
- return;
- }
-
- for (int instance = 0; instance < partitions; instance++) {
- subJobs.add(PartitionedStepBuilder.buildPartitionSubJob(jobExecutionImpl.getInstanceId(),jobExecutionImpl.getJobContext(), step, instance));
- }
-
- PartitionsBuilderConfig config = new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, completedWorkQueue, jobExecutionImpl.getExecutionId());
- // Then build all the subjobs but do not start them yet
- if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
- parallelBatchWorkUnits = batchKernel.buildOnRestartParallelPartitions(config);
- } else {
- parallelBatchWorkUnits = batchKernel.buildNewParallelPartitions(config);
- }
-
- // NOTE: At this point I might not have as many work units as I had partitions, since some may have already completed.
- }
- }
-
- private void executeAndWaitForCompletion() throws JobRestartException {
-
- if (jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)){
- logger.fine("Step already in STOPPING state, exiting from executeAndWaitForCompletion() before beginning execution");
- return;
- }
-
- int numTotalForThisExecution = parallelBatchWorkUnits.size();
- this.numPreviouslyCompleted = partitions - numTotalForThisExecution;
- int numCurrentCompleted = 0;
- int numCurrentSubmitted = 0;
-
- logger.fine("Calculated that " + numPreviouslyCompleted + " partitions are already complete out of total # = "
- + partitions + ", with # remaining =" + numTotalForThisExecution);
-
- //Start up to to the max num we are allowed from the num threads attribute
- for (int i=0; i < this.threads && i < numTotalForThisExecution; i++, numCurrentSubmitted++) {
- if (stepStatus.getStartCount() > 1 && !!!plan.getPartitionsOverride()) {
- batchKernel.restartGeneratedJob(parallelBatchWorkUnits.get(i));
- } else {
- batchKernel.startGeneratedJob(parallelBatchWorkUnits.get(i));
- }
- }
-
- boolean readyToSubmitAnother = false;
- while (true) {
- logger.finer("Begin main loop in waitForQueueCompletion(), readyToSubmitAnother = " + readyToSubmitAnother);
- try {
- if (analyzerProxy != null) {
- logger.fine("Found analyzer, proceeding on analyzerQueue path");
- PartitionDataWrapper dataWrapper = analyzerStatusQueue.take();
- if (PartitionEventType.ANALYZE_COLLECTOR_DATA.equals(dataWrapper.getEventType())) {
- logger.finer("Analyze collector data: " + dataWrapper.getCollectorData());
- analyzerProxy.analyzeCollectorData(dataWrapper.getCollectorData());
- continue; // without being ready to submit another
- } else if (PartitionEventType.ANALYZE_STATUS.equals(dataWrapper.getEventType())) {
- analyzerProxy.analyzeStatus(dataWrapper.getBatchstatus(), dataWrapper.getExitStatus());
- logger.fine("Analyze status called for completed partition: batchStatus= " + dataWrapper.getBatchstatus() + ", exitStatus = " + dataWrapper.getExitStatus());
- completedWork.add(completedWorkQueue.take()); // Shouldn't be a a long wait.
- readyToSubmitAnother = true;
- } else {
- logger.warning("Invalid partition state");
- throw new IllegalStateException("Invalid partition state");
- }
- } else {
- logger.fine("No analyzer, proceeding on completedWorkQueue path");
- // block until at least one thread has finished to
- // submit more batch work. hold on to the finished work to look at later
- completedWork.add(completedWorkQueue.take());
- readyToSubmitAnother = true;
- }
- } catch (InterruptedException e) {
- logger.severe("Caught exc"+ e);
- throw new BatchContainerRuntimeException(e);
- }
-
- if (readyToSubmitAnother) {
- numCurrentCompleted++;
- logger.fine("Ready to submit another (if there is another left to submit); numCurrentCompleted = " + numCurrentCompleted);
- if (numCurrentCompleted < numTotalForThisExecution) {
- if (numCurrentSubmitted < numTotalForThisExecution) {
- logger.fine("Submitting # " + numCurrentSubmitted + " out of " + numTotalForThisExecution + " total for this execution");
- if (stepStatus.getStartCount() > 1) {
- batchKernel.startGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
- } else {
- batchKernel.restartGeneratedJob(parallelBatchWorkUnits.get(numCurrentSubmitted++));
- }
- readyToSubmitAnother = false;
- }
- } else {
- logger.fine("Finished... breaking out of loop");
- break;
- }
- } else {
- logger.fine("Not ready to submit another."); // Must have just done a collector
- }
- }
- }
-
- private void checkCompletedWork() {
-
- if (logger.isLoggable(Level.FINE)) {
- logger.fine("Check completed work list.");
- }
-
- /**
- * check the batch status of each subJob after it's done to see if we need to issue a rollback
- * start rollback if any have stopped or failed
- */
- boolean rollback = false;
- boolean partitionFailed = false;
-
- for (final BatchWorkUnit subJob : completedWork) {
- BatchStatus batchStatus = subJob.getJobExecutionImpl().getJobContext().getBatchStatus();
- if (batchStatus.equals(BatchStatus.FAILED)) {
- logger.fine("Subjob " + subJob.getJobExecutionImpl().getExecutionId() + " ended with status '" + batchStatus + "'; Starting logical transaction rollback.");
-
- rollback = true;
- partitionFailed = true;
-
- //Keep track of the failing status and throw an exception to propagate after the rest of the partitions are complete
- stepContext.setBatchStatus(BatchStatus.FAILED);
- }
- }
-
- //If rollback is false we never issued a rollback so we can issue a logicalTXSynchronizationBeforeCompletion
- //NOTE: this will get issued even in a subjob fails or stops if no logicalTXSynchronizationRollback method is provied
- //We are assuming that not providing a rollback was intentional
- if (rollback == true) {
- if (this.partitionReducerProxy != null) {
- this.partitionReducerProxy.rollbackPartitionedStep();
- }
- if (partitionFailed) {
- throw new BatchContainerRuntimeException("One or more partitions failed");
- }
- } else {
- if (this.partitionReducerProxy != null) {
- this.partitionReducerProxy.beforePartitionedStepCompletion();
- }
- }
- }
-
- @Override
- protected void setupStepArtifacts() {
-
- InjectionReferences injectionRef = null;
- injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, null);
- this.stepListeners = jobExecutionImpl.getListenerFactory().getStepListeners(step, injectionRef, stepContext);
-
- Analyzer analyzer = step.getPartition().getAnalyzer();
-
- if (analyzer != null) {
- final List<Property> propList = analyzer.getProperties() == null ? null : analyzer.getProperties()
- .getPropertyList();
-
- injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
-
- try {
- analyzerProxy = ProxyFactory.createPartitionAnalyzerProxy(analyzer.getRef(), injectionRef, stepContext);
- } catch (final ArtifactValidationException e) {
- throw new BatchContainerServiceException("Cannot create the analyzer [" + analyzer.getRef() + "]", e);
- }
- }
-
- PartitionReducer partitionReducer = step.getPartition().getReducer();
-
- if (partitionReducer != null) {
-
- final List<Property> propList = partitionReducer.getProperties() == null ? null : partitionReducer.getProperties()
- .getPropertyList();
-
- injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
-
- try {
- this.partitionReducerProxy = ProxyFactory.createPartitionReducerProxy(partitionReducer.getRef(), injectionRef, stepContext);
- } catch (final ArtifactValidationException e) {
- throw new BatchContainerServiceException("Cannot create the analyzer [" + partitionReducer.getRef() + "]",
- e);
- }
- }
-
- }
-
- @Override
- protected void invokePreStepArtifacts() {
-
- if (stepListeners != null) {
- for (StepListenerProxy listenerProxy : stepListeners) {
- // Call beforeStep on all the step listeners
- listenerProxy.beforeStep();
- }
- }
-
- // Invoke the reducer before all parallel steps start (must occur
- // before mapper as well)
- if (this.partitionReducerProxy != null) {
- this.partitionReducerProxy.beginPartitionedStep();
- }
-
- }
-
- @Override
- protected void invokePostStepArtifacts() {
- // Invoke the reducer after all parallel steps are done
- if (this.partitionReducerProxy != null) {
-
- if ((BatchStatus.COMPLETED).equals(stepContext.getBatchStatus())) {
- this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionStatus.COMMIT);
- }else {
- this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionStatus.ROLLBACK);
- }
-
- }
-
- // Called in spec'd order, e.g. Sec. 11.7
- if (stepListeners != null) {
- for (StepListenerProxy listenerProxy : stepListeners) {
- // Call afterStep on all the step listeners
- listenerProxy.afterStep();
- }
- }
- }
-
- @Override
- protected void sendStatusFromPartitionToAnalyzerIfPresent() {
- // Since we're already on the main thread, there will never
- // be anything to do on this thread. It's only on the partitioned
- // threads that there is something to send back.
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/RetryHandler.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/RetryHandler.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/RetryHandler.java
deleted file mode 100755
index e0906a6..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/RetryHandler.java
+++ /dev/null
@@ -1,430 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-
-import com.ibm.jbatch.container.artifact.proxy.RetryProcessListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.RetryReadListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.RetryWriteListenerProxy;
-import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
-import com.ibm.jbatch.jsl.model.Chunk;
-import com.ibm.jbatch.jsl.model.ExceptionClassFilter;
-
-public class RetryHandler {
-
- /**
- *
- * Logic for handling retryable records.
- *
- * A RetryHandler object is attached to every BDS that inherits from AbstractBatchDataStream.
- *
- */
-
- private static final String className = RetryHandler.class.getName();
- private static Logger logger = Logger.getLogger(RetryHandler.class.getPackage().getName());
-
- public static final String RETRY_COUNT = "retry-limit";
- public static final String RETRY_INCLUDE_EX = "include class";
- public static final String RETRY_EXCLUDE_EX = "exclude class";
-
- private static final int RETRY_NONE = 0;
- private static final int RETRY_READ = 1;
- private static final int RETRY_PROCESS = 2;
- private static final int RETRY_WRITE = 3;
-
- private int retryType = RETRY_NONE;
-
- /*private RetryProcessListenerProxy _retryProcessListener = null;
- private RetryReadListenerProxy _retryReadListener = null;
- private RetryWriteListenerProxy _retryWriteListener = null;*/
-
- List<RetryProcessListenerProxy> _retryProcessListeners = null;
- List<RetryReadListenerProxy> _retryReadListeners = null;
- List<RetryWriteListenerProxy> _retryWriteListeners = null;
-
- private long _jobId = 0;
- private String _stepId = null;
- private Set<String> _retryNoRBIncludeExceptions = null;
- private Set<String> _retryNoRBExcludeExceptions = null;
- private Set<String> _retryIncludeExceptions = null;
- private Set<String> _retryExcludeExceptions = null;
- private int _retryLimit = Integer.MIN_VALUE;
- private long _retryCount = 0;
- private Exception _retryException = null;
-
- public RetryHandler(Chunk chunk, long l, String stepId)
- {
- _jobId = l;
- _stepId = stepId;
-
- initialize(chunk);
- }
-
-
- /**
- * Add the user-defined RetryProcessListener.
- *
- */
- public void addRetryProcessListener(List<RetryProcessListenerProxy> retryProcessListeners)
- {
- _retryProcessListeners = retryProcessListeners;
- }
-
- /**
- * Add the user-defined RetryReadListener.
- *
- */
- public void addRetryReadListener(List<RetryReadListenerProxy> retryReadListeners)
- {
- _retryReadListeners = retryReadListeners;
- }
-
- /**
- * Add the user-defined RetryWriteListener.
- *
- */
- public void addRetryWriteListener(List<RetryWriteListenerProxy> retryWriteListeners)
- {
- _retryWriteListeners = retryWriteListeners;
- }
-
-
- /**
- * Read the retry exception lists from the BDS props.
- */
- private void initialize(Chunk chunk)
- {
- final String mName = "initialize";
-
- if(logger.isLoggable(Level.FINER))
- logger.entering(className, mName);
-
- try
- {
- if (chunk.getRetryLimit() != null){
- _retryLimit = Integer.parseInt(chunk.getRetryLimit());
- if (_retryLimit < 0) {
- throw new IllegalArgumentException("The retry-limit attribute on a chunk cannot be a negative value");
- }
-
- }
- }
- catch (NumberFormatException nfe)
- {
- throw new RuntimeException("NumberFormatException reading " + RETRY_COUNT, nfe);
- }
-
- // Read the include/exclude exceptions.
- _retryIncludeExceptions = new HashSet<String>();
- _retryExcludeExceptions = new HashSet<String>();
- _retryNoRBIncludeExceptions = new HashSet<String>();
- _retryNoRBExcludeExceptions = new HashSet<String>();
-
- String includeEx = null;
- String excludeEx = null;
- String includeExNoRB = null;
- String excludeExNoRB = null;
-
- if (chunk.getRetryableExceptionClasses() != null) {
- if (chunk.getRetryableExceptionClasses().getIncludeList() != null) {
- List<ExceptionClassFilter.Include> includes = chunk.getRetryableExceptionClasses().getIncludeList();
- for (ExceptionClassFilter.Include include : includes) {
- _retryIncludeExceptions.add(include.getClazz().trim());
- logger.finer("RETRYHANDLE: include: " + include.getClazz().trim());
- }
-
- if (_retryIncludeExceptions.size() == 0) {
- logger.finer("RETRYHANDLE: include element not present");
-
- }
- }
- if (chunk.getRetryableExceptionClasses().getExcludeList() != null) {
- List<ExceptionClassFilter.Exclude> excludes = chunk.getRetryableExceptionClasses().getExcludeList();
- for (ExceptionClassFilter.Exclude exclude : excludes) {
- _retryExcludeExceptions.add(exclude.getClazz().trim());
- logger.finer("SKIPHANDLE: exclude: " + exclude.getClazz().trim());
- }
-
- if (_retryExcludeExceptions.size() == 0) {
- logger.finer("SKIPHANDLE: exclude element not present");
-
- }
- }
- }
-
- if (chunk.getNoRollbackExceptionClasses() != null) {
- if (chunk.getNoRollbackExceptionClasses().getIncludeList() != null) {
- List<ExceptionClassFilter.Include> includes = chunk.getNoRollbackExceptionClasses().getIncludeList();
- for (ExceptionClassFilter.Include include : includes) {
- _retryNoRBIncludeExceptions.add(include.getClazz().trim());
- logger.finer("RETRYHANDLE: include: " + include.getClazz().trim());
- }
-
- if (_retryNoRBIncludeExceptions.size() == 0) {
- logger.finer("RETRYHANDLE: include element not present");
-
- }
- }
- if (chunk.getNoRollbackExceptionClasses().getExcludeList() != null) {
- List<ExceptionClassFilter.Exclude> excludes = chunk.getNoRollbackExceptionClasses().getExcludeList();
- for (ExceptionClassFilter.Exclude exclude : excludes) {
- _retryNoRBExcludeExceptions.add(exclude.getClazz().trim());
- logger.finer("SKIPHANDLE: exclude: " + exclude.getClazz().trim());
- }
-
- if (_retryNoRBExcludeExceptions.size() == 0) {
- logger.finer("SKIPHANDLE: exclude element not present");
-
- }
- }
- }
-
- if (logger.isLoggable(Level.FINE)) {
- logger.logp(Level.FINE, className, mName, "added include exception " + includeEx + "; added exclude exception " + excludeEx);
- logger.logp(Level.FINE, className, mName, "added include no rollback exception " + includeExNoRB
- + "; added exclude no rollback exception " + excludeExNoRB);
- }
-
- if(logger.isLoggable(Level.FINER)) {
- logger.exiting(className, mName, this.toString());
- }
- }
-
- public boolean isRollbackException(Exception e)
- {
- return !isNoRollbackException(e);
- }
- /**
- * Handle exception from a read failure.
- */
- public void handleExceptionRead(Exception e)
- {
- final String mName = "handleExceptionRead";
-
- logger.finer("RETRYHANDLE: in retryhandler handle exception on a read:" + e.toString());
-
- if(logger.isLoggable(Level.FINER))
- logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
-
- if (!isRetryLimitReached() && isRetryable(e))
- {
- retryType = RETRY_READ;
- _retryException = e;
- // Retry it. Log it. Call the RetryListener.
- ++_retryCount;
- logRetry(e);
-
- if (_retryReadListeners != null) {
- for (RetryReadListenerProxy retryReadListenerProxy : _retryReadListeners) {
- retryReadListenerProxy.onRetryReadException(e);
- }
- }
- }
- else
- {
- // No retry. Throw it back.
- if(logger.isLoggable(Level.FINER))
- logger.logp(Level.FINE, className, mName, "No retry. Rethrow", e);
- throw new BatchContainerRuntimeException(e);
- }
-
- if(logger.isLoggable(Level.FINER))
- logger.exiting(className, mName, e);
- }
-
- /**
- * Handle exception from a process failure.
- */
- public void handleExceptionProcess(Exception e, Object w)
- {
- final String mName = "handleExceptionProcess";
-
- if(logger.isLoggable(Level.FINER))
- logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
-
- if (!isRetryLimitReached() && isRetryable(e))
- {
- retryType = RETRY_PROCESS;
- _retryException = e;
- // Retry it. Log it. Call the RetryListener.
- ++_retryCount;
- logRetry(e);
-
- if (_retryProcessListeners != null) {
- for (RetryProcessListenerProxy retryProcessListenerProxy : _retryProcessListeners) {
- retryProcessListenerProxy.onRetryProcessException(w, e);
- }
- }
- }
- else
- {
- // No retry. Throw it back.
- if(logger.isLoggable(Level.FINER))
- logger.logp(Level.FINE, className, mName, "No retry. Rethrow ", e);
- throw new BatchContainerRuntimeException(e);
- }
- }
-
- /**
- * Handle exception from a write failure.
- */
- public void handleExceptionWrite(Exception e, List<Object> w)
- {
- final String mName = "handleExceptionWrite";
-
- if(logger.isLoggable(Level.FINER))
- logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
-
- if (!isRetryLimitReached() && isRetryable(e))
- {
- // Retry it. Log it. Call the RetryListener.
- retryType = RETRY_WRITE;
- _retryException = e;
- ++_retryCount;
- logRetry(e);
-
- if (_retryWriteListeners != null) {
- for (RetryWriteListenerProxy retryWriteListenerProxy : _retryWriteListeners) {
- retryWriteListenerProxy.onRetryWriteException(w, e);
- }
- }
- }
- else
- {
- // No retry. Throw it back.
- if(logger.isLoggable(Level.FINER))
- logger.logp(Level.FINE, className, mName, "No retry. Rethrow ", e);
- throw new BatchContainerRuntimeException(e);
- }
- }
-
-
- /**
- * Check the retryable exception lists to determine whether
- * the given Exception is retryable.
- */
- private boolean isRetryable(Exception e)
- {
- final String mName = "isRetryable";
-
- String exClassName = e.getClass().getName();
-
- boolean retVal = containsException(_retryIncludeExceptions, e) && !containsException(_retryExcludeExceptions, e);
-
- if(logger.isLoggable(Level.FINE))
- logger.logp(Level.FINE, className, mName, mName + ": " + retVal + ": " + exClassName);
-
- return retVal;
- }
-
- private boolean isNoRollbackException(Exception e)
- {
- final String mName = "isNoRollbackException";
-
- String exClassName = e.getClass().getName();
-
- boolean retVal = containsException(_retryNoRBIncludeExceptions, e) && !containsException(_retryNoRBExcludeExceptions, e);
-
- if(logger.isLoggable(Level.FINE))
- logger.logp(Level.FINE, className, mName, mName + ": " + retVal + ": " + exClassName);
-
- return retVal;
- }
-
- /**
- * Check whether given exception is in the specified exception list
- */
- private boolean containsException(Set<String> retryList, Exception e)
- {
- final String mName = "containsException";
- boolean retVal = false;
-
- for ( Iterator it = retryList.iterator(); it.hasNext(); ) {
- String exClassName = (String) it.next();
-
- try {
- if (retVal = Thread.currentThread().getContextClassLoader().loadClass(exClassName).isInstance(e))
- break;
- } catch (ClassNotFoundException cnf) {
- logger.logp(Level.FINE, className, mName, cnf.getLocalizedMessage());
- }
- }
-
- if(logger.isLoggable(Level.FINE))
- logger.logp(Level.FINE, className, mName, mName + ": " + retVal );
-
- return retVal;
- }
-
- /**
- * Check if the retry limit has been reached.
- *
- * Note: if retry handling isn't enabled (i.e. not configured in xJCL), then this method
- * will always return TRUE.
- */
- private boolean isRetryLimitReached()
- {
- // Unlimited retries if it is never defined
- if (_retryLimit == Integer.MIN_VALUE) {
- return false;
- }
-
- return (_retryCount >= _retryLimit);
- }
-
-
- private void logRetry(Exception e)
- {
- String key = "record.retried.norollback.by.batch.container";
- Object[] details = { _jobId, _stepId, e.getClass().getName() + ": " + e.getMessage() };
- //String message = LoggerUtil.getFormattedMessage(key, details, true);
- //logger.info(message);
- }
-
- public Exception getException()
- {
- return _retryException;
- }
-
- public long getRetryCount()
- {
- return _retryCount;
- }
-
- public void setRetryCount(long retryCount)
- {
- final String mName = "setRetryCount";
-
- _retryCount = retryCount;
-
- if(logger.isLoggable(Level.FINE))
- logger.logp(Level.FINE, className, mName, "setRetryCount: " + _retryCount);
- }
-
- public String toString()
- {
- return "RetryHandler{" + super.toString() + "}count:limit=" + _retryCount + ":" + _retryLimit;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SingleThreadedStepControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SingleThreadedStepControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SingleThreadedStepControllerImpl.java
deleted file mode 100755
index 1c81e7f..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SingleThreadedStepControllerImpl.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.logging.Logger;
-
-
-import com.ibm.jbatch.container.IController;
-import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
-import com.ibm.jbatch.container.artifact.proxy.PartitionCollectorProxy;
-import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
-import com.ibm.jbatch.container.artifact.proxy.StepListenerProxy;
-import com.ibm.jbatch.container.context.impl.StepContextImpl;
-import com.ibm.jbatch.container.exception.BatchContainerServiceException;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.util.PartitionDataWrapper;
-import com.ibm.jbatch.container.util.PartitionDataWrapper.PartitionEventType;
-import com.ibm.jbatch.container.validation.ArtifactValidationException;
-import com.ibm.jbatch.jsl.model.Collector;
-import com.ibm.jbatch.jsl.model.Property;
-import com.ibm.jbatch.jsl.model.Step;
-
-/**
- *
- * When a partitioned step is run, this controller will only be used for the partition threads,
- * NOT the top-level main thread that the step executes upon.
- *
- * When a non-partitioned step is run this controller will be used as well (and there will be no
- * separate main thread with controller).
- *
- */
-public abstract class SingleThreadedStepControllerImpl extends BaseStepControllerImpl implements IController {
-
- private final static String sourceClass = SingleThreadedStepControllerImpl.class.getName();
- private final static Logger logger = Logger.getLogger(sourceClass);
-
- // Collector only used from partition threads, not main thread
- protected PartitionCollectorProxy collectorProxy = null;
-
- protected SingleThreadedStepControllerImpl(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
- super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
- }
-
- List<StepListenerProxy> stepListeners = null;
-
- protected void setupStepArtifacts() {
- // set up listeners
-
- InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, null);
- this.stepListeners = jobExecutionImpl.getListenerFactory().getStepListeners(step, injectionRef, stepContext);
-
- // set up collectors if we are running a partitioned step
- if (step.getPartition() != null) {
- Collector collector = step.getPartition().getCollector();
- if (collector != null) {
- List<Property> propList = (collector.getProperties() == null) ? null : collector.getProperties().getPropertyList();
- /**
- * Inject job flow, split, and step contexts into partition
- * artifacts like collectors and listeners some of these
- * contexts may be null
- */
- injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
-
- try {
- this.collectorProxy = ProxyFactory.createPartitionCollectorProxy(collector.getRef(), injectionRef, this.stepContext);
- } catch (ArtifactValidationException e) {
- throw new BatchContainerServiceException("Cannot create the collector [" + collector.getRef() + "]", e);
- }
- }
- }
- }
-
- @Override
- protected void invokePreStepArtifacts() {
- // Don't call beforeStep() in the partitioned case, since we are now on a partition thread, and
- // have already called beforeStep() on the main thread as the spec says.
- if ((stepListeners != null) && (this.jobExecutionImpl.getPartitionInstance() == null)) {
- for (StepListenerProxy listenerProxy : stepListeners) {
- listenerProxy.beforeStep();
- }
- }
- }
-
- @Override
- protected void invokePostStepArtifacts() {
- // Don't call beforeStep() in the partitioned case, since we are now on a partition thread, and
- // have already called beforeStep() on the main thread as the spec says.
- if ((stepListeners != null) && (this.jobExecutionImpl.getPartitionInstance() == null)) {
- for (StepListenerProxy listenerProxy : stepListeners) {
- listenerProxy.afterStep();
- }
- }
- }
-
- protected void invokeCollectorIfPresent() {
- if (collectorProxy != null) {
- Serializable data = collectorProxy.collectPartitionData();
- logger.finer("Got partition data: " + data + ", from collector: " + collectorProxy);
- sendCollectorDataToAnalyzerIfPresent(data);
- }
- }
-
- // Useless to have collector without analyzer but let's check so we don't hang or blow up.
- protected void sendCollectorDataToAnalyzerIfPresent(Serializable data) {
- if (analyzerStatusQueue != null) {
- logger.finer("Sending collector partition data: " + data + " to analyzer queue: " + analyzerStatusQueue);
- PartitionDataWrapper dataWrapper = new PartitionDataWrapper();
- dataWrapper.setCollectorData(data);
- dataWrapper.setEventType(PartitionEventType.ANALYZE_COLLECTOR_DATA);
- analyzerStatusQueue.add(dataWrapper);
- } else {
- logger.fine("Analyzer not configured.");
- }
- }
-
- // Useless to have collector without analyzer but let's check so we don't hang or blow up.
- @Override
- protected void sendStatusFromPartitionToAnalyzerIfPresent() {
- if (analyzerStatusQueue != null) {
- logger.fine("Send status from partition for analyzeStatus with batchStatus = " + stepStatus.getBatchStatus() + ", exitStatus = " + stepStatus.getExitStatus());
- PartitionDataWrapper dataWrapper = new PartitionDataWrapper();
- dataWrapper.setBatchStatus(stepStatus.getBatchStatus());
- dataWrapper.setExitStatus(stepStatus.getExitStatus());
- dataWrapper.setEventType(PartitionEventType.ANALYZE_STATUS);
- analyzerStatusQueue.add(dataWrapper);
- } else {
- logger.fine("Analyzer not configured.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SkipHandler.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SkipHandler.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SkipHandler.java
deleted file mode 100755
index f4ba676..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SkipHandler.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package com.ibm.jbatch.container.impl;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-
-import com.ibm.jbatch.container.artifact.proxy.SkipProcessListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.SkipReadListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.SkipWriteListenerProxy;
-import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
-import com.ibm.jbatch.jsl.model.Chunk;
-import com.ibm.jbatch.jsl.model.ExceptionClassFilter;
-
-public class SkipHandler {
-
- /**
- *
- * Logic for handling skipped records.
- *
- */
-
- private static final String className = SkipHandler.class.getName();
- private static Logger logger = Logger.getLogger(SkipHandler.class.getPackage().getName());
-
- public static final String SKIP_COUNT = "skip-limit";
- public static final String SKIP_INCLUDE_EX = "include class";
- public static final String SKIP_EXCLUDE_EX = "exclude class";
-
- private List<SkipProcessListenerProxy> _skipProcessListener = null;
- private List<SkipReadListenerProxy> _skipReadListener = null;
- private List<SkipWriteListenerProxy> _skipWriteListener = null;
-
- private long _jobId = 0;
- private String _stepId = null;
- private Set<String> _skipIncludeExceptions = null;
- private Set<String> _skipExcludeExceptions = null;
- private int _skipLimit = Integer.MIN_VALUE;
- private long _skipCount = 0;
-
- public SkipHandler(Chunk chunk, long l, String stepId)
- {
- _jobId = l;
- _stepId = stepId;
-
- initialize(chunk);
- }
-
- /**
- * Add the user-defined SkipReadListeners.
- *
- */
- public void addSkipReadListener(List<SkipReadListenerProxy> skipReadListener)
- {
- _skipReadListener = skipReadListener;
- }
-
- /**
- * Add the user-defined SkipWriteListeners.
- *
- */
- public void addSkipWriteListener(List<SkipWriteListenerProxy> skipWriteListener)
- {
- _skipWriteListener = skipWriteListener;
- }
-
- /**
- * Add the user-defined SkipReadListeners.
- *
- */
- public void addSkipProcessListener(List<SkipProcessListenerProxy> skipProcessListener)
- {
- _skipProcessListener = skipProcessListener;
- }
-
-
- /**
- * Read the skip exception lists from the BDS props.
- */
- private void initialize(Chunk chunk)
- {
- final String mName = "initialize";
-
- if(logger.isLoggable(Level.FINER))
- logger.entering(className, mName);
-
- try
- {
- if (chunk.getSkipLimit() != null){
- _skipLimit = Integer.parseInt(chunk.getSkipLimit());
- if (_skipLimit < 0) {
- throw new IllegalArgumentException("The skip-limit attribute on a chunk cannot be a negative value");
- }
- }
- }
- catch (NumberFormatException nfe)
- {
- throw new RuntimeException("NumberFormatException reading " + SKIP_COUNT, nfe);
- }
-
-
- // Read the include/exclude exceptions.
-
- _skipIncludeExceptions = new HashSet<String>();
- _skipExcludeExceptions = new HashSet<String>();
-
- // boolean done = false;
- List<String> includeEx = new ArrayList<String>();
- List<String> excludeEx = new ArrayList<String>();
-
- if (chunk.getSkippableExceptionClasses() != null) {
- if (chunk.getSkippableExceptionClasses().getIncludeList() != null) {
- List<ExceptionClassFilter.Include> includes = chunk.getSkippableExceptionClasses().getIncludeList();
-
- for (ExceptionClassFilter.Include include : includes) {
- _skipIncludeExceptions.add(include.getClazz().trim());
- logger.finer("SKIPHANDLE: include: " + include.getClazz().trim());
- }
-
- if (_skipIncludeExceptions.size() == 0) {
- logger.finer("SKIPHANDLE: include element not present");
-
- }
- }
- }
-
- if (chunk.getSkippableExceptionClasses() != null) {
- if (chunk.getSkippableExceptionClasses().getExcludeList() != null) {
- List<ExceptionClassFilter.Exclude> excludes = chunk.getSkippableExceptionClasses().getExcludeList();
-
- for (ExceptionClassFilter.Exclude exclude : excludes) {
- _skipExcludeExceptions.add(exclude.getClazz().trim());
- logger.finer("SKIPHANDLE: exclude: " + exclude.getClazz().trim());
- }
-
- if (_skipExcludeExceptions.size() == 0) {
- logger.finer("SKIPHANDLE: exclude element not present");
-
- }
-
- }
- }
-
- if (logger.isLoggable(Level.FINE))
- logger.logp(Level.FINE, className, mName, "added include exception " + includeEx + "; added exclude exception " + excludeEx);
-
- if(logger.isLoggable(Level.FINER))
- logger.exiting(className, mName, this.toString());
- }
-
-
- /**
- * Handle exception from a read failure.
- */
- public void handleExceptionRead(Exception e)
- {
- final String mName = "handleException";
-
- logger.finer("SKIPHANDLE: in skiphandler handle exception on a read");
-
- if(logger.isLoggable(Level.FINER))
- logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
-
- if (!isSkipLimitReached() && isSkippable(e))
- {
- // Skip it. Log it. Call the SkipListener.
- ++_skipCount;
- logSkip(e);
-
- if (_skipReadListener != null) {
- for (SkipReadListenerProxy skipReadListenerProxy : _skipReadListener) {
- skipReadListenerProxy.onSkipReadItem(e);
- }
- }
- }
- else
- {
- // No skip. Throw it back. don't throw it back - we might want to retry ...
- if(logger.isLoggable(Level.FINER))
- logger.logp(Level.FINE, className, mName, "No skip. Rethrow", e);
- throw new BatchContainerRuntimeException(e);
- }
-
- if(logger.isLoggable(Level.FINER))
- logger.exiting(className, mName, e);
- }
-
- /**
- * Handle exception from a process failure.
- */
- public void handleExceptionWithRecordProcess(Exception e, Object w)
- {
- final String mName = "handleExceptionWithRecordProcess";
- if(logger.isLoggable(Level.FINER))
- logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
-
- if (!isSkipLimitReached() && isSkippable(e))
- {
- // Skip it. Log it. Call the SkipProcessListener.
- ++_skipCount;
- logSkip(e);
-
- if (_skipProcessListener != null) {
- for (SkipProcessListenerProxy skipProcessListenerProxy : _skipProcessListener) {
- skipProcessListenerProxy.onSkipProcessItem(w, e);
- }
- }
- }
- else
- {
- // No skip. Throw it back.
- if(logger.isLoggable(Level.FINER))
- logger.logp(Level.FINE, className, mName, "No skip. Rethrow ", e);
- throw new BatchContainerRuntimeException(e);
- }
- }
- /**
- * Handle exception from a write failure.
- */
- public void handleExceptionWithRecordListWrite(Exception e, List<?> items)
- {
- final String mName = "handleExceptionWithRecordListWrite(Exception, List<?>)";
-
- if(logger.isLoggable(Level.FINER))
- logger.logp(Level.FINE, className, mName, e.getClass().getName() + "; " + this.toString());
-
- if (!isSkipLimitReached() && isSkippable(e))
- {
- // Skip it. Log it. Call the SkipListener.
- ++_skipCount;
- logSkip(e);
-
- if (_skipWriteListener != null) {
- for (SkipWriteListenerProxy skipWriteListenerProxy : _skipWriteListener) {
- skipWriteListenerProxy.onSkipWriteItem(items, e);
- }
- }
- }
- else
- {
- System.out.println("## NO SKIP");
- // No skip. Throw it back. - No, exit without throwing
- if(logger.isLoggable(Level.FINER))
- logger.logp(Level.FINE, className, mName, "No skip. Rethrow ", e);
- throw new BatchContainerRuntimeException(e);
- }
- }
-
-
- /**
- * Check the skipCount and skippable exception lists to determine whether
- * the given Exception is skippable.
- */
- private boolean isSkippable(Exception e)
- {
- final String mName = "isSkippable";
-
- String exClassName = e.getClass().getName();
-
- boolean retVal = containsSkippable(_skipIncludeExceptions, e) && !containsSkippable(_skipExcludeExceptions, e);
-
- if(logger.isLoggable(Level.FINE))
- logger.logp(Level.FINE, className, mName, mName + ": " + retVal + ": " + exClassName);
-
- return retVal;
- }
-
- /**
- * Check whether given exception is in skippable exception list
- */
- private boolean containsSkippable(Set<String> skipList, Exception e)
- {
- final String mName = "containsSkippable";
- boolean retVal = false;
-
- for ( Iterator it = skipList.iterator(); it.hasNext(); ) {
- String exClassName = (String) it.next();
- try {
- ClassLoader tccl = Thread.currentThread().getContextClassLoader();
- if (retVal = tccl.loadClass(exClassName).isInstance(e))
- break;
- } catch (ClassNotFoundException cnf) {
- logger.logp(Level.FINE, className, mName, cnf.getLocalizedMessage());
- }
- }
-
- if(logger.isLoggable(Level.FINE))
- logger.logp(Level.FINE, className, mName, mName + ": " + retVal );
-
- return retVal;
- }
-
-
- /**
- * Check if the skip limit has been reached.
- *
- * Note: if skip handling isn't enabled (i.e. not configured in xJCL), then
- * this method will always return TRUE.
- */
- private boolean isSkipLimitReached() {
- // Unlimited skips if it is never defined
- if (_skipLimit == Integer.MIN_VALUE) {
- return false;
- }
-
- return (_skipCount >= _skipLimit);
- }
-
- private void logSkip(Exception e)
- {
- Object[] details = { _jobId, _stepId, e.getClass().getName() + ": " + e.getMessage() };
- if(logger.isLoggable(Level.FINE))
- logger.logp(Level.FINE, className, "logSkip", "Logging details: ", details);
- }
-
-
- public long getSkipCount()
- {
- return _skipCount;
- }
-
- public void setSkipCount(long skipCount)
- {
- final String mName = "setSkipCount";
-
- _skipCount = skipCount;
-
- if(logger.isLoggable(Level.FINE))
- logger.logp(Level.FINE, className, mName, "setSkipCount: " + _skipCount);
- }
-
- public String toString()
- {
- return "SkipHandler{" + super.toString() + "}count:limit=" + _skipCount + ":" + _skipLimit;
- }
-
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SplitControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SplitControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SplitControllerImpl.java
deleted file mode 100755
index 792b1ce..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/SplitControllerImpl.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.batch.operations.JobExecutionAlreadyCompleteException;
-import javax.batch.operations.JobExecutionNotMostRecentException;
-import javax.batch.operations.JobRestartException;
-import javax.batch.operations.JobStartException;
-import javax.batch.operations.NoSuchJobExecutionException;
-
-import com.ibm.jbatch.container.IExecutionElementController;
-import com.ibm.jbatch.container.context.impl.JobContextImpl;
-import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
-import com.ibm.jbatch.container.jobinstance.RuntimeFlowInSplitExecution;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.services.IBatchKernelService;
-import com.ibm.jbatch.container.servicesmanager.ServicesManager;
-import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
-import com.ibm.jbatch.container.status.ExecutionStatus;
-import com.ibm.jbatch.container.status.ExtendedBatchStatus;
-import com.ibm.jbatch.container.status.SplitExecutionStatus;
-import com.ibm.jbatch.container.util.BatchFlowInSplitWorkUnit;
-import com.ibm.jbatch.container.util.BatchParallelWorkUnit;
-import com.ibm.jbatch.container.util.FlowInSplitBuilderConfig;
-import com.ibm.jbatch.jsl.model.Flow;
-import com.ibm.jbatch.jsl.model.JSLJob;
-import com.ibm.jbatch.jsl.model.Split;
-
-public class SplitControllerImpl implements IExecutionElementController {
-
- private final static String sourceClass = SplitControllerImpl.class.getName();
- private final static Logger logger = Logger.getLogger(sourceClass);
-
- private final RuntimeJobExecution jobExecution;
-
- private volatile List<BatchFlowInSplitWorkUnit> parallelBatchWorkUnits;
-
- private final ServicesManager servicesManager;
- private final IBatchKernelService batchKernel;
- private final JobContextImpl jobContext;
- private final BlockingQueue<BatchFlowInSplitWorkUnit> completedWorkQueue = new LinkedBlockingQueue<BatchFlowInSplitWorkUnit>();
- private final long rootJobExecutionId;
-
- final List<JSLJob> subJobs = new ArrayList<JSLJob>();
-
- protected Split split;
-
- public SplitControllerImpl(RuntimeJobExecution jobExecution, Split split, long rootJobExecutionId) {
- this.jobExecution = jobExecution;
- this.jobContext = jobExecution.getJobContext();
- this.rootJobExecutionId = rootJobExecutionId;
- this.split = split;
-
- servicesManager = ServicesManagerImpl.getInstance();
- batchKernel = servicesManager.getBatchKernelService();
- }
-
- @Override
- public void stop() {
-
- // It's possible we may try to stop a split before any
- // sub steps have been started.
- synchronized (subJobs) {
-
- if (parallelBatchWorkUnits != null) {
- for (BatchParallelWorkUnit subJob : parallelBatchWorkUnits) {
- try {
- batchKernel.stopJob(subJob.getJobExecutionImpl().getExecutionId());
- } catch (Exception e) {
- // TODO - Is this what we want to know.
- // Blow up if it happens to force the issue.
- throw new IllegalStateException(e);
- }
- }
- }
- }
- }
-
- @Override
- public SplitExecutionStatus execute() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
- String sourceMethod = "execute";
- if (logger.isLoggable(Level.FINER)) {
- logger.entering(sourceClass, sourceMethod, "Root JobExecution Id = " + rootJobExecutionId);
- }
-
- // Build all sub jobs from partitioned step
- buildSubJobBatchWorkUnits();
-
- // kick off the threads
- executeWorkUnits();
-
- // Deal with the results.
- SplitExecutionStatus status = waitForCompletionAndAggregateStatus();
-
- if (logger.isLoggable(Level.FINER)) {
- logger.exiting(sourceClass, sourceMethod, status);
- }
-
- return status;
- }
-
- /**
- * Note we restart all flows. There is no concept of "the flow completed". It is only steps
- * within the flows that may have already completed and so may not have needed to be rerun.
- *
- */
- private void buildSubJobBatchWorkUnits() {
-
- List<Flow> flows = this.split.getFlows();
-
- parallelBatchWorkUnits = new ArrayList<BatchFlowInSplitWorkUnit>();
-
- // Build all sub jobs from flows in split
- synchronized (subJobs) {
- for (Flow flow : flows) {
- subJobs.add(PartitionedStepBuilder.buildFlowInSplitSubJob(jobExecution.getExecutionId(), jobContext, this.split, flow));
- }
- for (JSLJob job : subJobs) {
- int count = batchKernel.getJobInstanceCount(job.getId());
- FlowInSplitBuilderConfig config = new FlowInSplitBuilderConfig(job, completedWorkQueue, rootJobExecutionId);
- if (count == 0) {
- parallelBatchWorkUnits.add(batchKernel.buildNewFlowInSplitWorkUnit(config));
- } else if (count == 1) {
- parallelBatchWorkUnits.add(batchKernel.buildOnRestartFlowInSplitWorkUnit(config));
- } else {
- throw new IllegalStateException("There is an inconsistency somewhere in the internal subjob creation");
- }
- }
- }
- }
-
- private void executeWorkUnits () {
- // Then start or restart all subjobs in parallel
- for (BatchParallelWorkUnit work : parallelBatchWorkUnits) {
- int count = batchKernel.getJobInstanceCount(work.getJobExecutionImpl().getJobInstance().getJobName());
-
- assert (count <= 1);
-
- if (count == 1) {
- batchKernel.startGeneratedJob(work);
- } else if (count > 1) {
- batchKernel.restartGeneratedJob(work);
- } else {
- throw new IllegalStateException("There is an inconsistency somewhere in the internal subjob creation");
- }
- }
- }
-
- private SplitExecutionStatus waitForCompletionAndAggregateStatus() {
-
- SplitExecutionStatus splitStatus = new SplitExecutionStatus();
- ExtendedBatchStatus aggregateTerminatingStatus = null;
-
- for (int i=0; i < subJobs.size(); i++) {
- BatchFlowInSplitWorkUnit batchWork;
- try {
- batchWork = completedWorkQueue.take(); //wait for each thread to finish and then look at it's status
- } catch (InterruptedException e) {
- throw new BatchContainerRuntimeException(e);
- }
-
- RuntimeFlowInSplitExecution flowExecution = batchWork.getJobExecutionImpl();
- ExecutionStatus flowStatus = flowExecution.getFlowStatus();
- if (logger.isLoggable(Level.FINE)) {
- logger.fine("Subjob " + flowExecution.getExecutionId() + "ended with flow-in-split status: " + flowStatus);
- }
- aggregateTerminatingStatusFromSingleFlow(aggregateTerminatingStatus, flowStatus, splitStatus);
- }
-
- // If this is still set to 'null' that means all flows completed normally without terminating the job.
- if (aggregateTerminatingStatus == null) {
- logger.fine("Setting normal split status as no contained flows ended the job.");
- aggregateTerminatingStatus = ExtendedBatchStatus.NORMAL_COMPLETION;
- }
-
- splitStatus.setExtendedBatchStatus(aggregateTerminatingStatus);
- logger.fine("Returning from waitForCompletionAndAggregateStatus with return value: " + splitStatus);
- return splitStatus;
- }
-
-
- //
- // A <fail> and an uncaught exception are peers. They each take precedence over a <stop>, which take precedence over an <end>.
- // Among peers the last one seen gets to set the exit stauts.
- //
- private void aggregateTerminatingStatusFromSingleFlow(ExtendedBatchStatus aggregateStatus, ExecutionStatus flowStatus,
- SplitExecutionStatus splitStatus) {
-
- String exitStatus = flowStatus.getExitStatus();
- String restartOn = flowStatus.getRestartOn();
- ExtendedBatchStatus flowBatchStatus = flowStatus.getExtendedBatchStatus();
-
- logger.fine("Aggregating possible terminating status for flow ending with status: " + flowStatus
- + ", restartOn = " + restartOn);
-
- if ( flowBatchStatus.equals(ExtendedBatchStatus.JSL_END) || flowBatchStatus.equals(ExtendedBatchStatus.JSL_STOP) ||
- flowBatchStatus.equals(ExtendedBatchStatus.JSL_FAIL) || flowBatchStatus.equals(ExtendedBatchStatus.EXCEPTION_THROWN) ) {
- if (aggregateStatus == null) {
- logger.fine("A flow detected as ended because of a terminating condition: " + flowBatchStatus.name()
- + ". First flow detected in terminating state. Setting exitStatus if non-null.");
- setInJobContext(flowBatchStatus, exitStatus, restartOn);
- aggregateStatus = flowBatchStatus;
- } else {
- splitStatus.setCouldMoreThanOneFlowHaveTerminatedJob(true);
- if (aggregateStatus.equals(ExtendedBatchStatus.JSL_END)) {
- logger.warning("Current flow's batch and exit status will take precedence over and override earlier one from <end> transition element. " +
- "Overriding, setting exit status if non-null and preparing to end job.");
- setInJobContext(flowBatchStatus, exitStatus, restartOn);
- aggregateStatus = flowBatchStatus;
- } else if (aggregateStatus.equals(ExtendedBatchStatus.JSL_STOP)) {
- // Everything but an <end> overrides a <stop>
- if (!(flowBatchStatus.equals(ExtendedBatchStatus.JSL_END))) {
- logger.warning("Current flow's batch and exit status will take precedence over and override earlier one from <stop> transition element. " +
- "Overriding, setting exit status if non-null and preparing to end job.");
- setInJobContext(flowBatchStatus, exitStatus, restartOn);
- aggregateStatus = flowBatchStatus;
- } else {
- logger.fine("End does not override stop. The flow with <end> will effectively be ignored with respect to terminating the job.");
- }
- } else if (aggregateStatus.equals(ExtendedBatchStatus.JSL_FAIL) || aggregateStatus.equals(ExtendedBatchStatus.EXCEPTION_THROWN)) {
- if (flowBatchStatus.equals(ExtendedBatchStatus.JSL_FAIL) || flowBatchStatus.equals(ExtendedBatchStatus.EXCEPTION_THROWN)) {
- logger.warning("Current flow's batch and exit status will take precedence over and override earlier one from <fail> transition element or exception thrown. " +
- "Overriding, setting exit status if non-null and preparing to end job.");
- setInJobContext(flowBatchStatus, exitStatus, restartOn);
- aggregateStatus = flowBatchStatus;
- } else {
- logger.fine("End and stop do not override exception thrown or <fail>. The flow with <end> or <stop> will effectively be ignored with respect to terminating the job.");
- }
- }
- }
- } else {
- logger.fine("Flow completing normally without any terminating transition or exception thrown.");
- }
- }
-
- private void setInJobContext(ExtendedBatchStatus flowBatchStatus, String exitStatus, String restartOn) {
- if (exitStatus != null) {
- jobContext.setExitStatus(exitStatus);
- }
- if (ExtendedBatchStatus.JSL_STOP.equals(flowBatchStatus)) {
- if (restartOn != null) {
- jobContext.setRestartOn(restartOn);
- }
- }
- }
-
- public List<BatchFlowInSplitWorkUnit> getParallelJobExecs() {
-
- return parallelBatchWorkUnits;
- }
-
- @Override
- public List<Long> getLastRunStepExecutions() {
-
- List<Long> stepExecIdList = new ArrayList<Long>();
-
- for (BatchFlowInSplitWorkUnit workUnit : parallelBatchWorkUnits) {
-
- List<Long> stepExecIds = workUnit.getController().getLastRunStepExecutions();
-
- stepExecIdList.addAll(stepExecIds);
- }
-
- return stepExecIdList;
- }
-
-
-
-}