You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:39:07 UTC
[30/62] importing batchee from github - a fork from the IBM RI
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
new file mode 100755
index 0000000..1dd8c3b
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/JobThreadRootController.java
@@ -0,0 +1,274 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller;
+
+import org.apache.batchee.container.Controller;
+import org.apache.batchee.container.ThreadRootController;
+import org.apache.batchee.container.impl.JobContextImpl;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.navigator.ModelNavigator;
+import org.apache.batchee.container.proxy.InjectionReferences;
+import org.apache.batchee.container.proxy.JobListenerProxy;
+import org.apache.batchee.container.proxy.ListenerFactory;
+import org.apache.batchee.container.services.JobStatusManagerService;
+import org.apache.batchee.spi.PersistenceManagerService;
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.container.status.ExecutionStatus;
+import org.apache.batchee.container.status.ExtendedBatchStatus;
+import org.apache.batchee.container.util.PartitionDataWrapper;
+import org.apache.batchee.jaxb.JSLJob;
+
+import javax.batch.runtime.BatchStatus;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public abstract class JobThreadRootController implements ThreadRootController {
+ private static final Logger LOGGER = Logger.getLogger(JobThreadRootController.class.getName());
+
+ protected final RuntimeJobExecution jobExecution;
+ protected final JobContextImpl jobContext;
+ protected final long rootJobExecutionId;
+ protected final long jobInstanceId;
+ private final ListenerFactory listenerFactory;
+ protected final ModelNavigator<JSLJob> jobNavigator;
+ protected final JobStatusManagerService jobStatusService;
+ protected final PersistenceManagerService persistenceService;
+
+ private ExecutionTransitioner transitioner;
+ private BlockingQueue<PartitionDataWrapper> analyzerQueue;
+
+ /**
+  * Creates the controller for the given job execution and wires a listener factory into it.
+  *
+  * @param jobExecution       the runtime execution this controller drives; its job context,
+  *                           instance id and job navigator are captured here
+  * @param rootJobExecutionId execution id stored as the root execution id and later handed to the transitioner
+  */
+ public JobThreadRootController(final RuntimeJobExecution jobExecution, final long rootJobExecutionId) {
+     this.jobExecution = jobExecution;
+     this.jobContext = jobExecution.getJobContext();
+     this.rootJobExecutionId = rootJobExecutionId;
+     this.jobInstanceId = jobExecution.getInstanceId();
+     this.jobStatusService = ServicesManager.service(JobStatusManagerService.class);
+     this.persistenceService = ServicesManager.service(PersistenceManagerService.class);
+     this.jobNavigator = jobExecution.getJobNavigator();
+
+     // The listener factory is built from the root job model and an injection context that
+     // carries only the job context (no step context, no properties).
+     this.listenerFactory = new ListenerFactory(
+         jobExecution.getJobNavigator().getRootModelElement(),
+         new InjectionReferences(jobContext, null, null),
+         jobExecution);
+     jobExecution.setListenerFactory(this.listenerFactory);
+ }
+
+ /*
+  * Creates a controller whose root execution id is the execution's own id and which remembers
+  * the given analyzer queue (it is handed to the ExecutionTransitioner in originateExecutionOnThread()).
+  *
+  * By not passing a separate rootJobExecutionId, we are "orphaning" the subjob execution and making it
+  * not findable from the parent. This is exactly what we want for getStepExecutions()... we don't want
+  * it to get extraneous entries for the partitions.
+  */
+ public JobThreadRootController(final RuntimeJobExecution jobExecution, final BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+ this(jobExecution, jobExecution.getExecutionId());
+ this.analyzerQueue = analyzerQueue;
+ }
+
+ /**
+  * Runs the job on the calling thread: marks it started, calls the beforeJob listeners, runs the
+  * execution loop and finally performs the end-of-job processing.
+  *
+  * Nothing is started when the job context is already STOPPING. Any Throwable raised while
+  * starting or running is logged and turns the batch status into FAILED; end-of-job processing
+  * still happens afterwards.
+  *
+  * @return the status returned by the execution loop, or null when the loop did not return one
+  *         (stop requested before the start, or a Throwable was caught before it returned)
+  */
+ @Override
+ public ExecutionStatus originateExecutionOnThread() {
+     ExecutionStatus loopStatus = null;
+     try {
+         // A stop() command may have arrived before this thread got going.
+         final boolean stopAlreadyRequested = jobContext.getBatchStatus().equals(BatchStatus.STOPPING);
+         if (!stopAlreadyRequested) {
+             // About to invoke artifacts: flag the job as STARTED, then notify the listeners.
+             markJobStarted();
+             jobListenersBeforeJob();
+
+             // The main loop transitioning between the execution elements of the job.
+             transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, jobNavigator, analyzerQueue);
+             loopStatus = transitioner.doExecutionLoop();
+
+             switch (loopStatus.getExtendedBatchStatus()) {
+                 case JSL_STOP:
+                     jslStop();
+                     break;
+                 case JSL_FAIL:
+                 case EXCEPTION_THROWN:
+                     updateJobBatchStatus(BatchStatus.FAILED);
+                     break;
+                 default:
+                     // Every other outcome leaves the batch status untouched at this point.
+                     break;
+             }
+         }
+     } catch (final Throwable t) {
+         // Catch and continue: the afterJob() listeners must still run and the batch and exit
+         // status of the failure must still be persisted in an orderly fashion.
+         batchStatusFailedFromException();
+         LOGGER.log(Level.SEVERE, t.getMessage(), t);
+     }
+
+     endOfJob();
+     return loopStatus;
+ }
+
+ protected void jslStop() {
+ String restartOn = jobContext.getRestartOn();
+ batchStatusStopping();
+ jobStatusService.updateJobStatusFromJSLStop(jobInstanceId, restartOn);
+ }
+
+ /**
+  * Sets the batch status to STARTED and stamps the execution with one shared timestamp as both
+  * its start time and its last update time, then asks the persistence service to record the start.
+  */
+ protected void markJobStarted() {
+     updateJobBatchStatus(BatchStatus.STARTED);
+
+     final long nowMillis = System.currentTimeMillis();
+     final Timestamp startedAt = new Timestamp(nowMillis);
+     jobExecution.setLastUpdateTime(startedAt);
+     jobExecution.setStartTime(startedAt);
+
+     persistenceService.markJobStarted(jobExecution.getExecutionId(), startedAt);
+ }
+
+ /*
+  * Follow similar pattern for end of step in BaseStepController
+  *
+  * 1. Execute the very last artifacts (jobListener)
+  * 2. transition to final batch status
+  * 3. default ExitStatus if necessary
+  * 4. persist statuses and end time data
+  *
+  * We don't want to give up on the orderly process of 2,3,4, if we blow up
+  * in after job, so catch that and keep on going.
+  */
+ protected void endOfJob() {
+     // 1. Execute the very last artifacts (jobListener)
+     try {
+         jobListenersAfterJob();
+     } catch (final Throwable t) {
+         // Keep going with steps 2-4, but mark the job FAILED and log the failure so that
+         // an exception thrown by an afterJob() listener is not silently lost.
+         batchStatusFailedFromException();
+         LOGGER.log(Level.SEVERE, t.getMessage(), t);
+     }
+
+     // 2. transition to final batch status
+     transitionToFinalBatchStatus();
+
+     // 3. default ExitStatus if necessary: fall back to the name of the final batch status
+     if (jobContext.getExitStatus() == null) {
+         jobContext.setExitStatus(jobContext.getBatchStatus().name());
+     }
+
+     // 4. persist statuses and end time data
+     persistJobBatchAndExitStatus();
+ }
+
+ private void persistJobBatchAndExitStatus() {
+ BatchStatus batchStatus = jobContext.getBatchStatus();
+
+ // Take a current timestamp for last updated no matter what the status.
+ long time = System.currentTimeMillis();
+ Timestamp timestamp = new Timestamp(time);
+ jobExecution.setLastUpdateTime(timestamp);
+
+ // Perhaps these should be coordinated in a tran but probably better still would be
+ // rethinking the table design to let the database provide us consistently with a single update.
+ jobStatusService.updateJobBatchStatus(jobInstanceId, batchStatus);
+ jobStatusService.updateJobExecutionStatus(jobExecution.getInstanceId(), jobContext.getBatchStatus(), jobContext.getExitStatus());
+
+ if (batchStatus.equals(BatchStatus.COMPLETED) || batchStatus.equals(BatchStatus.STOPPED) ||
+ batchStatus.equals(BatchStatus.FAILED)) {
+
+ jobExecution.setEndTime(timestamp);
+ persistenceService.updateWithFinalExecutionStatusesAndTimestamps(jobExecution.getExecutionId(),
+ batchStatus, jobContext.getExitStatus(), timestamp);
+ } else {
+ throw new IllegalStateException("Not expected to encounter batchStatus of " + batchStatus + " at this point. Aborting.");
+ }
+ }
+
+ /**
+ * The only valid states at this point are STARTED or STOPPING. Shouldn't have
+ * been able to get to COMPLETED, STOPPED, or FAILED at this point in the code.
+ */
+
+ private void transitionToFinalBatchStatus() {
+ BatchStatus currentBatchStatus = jobContext.getBatchStatus();
+ if (currentBatchStatus.equals(BatchStatus.STARTED)) {
+ updateJobBatchStatus(BatchStatus.COMPLETED);
+ } else if (currentBatchStatus.equals(BatchStatus.STOPPING)) {
+ updateJobBatchStatus(BatchStatus.STOPPED);
+ } else if (currentBatchStatus.equals(BatchStatus.FAILED)) {
+ updateJobBatchStatus(BatchStatus.FAILED); // Should have already been done but maybe better for possible code refactoring to have it here.
+ } else {
+ throw new IllegalStateException("Step batch status should not be in a " + currentBatchStatus.name() + " state");
+ }
+ }
+
+ /**
+  * Sets the given batch status on the job context. This method itself makes no
+  * persistence call; callers that need the status stored do so separately.
+  *
+  * @param batchStatus the status to set on the job context
+  */
+ protected void updateJobBatchStatus(BatchStatus batchStatus) {
+ jobContext.setBatchStatus(batchStatus);
+ }
+
+ /*
+  * Batch status transitions are generally not persisted one by one (the plan is to persist at
+  * the very end), but STOPPING is written right away: should the job end up "stuck in STOPPING",
+  * there is at least a record of it in the database.
+  */
+ protected void batchStatusStopping() {
+     updateJobBatchStatus(BatchStatus.STOPPING);
+
+     final Timestamp stoppingAt = new Timestamp(System.currentTimeMillis());
+     jobExecution.setLastUpdateTime(stoppingAt);
+     persistenceService.updateBatchStatusOnly(jobExecution.getExecutionId(), BatchStatus.STOPPING, stoppingAt);
+ }
+
+ /**
+  * Requests a stop. Only a job that is STARTING or STARTED reacts: it is moved to STOPPING and,
+  * if the execution loop has a current stoppable element controller, that controller is stopped too.
+  */
+ @Override
+ public void stop() {
+     final boolean stoppable = jobContext.getBatchStatus().equals(BatchStatus.STARTING)
+         || jobContext.getBatchStatus().equals(BatchStatus.STARTED);
+     if (!stoppable) {
+         return;
+     }
+
+     batchStatusStopping();
+
+     if (transitioner == null) {
+         // reusage of thread, this controller was not initialized
+         return;
+     }
+
+     final Controller currentElementController = transitioner.getCurrentStoppableElementController();
+     if (currentElementController != null) {
+         currentElementController.stop();
+     }
+ }
+
+ // Invoke beforeJob() on every job listener supplied by the listener factory, in list order.
+ protected void jobListenersBeforeJob() {
+     for (final JobListenerProxy jobListener : listenerFactory.getJobListeners()) {
+         jobListener.beforeJob();
+     }
+ }
+
+ // Invoke afterJob() on every job listener supplied by the listener factory, in list order.
+ private void jobListenersAfterJob() {
+     for (final JobListenerProxy jobListener : listenerFactory.getJobListeners()) {
+         jobListener.afterJob();
+     }
+ }
+
+ /**
+  * Marks the job as FAILED on the job context; used by the catch blocks of this class
+  * when an exception interrupts the normal flow.
+  */
+ protected void batchStatusFailedFromException() {
+ updateJobBatchStatus(BatchStatus.FAILED);
+ }
+
+ /**
+  * Returns the step execution ids collected by the execution loop.
+  *
+  * The transitioner is only created once the execution loop is about to start, so it is still
+  * null when a stop was requested before the start (or when starting failed early). In that
+  * case no step has run and an empty list is returned instead of failing with a
+  * NullPointerException.
+  *
+  * @return the step execution ids of the last run, or a new empty list if the loop never started
+  */
+ @Override
+ public List<Long> getLastRunStepExecutions() {
+     if (this.transitioner == null) {
+         // Fully qualified so that no additional import is needed; a fresh mutable list keeps
+         // callers that modify the result working.
+         return new java.util.ArrayList<Long>();
+     }
+     return this.transitioner.getStepExecIds();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
new file mode 100755
index 0000000..920ca99
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionThreadRootController.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller;
+
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.util.PartitionsBuilderConfig;
+
+/**
+ * Thread root controller used for a partition subjob.
+ * Currently there's no special function on top of the subjob required of the partition.
+ */
+public class PartitionThreadRootController extends JobThreadRootController {
+ /**
+  * Delegates to the queue-based superclass constructor, passing the analyzer queue
+  * taken from the given partitions builder configuration.
+  *
+  * @param jobExecution the runtime execution of the partition subjob
+  * @param config       configuration supplying the analyzer queue
+  */
+ public PartitionThreadRootController(RuntimeJobExecution jobExecution, PartitionsBuilderConfig config) {
+ super(jobExecution, config.getAnalyzerQueue());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepBuilder.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepBuilder.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepBuilder.java
new file mode 100755
index 0000000..81e63dd
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepBuilder.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.impl.controller;
+
+import org.apache.batchee.container.jsl.CloneUtility;
+import org.apache.batchee.jaxb.Flow;
+import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.jaxb.ObjectFactory;
+import org.apache.batchee.jaxb.Partition;
+import org.apache.batchee.jaxb.PartitionPlan;
+import org.apache.batchee.jaxb.Split;
+import org.apache.batchee.jaxb.Step;
+
+import javax.batch.runtime.context.JobContext;
+
+public class PartitionedStepBuilder {
+ // Separator written in front of and between the parts of a generated subjob id (see generateSubJobId).
+ // NOTE(review): the remark below asks for something permissible in an NCName, but ':' is exactly the
+ // character an NCName may not contain - confirm whether generated ids are ever validated as NCName.
+ public static final String JOB_ID_SEPARATOR = ":"; // Use something permissible in NCName to allow us to key off of.
+
+ /*
+  * Builds a generated job that contains exactly one flow, used to run the flows of a split as subjobs.
+  *
+  * The subjob gets a generated id derived from the parent execution id, the split id and the flow id,
+  * and a copy of the parent job context properties. The flow itself is added as-is: no deep copy is
+  * needed since each flow is already independent of all others, unlike in a partition where one step
+  * instance can be executed with different properties on multiple threads.
+  */
+ public static JSLJob buildFlowInSplitSubJob(Long parentJobExecutionId, JobContext jobContext, Split split, Flow flow) {
+     final JSLJob flowSubJob = new ObjectFactory().createJSLJob();
+
+     // Generated subjob id
+     flowSubJob.setId(generateSubJobId(parentJobExecutionId, split.getId(), flow.getId()));
+
+     // Properties of the parent JobContext are made available to the flow threads
+     flowSubJob.setProperties(CloneUtility.javaPropsTojslProperties(jobContext.getProperties()));
+
+     flowSubJob.getExecutionElements().add(flow);
+     return flowSubJob;
+ }
+
+ /*
+  * Builds a generated job that contains exactly one step, used to run one partition of a
+  * partitioned step as a subjob.
+  *
+  * The step placed in the subjob is a copy of the given step: id, allow-start-if-complete,
+  * batchlet, chunk, start limit, properties and listeners are carried over (the artifacts via
+  * CloneUtility). The next attribute and the control elements are deliberately not copied,
+  * because transitioning should ONLY take place on the main thread.
+  */
+ public static JSLJob buildPartitionSubJob(Long parentJobInstanceId, JobContext jobContext, Step step, int partitionInstance) {
+     final ObjectFactory factory = new ObjectFactory();
+     final JSLJob partitionSubJob = factory.createJSLJob();
+
+     // Generated subjob id
+     partitionSubJob.setId(generateSubJobId(parentJobInstanceId, step.getId(), partitionInstance));
+
+     // Properties of the parent JobContext are made available to the partitioned step threads
+     partitionSubJob.setProperties(CloneUtility.javaPropsTojslProperties(jobContext.getProperties()));
+
+     // The single step of the subjob, keeping the id of the original step
+     final Step stepCopy = factory.createStep();
+     stepCopy.setId(step.getId());
+     stepCopy.setAllowStartIfComplete(step.getAllowStartIfComplete());
+
+     if (step.getBatchlet() != null) {
+         stepCopy.setBatchlet(CloneUtility.cloneBatchlet(step.getBatchlet()));
+     }
+     if (step.getChunk() != null) {
+         stepCopy.setChunk(CloneUtility.cloneChunk(step.getChunk()));
+     }
+
+     // Only the collector of the original partition is carried over, together with a plan
+     // whose partitions attribute is left unset.
+     final Partition sourcePartition = step.getPartition();
+     if (sourcePartition != null && sourcePartition.getCollector() != null) {
+         final Partition collectorOnlyPartition = factory.createPartition();
+
+         final PartitionPlan unsetPlan = factory.createPartitionPlan();
+         unsetPlan.setPartitions(null);
+         collectorOnlyPartition.setPlan(unsetPlan);
+
+         collectorOnlyPartition.setCollector(sourcePartition.getCollector());
+         stepCopy.setPartition(collectorOnlyPartition);
+     }
+
+     stepCopy.setStartLimit(step.getStartLimit());
+     stepCopy.setProperties(CloneUtility.cloneJSLProperties(step.getProperties()));
+
+     // All listeners are cloned regardless of their kind: the type is not known at the model
+     // level, and a given artifact could implement more than one listener interface
+     // (e.g. ChunkListener AND StepListener).
+     stepCopy.setListeners(CloneUtility.cloneListeners(step.getListeners()));
+
+     partitionSubJob.getExecutionElements().add(stepCopy);
+     return partitionSubJob;
+ }
+
+ /**
+  * Generates the id of a flow-in-split subjob.
+  *
+  * @param parentJobInstanceId id identifying the parent job; buildFlowInSplitSubJob passes the
+  *                            parent job execution id here. Must not be null.
+  * @param splitId             this is the split id where the flows are nested
+  * @param flowId              this is the id of the flow run by the subjob
+  * @return a String of the form {@code :<parentJobInstanceId>:<splitId>:<flowId>}
+  *         (note the leading separator)
+  */
+ private static String generateSubJobId(Long parentJobInstanceId, String splitId, String flowId) {
+     return new StringBuilder()
+         .append(JOB_ID_SEPARATOR).append(parentJobInstanceId.toString())
+         .append(JOB_ID_SEPARATOR).append(splitId)
+         .append(JOB_ID_SEPARATOR).append(flowId)
+         .toString();
+ }
+
+ /**
+  * Generates the id of a partition subjob.
+  *
+  * @param parentJobInstanceId id identifying the parent job; buildPartitionSubJob passes the
+  *                            value it received as parent job instance id. Must not be null.
+  * @param stepId              this is the id of the partitioned step
+  * @param partitionInstance   the instance number of the partitioned element
+  * @return a String of the form {@code :<parentJobInstanceId>:<stepId>:<partitionInstance>}
+  *         (note the leading separator)
+  */
+ private static String generateSubJobId(Long parentJobInstanceId, String stepId, int partitionInstance) {
+     return new StringBuilder()
+         .append(JOB_ID_SEPARATOR).append(parentJobInstanceId.toString())
+         .append(JOB_ID_SEPARATOR).append(stepId)
+         .append(JOB_ID_SEPARATOR).append(partitionInstance)
+         .toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
new file mode 100755
index 0000000..9797233
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
@@ -0,0 +1,449 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller;
+
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.impl.StepContextImpl;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.jsl.CloneUtility;
+import org.apache.batchee.container.proxy.InjectionReferences;
+import org.apache.batchee.container.proxy.PartitionAnalyzerProxy;
+import org.apache.batchee.container.proxy.PartitionMapperProxy;
+import org.apache.batchee.container.proxy.PartitionReducerProxy;
+import org.apache.batchee.container.proxy.ProxyFactory;
+import org.apache.batchee.container.proxy.StepListenerProxy;
+import org.apache.batchee.container.util.BatchPartitionPlan;
+import org.apache.batchee.container.util.BatchPartitionWorkUnit;
+import org.apache.batchee.container.util.BatchWorkUnit;
+import org.apache.batchee.container.util.PartitionDataWrapper;
+import org.apache.batchee.container.util.PartitionDataWrapper.PartitionEventType;
+import org.apache.batchee.container.util.PartitionsBuilderConfig;
+import org.apache.batchee.jaxb.Analyzer;
+import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.jaxb.JSLProperties;
+import org.apache.batchee.jaxb.PartitionMapper;
+import org.apache.batchee.jaxb.PartitionReducer;
+import org.apache.batchee.jaxb.Property;
+import org.apache.batchee.jaxb.Step;
+
+import javax.batch.api.partition.PartitionPlan;
+import javax.batch.api.partition.PartitionReducer.PartitionStatus;
+import javax.batch.operations.JobExecutionAlreadyCompleteException;
+import javax.batch.operations.JobExecutionNotMostRecentException;
+import javax.batch.operations.JobRestartException;
+import javax.batch.operations.JobStartException;
+import javax.batch.runtime.BatchStatus;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class PartitionedStepController extends BaseStepController {
+ private static final int DEFAULT_PARTITION_INSTANCES = 1;
+ private static final int DEFAULT_THREADS = 0; //0 means default to number of instances
+
+ private PartitionPlan plan = null;
+
+ private int partitions = DEFAULT_PARTITION_INSTANCES;
+ private int threads = DEFAULT_THREADS;
+
+ private Properties[] partitionProperties = null;
+
+ private volatile List<BatchPartitionWorkUnit> parallelBatchWorkUnits;
+
+ private PartitionReducerProxy partitionReducerProxy = null;
+
+ // On invocation this will be re-primed to reflect already-completed partitions from a previous execution.
+ int numPreviouslyCompleted = 0;
+
+ private PartitionAnalyzerProxy analyzerProxy = null;
+
+ final List<JSLJob> subJobs = new ArrayList<JSLJob>();
+ protected List<StepListenerProxy> stepListeners = null;
+
+ List<BatchPartitionWorkUnit> completedWork = new ArrayList<BatchPartitionWorkUnit>();
+
+ BlockingQueue<BatchPartitionWorkUnit> completedWorkQueue = null;
+
+ /**
+  * Creates the controller by passing all arguments unchanged to the BaseStepController constructor.
+  *
+  * @param jobExecutionImpl   the runtime job execution the step belongs to
+  * @param step               the JSL model of the partitioned step
+  * @param stepContext        the step context of the step
+  * @param rootJobExecutionId the root job execution id
+  */
+ protected PartitionedStepController(final RuntimeJobExecution jobExecutionImpl, final Step step, StepContextImpl stepContext, long rootJobExecutionId) {
+ super(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+ }
+
+ /**
+  * Moves the step to STOPPING and asks the batch kernel to stop every partition work unit
+  * built so far. The work units are visited while holding the lock on {@code subJobs}.
+  *
+  * @throws IllegalStateException wrapping any exception raised while stopping a work unit
+  */
+ @Override
+ public void stop() {
+     updateBatchStatus(BatchStatus.STOPPING);
+
+     synchronized (subJobs) {
+         // It's possible we may try to stop a partitioned step before any
+         // sub steps have been started.
+         if (parallelBatchWorkUnits == null) {
+             return;
+         }
+
+         for (final BatchWorkUnit workUnit : parallelBatchWorkUnits) {
+             try {
+                 BATCH_KERNEL.stopJob(workUnit.getJobExecutionImpl().getExecutionId());
+             } catch (Exception e) {
+                 // TODO - Is this what we want to know.
+                 // Blow up if it happens to force the issue.
+                 throw new IllegalStateException(e);
+             }
+         }
+     }
+ }
+
+ /**
+  * Builds the partition plan for this step and caches its partition count, thread count and
+  * partition properties in the corresponding fields.
+  *
+  * With a partition mapper, the plan comes from the mapper: its partition count is used when
+  * the mapper asks for an override or when no count was persisted by a previous run, otherwise
+  * the persisted count is reused; a thread count of 0 means "as many threads as partitions".
+  *
+  * With a static plan element, the partitions attribute defaults to DEFAULT_PARTITION_INSTANCES
+  * when absent and must be at least 1; the threads attribute defaults to the number of
+  * partitions when absent or 0 and must not be negative.
+  *
+  * @return the generated plan, never null
+  * @throws IllegalArgumentException      if a partitions, threads or partition-index attribute
+  *                                       cannot be parsed or is out of range
+  * @throws BatchContainerRuntimeException if partition properties target a partition index that does not exist
+  * @throws IllegalStateException         if the step defines neither a partition mapper nor a partition plan
+  */
+ private PartitionPlan generatePartitionPlan() {
+     final PartitionPlan result;
+     final PartitionMapper partitionMapper = step.getPartition().getMapper();
+
+     // Partition count persisted by a previous run, null when there is none.
+     final Integer previousNumPartitions = stepStatus.getNumPartitions();
+
+     if (partitionMapper != null) { // from partition mapper
+         final List<Property> propertyList = partitionMapper.getProperties() == null ? null
+             : partitionMapper.getProperties().getPropertyList();
+
+         // Set all the contexts associated with this controller.
+         // Some of them may be null
+         final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propertyList);
+         final PartitionMapperProxy partitionMapperProxy = ProxyFactory.createPartitionMapperProxy(partitionMapper.getRef(), injectionRef, stepContext, jobExecutionImpl);
+
+         final PartitionPlan mapperPlan = partitionMapperProxy.mapPartitions();
+
+         // Set up the new partition plan
+         result = new BatchPartitionPlan();
+         result.setPartitionsOverride(mapperPlan.getPartitionsOverride());
+
+         // When true is specified, the partition count from the current run
+         // is used and all results from past partitions are discarded.
+         if (mapperPlan.getPartitionsOverride() || previousNumPartitions == null) {
+             result.setPartitions(mapperPlan.getPartitions());
+         } else {
+             result.setPartitions(previousNumPartitions);
+         }
+
+         if (mapperPlan.getThreads() == 0) {
+             result.setThreads(result.getPartitions());
+         } else {
+             result.setThreads(mapperPlan.getThreads());
+         }
+
+         result.setPartitionProperties(mapperPlan.getPartitionProperties());
+     } else if (step.getPartition().getPlan() != null) { // from static partition element in jsl
+         final String partitionsAttr = step.getPartition().getPlan().getPartitions();
+
+         // The attribute is optional: start from the default rather than from an unusable sentinel.
+         int numPartitions = DEFAULT_PARTITION_INSTANCES;
+         if (partitionsAttr != null) {
+             try {
+                 numPartitions = Integer.parseInt(partitionsAttr);
+             } catch (final NumberFormatException e) {
+                 throw new IllegalArgumentException("Could not parse partition instances value in stepId: " + step.getId()
+                     + ", with instances=" + partitionsAttr, e);
+             }
+             if (numPartitions < 1) {
+                 throw new IllegalArgumentException("Partition instances value must be 1 or greater in stepId: " + step.getId()
+                     + ", with instances=" + partitionsAttr);
+             }
+         }
+
+         // Allocated only after validation, so that a negative count is reported through the
+         // IllegalArgumentException above and not as a NegativeArraySizeException.
+         final Properties[] partitionProps = new Properties[numPartitions];
+
+         final String threadsAttr = step.getPartition().getPlan().getThreads();
+         int numThreads;
+         if (threadsAttr != null) {
+             try {
+                 numThreads = Integer.parseInt(threadsAttr);
+             } catch (final NumberFormatException e) {
+                 throw new IllegalArgumentException("Could not parse partition threads value in stepId: " + step.getId()
+                     + ", with threads=" + threadsAttr, e);
+             }
+             if (numThreads < 0) {
+                 throw new IllegalArgumentException("Threads value must be 0 or greater in stepId: " + step.getId()
+                     + ", with threads=" + threadsAttr);
+             }
+             if (numThreads == 0) { // 0 means "use the number of partitions"
+                 numThreads = numPartitions;
+             }
+         } else { // default to number of partitions if threads isn't set
+             numThreads = numPartitions;
+         }
+
+         final List<JSLProperties> jslProperties = step.getPartition().getPlan().getProperties();
+         if (jslProperties != null) {
+             for (final JSLProperties props : jslProperties) {
+                 final int targetPartition;
+                 try {
+                     targetPartition = Integer.parseInt(props.getPartition());
+                 } catch (final NumberFormatException e) {
+                     throw new IllegalArgumentException("Could not parse partition index of partition properties in stepId: "
+                         + step.getId() + ", with partition=" + props.getPartition(), e);
+                 }
+
+                 try {
+                     partitionProps[targetPartition] = CloneUtility.jslPropertiesToJavaProperties(props);
+                 } catch (ArrayIndexOutOfBoundsException e) {
+                     throw new BatchContainerRuntimeException("There are only " + numPartitions + " partition instances, but there are "
+                         + jslProperties.size()
+                         + " partition properties lists defined. Remember that partition indexing is 0 based like Java arrays.", e);
+                 }
+             }
+         }
+
+         result = new BatchPartitionPlan();
+         result.setPartitions(numPartitions);
+         result.setThreads(numThreads);
+         result.setPartitionProperties(partitionProps);
+         result.setPartitionsOverride(false); //FIXME what is the default for a static plan??
+     } else {
+         // Without a mapper or a plan there is nothing to build the plan from; fail with a
+         // message naming the step instead of an anonymous NullPointerException further down.
+         throw new IllegalStateException("Partitioned step " + step.getId()
+             + " defines neither a partition mapper nor a partition plan");
+     }
+
+     // Set the other instance variables for convenience.
+     this.partitions = result.getPartitions();
+     this.threads = result.getThreads();
+     this.partitionProperties = result.getPartitionProperties();
+
+     return result;
+ }
+
+
+ /**
+  * Runs the partitioned step: generates the partition plan, prepares the queues, builds the
+  * partition subjobs, executes them and finally checks the completed work.
+  */
+ @Override
+ protected void invokeCoreStep() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
+     plan = generatePartitionPlan();
+
+     // Remember the partition count so that a restart can reuse the same plan.
+     stepStatus.setNumPartitions(plan.getPartitions());
+
+     /* When true is specified, the partition count from the current run
+      * is used and all results from past partitions are discarded. Any
+      * resource cleanup or back out of work done in the previous run is the
+      * responsibility of the application. The PartitionReducer artifact's
+      * rollbackPartitionedStep method is invoked during restart before any
+      * partitions begin processing to provide a cleanup hook.
+      */
+     if (plan.getPartitionsOverride() && partitionReducerProxy != null) {
+         partitionReducerProxy.rollbackPartitionedStep();
+     }
+
+     // The analyzer queue, which picks up collector data from the partitioned threads,
+     // is only needed when an analyzer is present.
+     if (analyzerProxy != null) {
+         analyzerStatusQueue = new LinkedBlockingQueue<PartitionDataWrapper>();
+     }
+     completedWorkQueue = new LinkedBlockingQueue<BatchPartitionWorkUnit>();
+
+     // Build all sub jobs from the partitioned step, kick off the threads and wait,
+     // then deal with the results.
+     buildSubJobBatchWorkUnits();
+     executeAndWaitForCompletion();
+     checkCompletedWork();
+ }
+
+ private void buildSubJobBatchWorkUnits() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
+ synchronized (subJobs) {
+ //check if we've already issued a stop
+ if (jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)) {
+ return;
+ }
+
+ for (int instance = 0; instance < partitions; instance++) {
+ subJobs.add(PartitionedStepBuilder.buildPartitionSubJob(jobExecutionImpl.getInstanceId(), jobExecutionImpl.getJobContext(), step, instance));
+ }
+
+ PartitionsBuilderConfig config = new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, completedWorkQueue, jobExecutionImpl.getExecutionId());
+ // Then build all the subjobs but do not start them yet
+ if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
+ parallelBatchWorkUnits = BATCH_KERNEL.buildOnRestartParallelPartitions(config);
+ } else {
+ parallelBatchWorkUnits = BATCH_KERNEL.buildNewParallelPartitions(config);
+ }
+
+ // NOTE: At this point I might not have as many work units as I had partitions, since some may have already completed.
+ }
+ }
+
+    /**
+     * Submits the partition work units to the batch kernel, never more than {@code threads}
+     * at a time, and blocks until every work unit built for this execution has reported completion.
+     * <p/>
+     * When an analyzer is configured, collector data and final partition status arrive through
+     * {@code analyzerStatusQueue} and are forwarded to the analyzer on this thread. Without an
+     * analyzer the method simply waits on {@code completedWorkQueue}. Each finished work unit is
+     * kept in {@code completedWork} for later inspection.
+     * <p/>
+     * Returns immediately when the job context already reports {@code STOPPING}.
+     *
+     * @throws JobRestartException declared for the kernel restart call
+     * @throws BatchContainerRuntimeException if the waiting thread is interrupted
+     */
+    private void executeAndWaitForCompletion() throws JobRestartException {
+
+        if (jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)) {
+            return;
+        }
+
+        final int numTotalForThisExecution = parallelBatchWorkUnits.size();
+        this.numPreviouslyCompleted = partitions - numTotalForThisExecution;
+        int numCurrentCompleted = 0;
+        int numCurrentSubmitted = 0;
+
+        // Same rule that decided how the work units were built: they are restarted only when the
+        // step has run before and the plan does not override the partitions. Both the initial
+        // submissions and the later refills must use it.
+        final boolean restart = stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride();
+
+        // Start up to the max number we are allowed from the threads attribute
+        for (int i = 0; i < this.threads && i < numTotalForThisExecution; i++, numCurrentSubmitted++) {
+            final BatchWorkUnit workUnit = parallelBatchWorkUnits.get(i);
+            if (restart) {
+                BATCH_KERNEL.restartGeneratedJob(workUnit);
+            } else {
+                BATCH_KERNEL.startGeneratedJob(workUnit);
+            }
+        }
+
+        // Checking the count up front means that having nothing to run does not block forever
+        // on an empty queue.
+        while (numCurrentCompleted < numTotalForThisExecution) {
+            try {
+                if (analyzerProxy != null) {
+                    final PartitionDataWrapper dataWrapper = analyzerStatusQueue.take();
+                    if (PartitionEventType.ANALYZE_COLLECTOR_DATA.equals(dataWrapper.getEventType())) {
+                        analyzerProxy.analyzeCollectorData(dataWrapper.getCollectorData());
+                        continue; // collector data only: no partition finished, nothing new to submit
+                    } else if (PartitionEventType.ANALYZE_STATUS.equals(dataWrapper.getEventType())) {
+                        analyzerProxy.analyzeStatus(dataWrapper.getBatchstatus(), dataWrapper.getExitStatus());
+                        completedWork.add(completedWorkQueue.take()); // Shouldn't be a long wait.
+                    } else {
+                        throw new IllegalStateException("Invalid partition state");
+                    }
+                } else {
+                    // block until at least one thread has finished so more batch work can be
+                    // submitted; hold on to the finished work to look at later
+                    completedWork.add(completedWorkQueue.take());
+                }
+            } catch (final InterruptedException e) {
+                // keep the interrupt visible to whoever handles the wrapping exception
+                Thread.currentThread().interrupt();
+                throw new BatchContainerRuntimeException(e);
+            }
+
+            numCurrentCompleted++;
+
+            // one partition finished: hand the freed slot to the next pending work unit, if any
+            if (numCurrentSubmitted < numTotalForThisExecution) {
+                final BatchWorkUnit next = parallelBatchWorkUnits.get(numCurrentSubmitted++);
+                if (restart) {
+                    BATCH_KERNEL.restartGeneratedJob(next);
+                } else {
+                    BATCH_KERNEL.startGeneratedJob(next);
+                }
+            }
+        }
+    }
+
+ private void checkCompletedWork() {
+ /**
+ * check the batch status of each subJob after it's done to see if we need to issue a rollback
+ * start rollback if any have stopped or failed
+ */
+ boolean rollback = false;
+
+ for (final BatchWorkUnit subJob : completedWork) {
+ BatchStatus batchStatus = subJob.getJobExecutionImpl().getJobContext().getBatchStatus();
+ if (batchStatus.equals(BatchStatus.FAILED)) {
+ rollback = true;
+
+ //Keep track of the failing status and throw an exception to propagate after the rest of the partitions are complete
+ stepContext.setBatchStatus(BatchStatus.FAILED);
+ }
+ }
+
+ //If rollback is false we never issued a rollback so we can issue a logicalTXSynchronizationBeforeCompletion
+ //NOTE: this will get issued even in a subjob fails or stops if no logicalTXSynchronizationRollback method is provied
+ //We are assuming that not providing a rollback was intentional
+ if (rollback) {
+ if (this.partitionReducerProxy != null) {
+ this.partitionReducerProxy.rollbackPartitionedStep();
+ }
+ throw new BatchContainerRuntimeException("One or more partitions failed");
+ } else {
+ if (this.partitionReducerProxy != null) {
+ this.partitionReducerProxy.beforePartitionedStepCompletion();
+ }
+ }
+ }
+
+    /**
+     * Creates the step listeners and, when the step's partition element declares them,
+     * the proxies for the partition analyzer and the partition reducer.
+     */
+    @Override
+    protected void setupStepArtifacts() {
+        final InjectionReferences listenerRefs = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, null);
+        this.stepListeners = jobExecutionImpl.getListenerFactory().getStepListeners(step, listenerRefs, stepContext, jobExecutionImpl);
+
+        final Analyzer analyzerModel = step.getPartition().getAnalyzer();
+        if (analyzerModel != null) {
+            List<Property> analyzerProps = null;
+            if (analyzerModel.getProperties() != null) {
+                analyzerProps = analyzerModel.getProperties().getPropertyList();
+            }
+            final InjectionReferences analyzerRefs = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, analyzerProps);
+            analyzerProxy = ProxyFactory.createPartitionAnalyzerProxy(analyzerModel.getRef(), analyzerRefs, stepContext, jobExecutionImpl);
+        }
+
+        final PartitionReducer reducerModel = step.getPartition().getReducer();
+        if (reducerModel != null) {
+            List<Property> reducerProps = null;
+            if (reducerModel.getProperties() != null) {
+                reducerProps = reducerModel.getProperties().getPropertyList();
+            }
+            final InjectionReferences reducerRefs = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, reducerProps);
+            partitionReducerProxy = ProxyFactory.createPartitionReducerProxy(reducerModel.getRef(), reducerRefs, stepContext, jobExecutionImpl);
+        }
+    }
+
+    /**
+     * Calls {@code beforeStep} on every step listener, then {@code beginPartitionedStep}
+     * on the reducer when one is configured.
+     */
+    @Override
+    protected void invokePreStepArtifacts() {
+        if (stepListeners != null) {
+            for (StepListenerProxy listener : stepListeners) {
+                listener.beforeStep();
+            }
+        }
+
+        if (this.partitionReducerProxy == null) {
+            return;
+        }
+
+        // The reducer is invoked before any parallel work starts (and therefore before
+        // the mapper as well).
+        this.partitionReducerProxy.beginPartitionedStep();
+    }
+
+ @Override
+ protected void invokePostStepArtifacts() {
+ // Invoke the reducer after all parallel steps are done
+ if (this.partitionReducerProxy != null) {
+
+ if ((BatchStatus.COMPLETED).equals(stepContext.getBatchStatus())) {
+ this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionStatus.COMMIT);
+ } else {
+ this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionStatus.ROLLBACK);
+ }
+
+ }
+
+ // Called in spec'd order, e.g. Sec. 11.7
+ if (stepListeners != null) {
+ for (StepListenerProxy listenerProxy : stepListeners) {
+ // Call afterStep on all the step listeners
+ listenerProxy.afterStep();
+ }
+ }
+ }
+
+ /**
+ * Intentionally empty in this controller: no status is sent to an analyzer from here.
+ */
+ @Override
+ protected void sendStatusFromPartitionToAnalyzerIfPresent() {
+ // Since we're already on the main thread, there will never
+ // be anything to do on this thread. It's only on the partitioned
+ // threads that there is something to send back.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
new file mode 100755
index 0000000..d085622
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SingleThreadedStepController.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller;
+
+import org.apache.batchee.container.Controller;
+import org.apache.batchee.container.impl.StepContextImpl;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.proxy.InjectionReferences;
+import org.apache.batchee.container.proxy.PartitionCollectorProxy;
+import org.apache.batchee.container.proxy.ProxyFactory;
+import org.apache.batchee.container.proxy.StepListenerProxy;
+import org.apache.batchee.container.util.PartitionDataWrapper;
+import org.apache.batchee.container.util.PartitionDataWrapper.PartitionEventType;
+import org.apache.batchee.jaxb.Collector;
+import org.apache.batchee.jaxb.Property;
+import org.apache.batchee.jaxb.Step;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Step controller for work that runs on a single thread.
+ * <p/>
+ * For a partitioned step this controller drives the individual partition threads only,
+ * NOT the top-level main thread the step executes upon.
+ * <p/>
+ * For a non-partitioned step this controller is used as well (and there is no separate
+ * main thread with its own controller).
+ */
+public abstract class SingleThreadedStepController extends BaseStepController implements Controller {
+    // Collector only used from partition threads, not main thread
+    protected PartitionCollectorProxy collectorProxy = null;
+
+    List<StepListenerProxy> stepListeners = null;
+
+    protected SingleThreadedStepController(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
+        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+    }
+
+    /**
+     * Creates the step listeners and, when the step has a partition element that declares
+     * a collector, the collector proxy.
+     */
+    protected void setupStepArtifacts() {
+        final InjectionReferences listenerRefs = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, null);
+        this.stepListeners = jobExecutionImpl.getListenerFactory().getStepListeners(step, listenerRefs, stepContext, jobExecutionImpl);
+
+        // collectors only exist for partitioned steps
+        if (step.getPartition() == null) {
+            return;
+        }
+        final Collector collectorModel = step.getPartition().getCollector();
+        if (collectorModel == null) {
+            return;
+        }
+
+        List<Property> collectorProps = null;
+        if (collectorModel.getProperties() != null) {
+            collectorProps = collectorModel.getProperties().getPropertyList();
+        }
+
+        // Job and step contexts plus the collector's own properties are handed to the
+        // artifact for injection; some of them may be null.
+        final InjectionReferences collectorRefs = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, collectorProps);
+        this.collectorProxy = ProxyFactory.createPartitionCollectorProxy(collectorModel.getRef(), collectorRefs, this.stepContext, jobExecutionImpl);
+    }
+
+    /**
+     * Calls {@code beforeStep} on the step listeners, unless this execution belongs to a partition.
+     */
+    @Override
+    protected void invokePreStepArtifacts() {
+        // On a partition thread beforeStep() has already been called on the main thread,
+        // as the spec says, so it must not be repeated here.
+        if ((stepListeners == null) || (this.jobExecutionImpl.getPartitionInstance() != null)) {
+            return;
+        }
+        for (StepListenerProxy listener : stepListeners) {
+            listener.beforeStep();
+        }
+    }
+
+    /**
+     * Calls {@code afterStep} on the step listeners, unless this execution belongs to a partition.
+     */
+    @Override
+    protected void invokePostStepArtifacts() {
+        // Same rule as for beforeStep(): listeners are not invoked from a partition thread.
+        if ((stepListeners == null) || (this.jobExecutionImpl.getPartitionInstance() != null)) {
+            return;
+        }
+        for (StepListenerProxy listener : stepListeners) {
+            listener.afterStep();
+        }
+    }
+
+    /**
+     * Asks the collector, when one is configured, for its partition data and passes it on
+     * towards the analyzer.
+     */
+    protected void invokeCollectorIfPresent() {
+        if (collectorProxy == null) {
+            return;
+        }
+        final Serializable collected = collectorProxy.collectPartitionData();
+        sendCollectorDataToAnalyzerIfPresent(collected);
+    }
+
+    /**
+     * Queues the given collector data as an {@code ANALYZE_COLLECTOR_DATA} event.
+     * A collector without an analyzer is useless, but the missing queue is tolerated so
+     * that we neither hang nor blow up.
+     *
+     * @param data the data returned by the collector
+     */
+    protected void sendCollectorDataToAnalyzerIfPresent(Serializable data) {
+        if (analyzerStatusQueue == null) {
+            return;
+        }
+        final PartitionDataWrapper event = new PartitionDataWrapper();
+        event.setCollectorData(data);
+        event.setEventType(PartitionEventType.ANALYZE_COLLECTOR_DATA);
+        analyzerStatusQueue.add(event);
+    }
+
+    /**
+     * Queues the step's batch and exit status as an {@code ANALYZE_STATUS} event;
+     * does nothing when there is no analyzer queue.
+     */
+    @Override
+    protected void sendStatusFromPartitionToAnalyzerIfPresent() {
+        if (analyzerStatusQueue == null) {
+            return;
+        }
+        final PartitionDataWrapper event = new PartitionDataWrapper();
+        event.setBatchStatus(stepStatus.getBatchStatus());
+        event.setExitStatus(stepStatus.getExitStatus());
+        event.setEventType(PartitionEventType.ANALYZE_STATUS);
+        analyzerStatusQueue.add(event);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
new file mode 100755
index 0000000..b882de6
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/SplitController.java
@@ -0,0 +1,236 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller;
+
+import org.apache.batchee.container.ExecutionElementController;
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.impl.JobContextImpl;
+import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.services.BatchKernelService;
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.container.status.ExecutionStatus;
+import org.apache.batchee.container.status.ExtendedBatchStatus;
+import org.apache.batchee.container.status.SplitExecutionStatus;
+import org.apache.batchee.container.util.BatchFlowInSplitWorkUnit;
+import org.apache.batchee.container.util.BatchParallelWorkUnit;
+import org.apache.batchee.container.util.FlowInSplitBuilderConfig;
+import org.apache.batchee.jaxb.Flow;
+import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.jaxb.Split;
+
+import javax.batch.operations.JobExecutionAlreadyCompleteException;
+import javax.batch.operations.JobExecutionNotMostRecentException;
+import javax.batch.operations.JobRestartException;
+import javax.batch.operations.JobStartException;
+import javax.batch.operations.NoSuchJobExecutionException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Logger;
+
+/**
+ * Runs the flows of a split in parallel: one generated sub-job per flow, each handed to the
+ * batch kernel as a work unit, with the outcomes folded into a single {@link SplitExecutionStatus}.
+ */
+public class SplitController implements ExecutionElementController {
+    private final static Logger logger = Logger.getLogger(SplitController.class.getName());
+
+    private final RuntimeJobExecution jobExecution;
+
+    private volatile List<BatchFlowInSplitWorkUnit> parallelBatchWorkUnits;
+
+    private final BatchKernelService batchKernel;
+    private final JobContextImpl jobContext;
+    private final BlockingQueue<BatchFlowInSplitWorkUnit> completedWorkQueue = new LinkedBlockingQueue<BatchFlowInSplitWorkUnit>();
+    private final long rootJobExecutionId;
+
+    private final List<JSLJob> subJobs = new ArrayList<JSLJob>();
+
+    protected Split split;
+
+    /**
+     * @param jobExecution       execution the split belongs to; its job context receives the exit status
+     * @param split              the split model whose flows are run
+     * @param rootJobExecutionId id passed on to every flow work unit that gets built
+     */
+    public SplitController(RuntimeJobExecution jobExecution, Split split, long rootJobExecutionId) {
+        this.jobExecution = jobExecution;
+        this.jobContext = jobExecution.getJobContext();
+        this.rootJobExecutionId = rootJobExecutionId;
+        this.split = split;
+
+        batchKernel = ServicesManager.service(BatchKernelService.class);
+    }
+
+    /**
+     * Asks the batch kernel to stop every flow work unit built so far.
+     *
+     * @throws IllegalStateException wrapping any exception raised while stopping a sub-job
+     */
+    @Override
+    public void stop() {
+
+        // It's possible we may try to stop a split before any
+        // sub steps have been started.
+        synchronized (subJobs) {
+
+            if (parallelBatchWorkUnits != null) {
+                for (BatchParallelWorkUnit subJob : parallelBatchWorkUnits) {
+                    try {
+                        batchKernel.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+                    } catch (final Exception e) {
+                        // TODO - Is this what we want to know.
+                        // Blow up if it happens to force the issue.
+                        throw new IllegalStateException(e);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds one work unit per flow, submits them all, waits for them to finish and
+     * returns the aggregated status.
+     */
+    @Override
+    public SplitExecutionStatus execute() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
+        // Build all sub jobs from the flows of the split
+        buildSubJobBatchWorkUnits();
+
+        // kick off the threads
+        executeWorkUnits();
+
+        // Deal with the results.
+        return waitForCompletionAndAggregateStatus();
+    }
+
+    /**
+     * Note we restart all flows. There is no concept of "the flow completed". It is only steps
+     * within the flows that may have already completed and so may not have needed to be rerun.
+     */
+    private void buildSubJobBatchWorkUnits() {
+
+        List<Flow> flows = this.split.getFlows();
+
+        parallelBatchWorkUnits = new ArrayList<BatchFlowInSplitWorkUnit>();
+
+        // Build all sub jobs from flows in split
+        synchronized (subJobs) {
+            for (Flow flow : flows) {
+                subJobs.add(PartitionedStepBuilder.buildFlowInSplitSubJob(jobExecution.getExecutionId(), jobContext, this.split, flow));
+            }
+            for (JSLJob job : subJobs) {
+                // no instance under this generated id yet: new work unit; exactly one: restart variant
+                int count = batchKernel.getJobInstanceCount(job.getId());
+                FlowInSplitBuilderConfig config = new FlowInSplitBuilderConfig(job, completedWorkQueue, rootJobExecutionId);
+                if (count == 0) {
+                    parallelBatchWorkUnits.add(batchKernel.buildNewFlowInSplitWorkUnit(config));
+                } else if (count == 1) {
+                    parallelBatchWorkUnits.add(batchKernel.buildOnRestartFlowInSplitWorkUnit(config));
+                } else {
+                    throw new IllegalStateException("There is an inconsistency somewhere in the internal subjob creation");
+                }
+            }
+        }
+    }
+
+    /**
+     * Hands every built work unit to the batch kernel.
+     */
+    private void executeWorkUnits() {
+        // Then start or restart all subjobs in parallel
+        for (BatchParallelWorkUnit work : parallelBatchWorkUnits) {
+            int count = batchKernel.getJobInstanceCount(work.getJobExecutionImpl().getJobInstance().getJobName());
+
+            // NOTE(review): with assertions enabled this makes the count > 1 branch below
+            // unreachable - confirm which of the two is intended.
+            assert (count <= 1);
+
+            if (count == 1) {
+                batchKernel.startGeneratedJob(work);
+            } else if (count > 1) {
+                batchKernel.restartGeneratedJob(work);
+            } else {
+                throw new IllegalStateException("There is an inconsistency somewhere in the internal subjob creation");
+            }
+        }
+    }
+
+    /**
+     * Waits for one completion per sub-job and folds the flow statuses into the split status.
+     *
+     * @return the split status; {@code NORMAL_COMPLETION} unless some flow terminated the job
+     * @throws BatchContainerRuntimeException if the waiting thread is interrupted
+     */
+    private SplitExecutionStatus waitForCompletionAndAggregateStatus() {
+        final SplitExecutionStatus splitStatus = new SplitExecutionStatus();
+
+        // Terminating status with the highest precedence seen so far; stays null as long as
+        // no flow has terminated the job.
+        ExtendedBatchStatus aggregateStatus = null;
+
+        for (final JSLJob ignored : subJobs) {
+            final BatchFlowInSplitWorkUnit batchWork;
+            try {
+                batchWork = completedWorkQueue.take(); // wait for each thread to finish and then look at its status
+            } catch (InterruptedException e) {
+                // keep the interrupt visible to whoever handles the wrapping exception
+                Thread.currentThread().interrupt();
+                throw new BatchContainerRuntimeException(e);
+            }
+
+            final RuntimeFlowInSplitExecution flowExecution = batchWork.getJobExecutionImpl();
+            final ExecutionStatus flowStatus = flowExecution.getFlowStatus();
+            // carry the running aggregate from flow to flow so the precedence rules can apply
+            aggregateStatus = aggregateTerminatingStatusFromSingleFlow(aggregateStatus, flowStatus, splitStatus);
+        }
+
+        // If this is still 'null' that means all flows completed normally without terminating the job.
+        if (aggregateStatus == null) {
+            aggregateStatus = ExtendedBatchStatus.NORMAL_COMPLETION;
+        }
+        splitStatus.setExtendedBatchStatus(aggregateStatus);
+        return splitStatus;
+    }
+
+
+    /**
+     * Merges the status of one finished flow into the running aggregate.
+     * <p/>
+     * A <fail> and an uncaught exception are peers. They each take precedence over a <stop>,
+     * which takes precedence over an <end>. Among peers the last one seen gets to set the exit status.
+     *
+     * @param aggregateStatus aggregate of the flows seen so far, {@code null} if none terminated the job
+     * @param flowStatus      status of the flow that just finished
+     * @param splitStatus     flagged when a second terminating flow is seen
+     * @return the new aggregate: the flow's status when it takes over, otherwise {@code aggregateStatus} unchanged
+     */
+    private ExtendedBatchStatus aggregateTerminatingStatusFromSingleFlow(final ExtendedBatchStatus aggregateStatus, final ExecutionStatus flowStatus, final SplitExecutionStatus splitStatus) {
+        final String exitStatus = flowStatus.getExitStatus();
+        final String restartOn = flowStatus.getRestartOn();
+        final ExtendedBatchStatus flowBatchStatus = flowStatus.getExtendedBatchStatus();
+
+        if (flowBatchStatus.equals(ExtendedBatchStatus.JSL_END) || flowBatchStatus.equals(ExtendedBatchStatus.JSL_STOP) ||
+            flowBatchStatus.equals(ExtendedBatchStatus.JSL_FAIL) || flowBatchStatus.equals(ExtendedBatchStatus.EXCEPTION_THROWN)) {
+            if (aggregateStatus == null) {
+                setInJobContext(flowBatchStatus, exitStatus, restartOn);
+                return flowBatchStatus;
+            } else {
+                splitStatus.setCouldMoreThanOneFlowHaveTerminatedJob(true);
+                if (aggregateStatus.equals(ExtendedBatchStatus.JSL_END)) {
+                    logger.warning("Current flow's batch and exit status will take precedence over and override earlier one from <end> transition element. " +
+                        "Overriding, setting exit status if non-null and preparing to end job.");
+                    setInJobContext(flowBatchStatus, exitStatus, restartOn);
+                    return flowBatchStatus;
+                } else if (aggregateStatus.equals(ExtendedBatchStatus.JSL_STOP)) {
+                    // Everything but an <end> overrides a <stop>
+                    if (!(flowBatchStatus.equals(ExtendedBatchStatus.JSL_END))) {
+                        logger.warning("Current flow's batch and exit status will take precedence over and override earlier one from <stop> transition element. " +
+                            "Overriding, setting exit status if non-null and preparing to end job.");
+                        setInJobContext(flowBatchStatus, exitStatus, restartOn);
+                        return flowBatchStatus;
+                    }
+                } else if (aggregateStatus.equals(ExtendedBatchStatus.JSL_FAIL) || aggregateStatus.equals(ExtendedBatchStatus.EXCEPTION_THROWN)) {
+                    if (flowBatchStatus.equals(ExtendedBatchStatus.JSL_FAIL) || flowBatchStatus.equals(ExtendedBatchStatus.EXCEPTION_THROWN)) {
+                        logger.warning("Current flow's batch and exit status will take precedence over and override earlier one from <fail> transition element or exception thrown. " +
+                            "Overriding, setting exit status if non-null and preparing to end job.");
+                        setInJobContext(flowBatchStatus, exitStatus, restartOn);
+                        return flowBatchStatus;
+                    }
+                }
+            }
+        }
+
+        // this flow did not take over: the earlier aggregate (possibly null) stands
+        return aggregateStatus;
+    }
+
+    /**
+     * Copies a terminating flow's exit status into the job context when it is non-null and,
+     * for a {@code JSL_STOP}, the restart-on value when that is non-null too.
+     */
+    private void setInJobContext(ExtendedBatchStatus flowBatchStatus, String exitStatus, String restartOn) {
+        if (exitStatus != null) {
+            jobContext.setExitStatus(exitStatus);
+        }
+        if (ExtendedBatchStatus.JSL_STOP.equals(flowBatchStatus)) {
+            if (restartOn != null) {
+                jobContext.setRestartOn(restartOn);
+            }
+        }
+    }
+
+    /**
+     * @return the last-run step execution ids reported by the controller of every flow work unit;
+     * empty when no work unit has been built yet
+     */
+    @Override
+    public List<Long> getLastRunStepExecutions() {
+        final List<Long> stepExecIdList = new ArrayList<Long>();
+        // read the volatile field once; it is still null until execute() has built the work units
+        final List<BatchFlowInSplitWorkUnit> workUnits = parallelBatchWorkUnits;
+        if (workUnits == null) {
+            return stepExecIdList;
+        }
+        for (final BatchFlowInSplitWorkUnit workUnit : workUnits) {
+            final List<Long> stepExecIds = workUnit.getController().getLastRunStepExecutions();
+            stepExecIdList.addAll(stepExecIds);
+        }
+        return stepExecIdList;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
new file mode 100755
index 0000000..a0f0cbb
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/batchlet/BatchletStepController.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller.batchlet;
+
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.container.impl.controller.SingleThreadedStepController;
+import org.apache.batchee.container.impl.StepContextImpl;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.proxy.BatchletProxy;
+import org.apache.batchee.container.proxy.InjectionReferences;
+import org.apache.batchee.container.proxy.ProxyFactory;
+import org.apache.batchee.container.util.PartitionDataWrapper;
+import org.apache.batchee.jaxb.Batchlet;
+import org.apache.batchee.jaxb.Property;
+import org.apache.batchee.jaxb.Step;
+
+import javax.batch.runtime.BatchStatus;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+public class BatchletStepController extends SingleThreadedStepController {
+ private BatchletProxy batchletProxy;
+
+ public BatchletStepController(final RuntimeJobExecution jobExecutionImpl, final Step step,
+ final StepContextImpl stepContext, final long rootJobExecutionId,
+ final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
+ super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+ }
+
+ private void invokeBatchlet(final Batchlet batchlet) throws BatchContainerServiceException {
+ final String batchletId = batchlet.getRef();
+ final List<Property> propList = (batchlet.getProperties() == null) ? null : batchlet.getProperties().getPropertyList();
+ final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
+ batchletProxy = ProxyFactory.createBatchletProxy(batchletId, injectionRef, stepContext, jobExecutionImpl);
+
+ if (!wasStopIssued()) {
+ final String processRetVal = batchletProxy.process();
+ stepContext.setBatchletProcessRetVal(processRetVal);
+ }
+ }
+
+ protected synchronized boolean wasStopIssued() {
+ // Might only be set to stopping at the job level. Use the lock for this object on this
+ // method along with the stop() method
+ if (jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)) {
+ stepContext.setBatchStatus(BatchStatus.STOPPING);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected void invokeCoreStep() throws BatchContainerServiceException {
+ //TODO If this step is partitioned create partition artifacts
+ /*
+ Partition partition = step.getPartition();
+ if (partition != null) {
+ //partition.getConcurrencyElements();
+ }
+ */
+ try {
+ invokeBatchlet(step.getBatchlet());
+ } finally {
+ invokeCollectorIfPresent();
+ }
+ }
+
+ @Override
+ public synchronized void stop() {
+ // It is possible for stop() to be issued before process()
+ if (BatchStatus.STARTING.equals(stepContext.getBatchStatus()) ||
+ BatchStatus.STARTED.equals(stepContext.getBatchStatus())) {
+
+ stepContext.setBatchStatus(BatchStatus.STOPPING);
+
+ if (batchletProxy != null) {
+ batchletProxy.stop();
+ }
+ }/* else {
+ // TODO do we need to throw an error if the batchlet is already stopping/stopped
+ // a stop gets issued twice
+ }*/
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
new file mode 100755
index 0000000..34f2fae
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointAlgorithmFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.impl.controller.chunk;
+
+
+import org.apache.batchee.container.impl.StepContextImpl;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.proxy.CheckpointAlgorithmProxy;
+import org.apache.batchee.container.proxy.InjectionReferences;
+import org.apache.batchee.container.proxy.ProxyFactory;
+import org.apache.batchee.jaxb.Chunk;
+import org.apache.batchee.jaxb.Step;
+
+/**
+ * Static factory choosing the checkpoint algorithm proxy for a chunk step.
+ */
+public final class CheckpointAlgorithmFactory {
+    private CheckpointAlgorithmFactory() {
+        // static factory only, never instantiated
+    }
+
+    /**
+     * Returns a proxy around the artifact referenced by the chunk's checkpoint-algorithm element
+     * when the chunk's checkpoint policy is "custom" (case-insensitive), and a proxy around a
+     * new {@code ItemCheckpointAlgorithm} for any other policy value.
+     */
+    public static CheckpointAlgorithmProxy getCheckpointAlgorithmProxy(final Step step, final InjectionReferences injectionReferences, final StepContextImpl stepContext, final RuntimeJobExecution jobExecution) {
+        final Chunk chunk = step.getChunk();
+        final boolean customPolicy = "custom".equalsIgnoreCase(chunk.getCheckpointPolicy());
+        if (!customPolicy) {
+            // "item" policy
+            return new CheckpointAlgorithmProxy(new ItemCheckpointAlgorithm());
+        }
+        return ProxyFactory.createCheckpointAlgorithmProxy(chunk.getCheckpointAlgorithm().getRef(), injectionReferences, stepContext, jobExecution);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointData.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointData.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointData.java
new file mode 100755
index 0000000..702e0e9
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointData.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.impl.controller.chunk;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+
+/**
+ * Serializable holder for one checkpoint record: the job instance id, the step name,
+ * the {@link CheckpointType} it belongs to and the restart token bytes.
+ */
+public class CheckpointData implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private long _jobInstanceId;
+    private CheckpointType type;
+    private String stepName;
+    private byte[] restartToken;
+
+    /**
+     * Creates a checkpoint record whose restart token is initialised to the UTF-8 bytes of "NOTSET".
+     *
+     * @param jobInstanceId id of the job instance the checkpoint belongs to
+     * @param stepname      name of the step, must not be null
+     * @param ckType        checkpoint type, must not be null
+     * @throws IllegalArgumentException if {@code stepname} or {@code ckType} is null
+     */
+    public CheckpointData(final long jobInstanceId, final String stepname, final CheckpointType ckType) {
+        if (stepname == null || ckType == null) {
+            // report the argument itself: the field has not been assigned yet and would always print 0
+            throw new IllegalArgumentException("Invalid parameters to CheckpointData jobInstanceId: " + jobInstanceId +
+                    " BDS: " + ckType + " stepName: " + stepname);
+        }
+
+        _jobInstanceId = jobInstanceId;
+        type = ckType;
+        stepName = stepname;
+        try {
+            restartToken = "NOTSET".getBytes("UTF8");
+        } catch (final UnsupportedEncodingException e) {
+            throw new RuntimeException("Doesn't support UTF-8", e);
+        }
+    }
+
+    /** @return the job instance id */
+    public long getjobInstanceId() {
+        return _jobInstanceId;
+    }
+
+    /** @param id the new job instance id */
+    public void setjobInstanceId(final long id) {
+        _jobInstanceId = id;
+    }
+
+    /** @return the checkpoint type */
+    public CheckpointType getType() {
+        return type;
+    }
+
+    /** @param ckType the new checkpoint type */
+    public void setType(final CheckpointType ckType) {
+        type = ckType;
+    }
+
+    /** @return the step name */
+    public String getStepName() {
+        return stepName;
+    }
+
+    /** @param name the new step name */
+    public void setStepName(final String name) {
+        stepName = name;
+    }
+
+    /** @return the restart token array held by this instance (not copied) */
+    public byte[] getRestartToken() {
+        return restartToken;
+    }
+
+    /** @param token the new restart token; the array is stored as-is (not copied) */
+    public void setRestartToken(byte[] token) {
+        restartToken = token;
+    }
+
+    /**
+     * Renders the record for diagnostics. The restart token is decoded as UTF-8 text;
+     * null fields (possible after the setters were called with null) are printed instead of failing.
+     */
+    @Override
+    public String toString() {
+        String restartString;
+        if (restartToken == null) {
+            restartString = "<null>";
+        } else {
+            try {
+                restartString = new String(restartToken, "UTF8");
+            } catch (final UnsupportedEncodingException e) {
+                restartString = "<bytes not UTF-8>";
+            }
+        }
+        // concatenating the enum prints its name and tolerates null, unlike type.name()
+        return " jobInstanceId: " + _jobInstanceId + " stepId: " + stepName + " bdsName: " + type +
+                " restartToken: [UTF8-bytes: " + restartString + "]";
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointDataKey.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointDataKey.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointDataKey.java
new file mode 100755
index 0000000..59ff588
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointDataKey.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.impl.controller.chunk;
+
+/**
+ * Immutable key identifying a checkpoint record by job instance id, step name and
+ * {@link CheckpointType}. The constructor does not reject null components, so
+ * {@code equals}, {@code hashCode} and the string forms are written to tolerate them.
+ */
+public class CheckpointDataKey {
+    private final long jobInstanceId;
+    private final CheckpointType type;
+    private final String stepName;
+
+    /**
+     * @param jobId    id of the job instance
+     * @param stepName name of the step
+     * @param bdsName  checkpoint type of the record
+     */
+    public CheckpointDataKey(final long jobId, final String stepName, final CheckpointType bdsName) {
+        this.jobInstanceId = jobId;
+        this.stepName = stepName;
+        this.type = bdsName;
+    }
+
+    /** @return the job instance id */
+    public long getJobInstanceId() {
+        return jobInstanceId;
+    }
+
+    /** @return the checkpoint type */
+    public CheckpointType getType() {
+        return type;
+    }
+
+    /** @return the step name */
+    public String getStepName() {
+        return stepName;
+    }
+
+    /**
+     * @return {@code jobInstanceId,stepName,TYPE}; a null step name or type is rendered as "null"
+     */
+    public String getCommaSeparatedKey() {
+        final String typeName = type == null ? null : type.name();
+        return jobInstanceId + "," + stepName + "," + typeName;
+    }
+
+    @Override
+    public String toString() {
+        return getCommaSeparatedKey();
+    }
+
+    /** Two keys are equal when job instance id, type and step name all match (nulls match nulls). */
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        final CheckpointDataKey that = (CheckpointDataKey) o;
+        return jobInstanceId == that.jobInstanceId
+                && sameOrBothNull(type, that.type)
+                && sameOrBothNull(stepName, that.stepName);
+    }
+
+    /** Same value as before for non-null components; a null component contributes 0. */
+    @Override
+    public int hashCode() {
+        int result = (int) (jobInstanceId ^ (jobInstanceId >>> 32));
+        result = 31 * result + (type == null ? 0 : type.hashCode());
+        result = 31 * result + (stepName == null ? 0 : stepName.hashCode());
+        return result;
+    }
+
+    /** Null-safe equality check. */
+    private static boolean sameOrBothNull(final Object left, final Object right) {
+        return left == null ? right == null : left.equals(right);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
new file mode 100755
index 0000000..76cae58
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.impl.controller.chunk;
+
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.container.proxy.ItemReaderProxy;
+import org.apache.batchee.container.proxy.ItemWriterProxy;
+import org.apache.batchee.spi.PersistenceManagerService;
+import org.apache.batchee.container.services.ServicesManager;
+
+import javax.batch.api.chunk.CheckpointAlgorithm;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * Drives checkpointing for a chunk step: asks the checkpoint algorithm whether a checkpoint
+ * is due and stores the reader and writer checkpoint information through the persistence service.
+ */
+public class CheckpointManager {
+    private final PersistenceManagerService persistenceManagerService;
+    private final ItemReaderProxy readerProxy;
+    private final ItemWriterProxy writerProxy;
+    private final CheckpointAlgorithm checkpointAlgorithm;
+    private final String stepId;
+    private final long jobInstanceID;
+
+    /**
+     * @param reader        proxy providing the reader checkpoint information
+     * @param writer        proxy providing the writer checkpoint information
+     * @param chkptAlg      algorithm consulted for checkpoint readiness and timeout
+     * @param jobInstanceID id of the job instance the checkpoints are stored under
+     * @param stepId        step identifier the checkpoints are stored under
+     */
+    public CheckpointManager(final ItemReaderProxy reader, final ItemWriterProxy writer,
+                             final CheckpointAlgorithm chkptAlg,
+                             final long jobInstanceID, final String stepId) {
+        this.readerProxy = reader;
+        this.writerProxy = writer;
+        this.checkpointAlgorithm = chkptAlg;
+        this.stepId = stepId;
+        this.jobInstanceID = jobInstanceID;
+
+        this.persistenceManagerService = ServicesManager.service(PersistenceManagerService.class);
+    }
+
+    /**
+     * @return the answer of the checkpoint algorithm's {@code isReadyToCheckpoint()}
+     * @throws BatchContainerRuntimeException wrapping any exception raised by the algorithm
+     */
+    public boolean applyCheckPointPolicy() {
+        final boolean ready;
+        try {
+            ready = checkpointAlgorithm.isReadyToCheckpoint();
+        } catch (final Exception e) {
+            throw new BatchContainerRuntimeException("Checkpoint algorithm failed", e);
+        }
+        return ready;
+    }
+
+    /**
+     * Serializes and stores the reader checkpoint information, then the writer one.
+     *
+     * @throws BatchContainerServiceException wrapping any exception raised while collecting,
+     *                                        serializing or storing the checkpoint data
+     */
+    public void checkpoint() {
+        try {
+            // the reader checkpoint is collected and stored before the writer is asked for its own
+            store(CheckpointType.READER, readerProxy.checkpointInfo());
+            store(CheckpointType.WRITER, writerProxy.checkpointInfo());
+        } catch (final Exception ex) {
+            // is this what I should be throwing here?
+            throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + stepId + "]", ex);
+        }
+    }
+
+    /**
+     * @return the value of the checkpoint algorithm's {@code checkpointTimeout()}
+     * @throws BatchContainerRuntimeException wrapping any exception raised by the algorithm
+     */
+    public int checkpointTimeout() {
+        final int timeout;
+        try {
+            timeout = this.checkpointAlgorithm.checkpointTimeout();
+        } catch (final Exception e) {
+            throw new BatchContainerRuntimeException("Checkpoint algorithm checkpointTimeout() failed", e);
+        }
+        return timeout;
+    }
+
+    /** Java-serializes {@code info} and stores it as the restart token for the given checkpoint type. */
+    private void store(final CheckpointType type, final Object info) throws Exception {
+        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        final ObjectOutputStream out = new ObjectOutputStream(buffer);
+        out.writeObject(info);
+        out.close();
+
+        final CheckpointData data = new CheckpointData(jobInstanceID, stepId, type);
+        data.setRestartToken(buffer.toByteArray());
+        final CheckpointDataKey key = new CheckpointDataKey(jobInstanceID, stepId, type);
+
+        persistenceManagerService.setCheckpointData(key, data);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointType.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointType.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointType.java
new file mode 100644
index 0000000..7f6c46f
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointType.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller.chunk;
+
+public enum CheckpointType { // tells which chunk artifact a checkpoint record belongs to
+ READER, WRITER // READER: item reader checkpoint info; WRITER: item writer checkpoint info
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkHelper.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkHelper.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkHelper.java
new file mode 100755
index 0000000..756d108
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkHelper.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.impl.controller.chunk;
+
+import org.apache.batchee.jaxb.Chunk;
+
+/**
+ * Static helpers reading chunk attributes from the JAXB {@link Chunk} model, applying
+ * defaults where an attribute is missing and writing the resolved value back to the model.
+ */
+public class ChunkHelper {
+
+    /**
+     * Resolves the chunk's item-count, defaulting to 10 when it is null or empty,
+     * and stores the resolved value back on the chunk.
+     *
+     * @param chunk the chunk definition to read and update
+     * @return the resolved item count
+     * @throws NumberFormatException if the attribute is set but is not an integer
+     */
+    public static int getItemCount(Chunk chunk) {
+        final String configured = chunk.getItemCount();
+        final boolean missing = configured == null || configured.isEmpty();
+        final int itemCount = missing ? 10 : Integer.parseInt(configured);
+
+        chunk.setItemCount(Integer.toString(itemCount));
+        return itemCount;
+    }
+
+    /**
+     * Resolves the chunk's time-limit, defaulting to 0 (no time limit) when it is null
+     * or empty, and stores the resolved value back on the chunk.
+     *
+     * @param chunk the chunk definition to read and update
+     * @return the resolved time limit
+     * @throws NumberFormatException if the attribute is set but is not an integer
+     */
+    public static int getTimeLimit(Chunk chunk) {
+        final String configured = chunk.getTimeLimit();
+        final boolean missing = configured == null || configured.isEmpty();
+        final int seconds = missing ? 0 : Integer.parseInt(configured);
+
+        chunk.setTimeLimit(Integer.toString(seconds));
+        return seconds;
+    }
+
+    /**
+     * Resolves the chunk's checkpoint-policy, defaulting to "item" when it is null or empty,
+     * and stores the resolved value back on the chunk.
+     *
+     * @param chunk the chunk definition to read and update
+     * @return "item" or "custom"
+     * @throws IllegalArgumentException if the attribute is set to any other value
+     */
+    public static String getCheckpointPolicy(Chunk chunk) {
+        String policy = chunk.getCheckpointPolicy();
+
+        if (policy == null || policy.isEmpty()) {
+            policy = "item";
+        } else if (!"item".equals(policy) && !"custom".equals(policy)) {
+            throw new IllegalArgumentException("The only supported attributed values for 'checkpoint-policy' are 'item' and 'custom'.");
+        }
+
+        chunk.setCheckpointPolicy(policy);
+        return policy;
+    }
+
+    /**
+     * @param chunk the chunk definition to read
+     * @return the chunk's skip-limit parsed as an integer
+     * @throws NumberFormatException if the attribute is null or not an integer
+     */
+    public static int getSkipLimit(Chunk chunk) {
+        final String configured = chunk.getSkipLimit();
+        return Integer.parseInt(configured);
+    }
+
+    /**
+     * @param chunk the chunk definition to read
+     * @return the chunk's retry-limit parsed as an integer
+     * @throws NumberFormatException if the attribute is null or not an integer
+     */
+    public static int getRetryLimit(Chunk chunk) {
+        final String configured = chunk.getRetryLimit();
+        return Integer.parseInt(configured);
+    }
+
+}