You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:38:45 UTC
[08/62] Importing JBatch Reference Implementation from IBM. We'll
fork it but this commit is to keep a track of what we forked.
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchKernelImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchKernelImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchKernelImpl.java
new file mode 100755
index 0000000..98d9fc9
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchKernelImpl.java
@@ -0,0 +1,459 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.batch.operations.JobExecutionAlreadyCompleteException;
+import javax.batch.operations.JobExecutionNotMostRecentException;
+import javax.batch.operations.JobExecutionNotRunningException;
+import javax.batch.operations.JobRestartException;
+import javax.batch.operations.JobStartException;
+import javax.batch.operations.NoSuchJobExecutionException;
+import javax.batch.runtime.JobInstance;
+
+import com.ibm.jbatch.container.IThreadRootController;
+import com.ibm.jbatch.container.callback.IJobEndCallbackService;
+import com.ibm.jbatch.container.exception.BatchContainerServiceException;
+import com.ibm.jbatch.container.jobinstance.JobExecutionHelper;
+import com.ibm.jbatch.container.jobinstance.RuntimeFlowInSplitExecution;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.services.IBatchKernelService;
+import com.ibm.jbatch.container.services.IJobExecution;
+import com.ibm.jbatch.container.services.IPersistenceManagerService;
+import com.ibm.jbatch.container.services.impl.NoOpBatchSecurityHelper;
+import com.ibm.jbatch.container.services.impl.RuntimeBatchJobUtil;
+import com.ibm.jbatch.container.servicesmanager.ServicesManager;
+import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
+import com.ibm.jbatch.container.util.BatchFlowInSplitWorkUnit;
+import com.ibm.jbatch.container.util.BatchPartitionWorkUnit;
+import com.ibm.jbatch.container.util.BatchWorkUnit;
+import com.ibm.jbatch.container.util.FlowInSplitBuilderConfig;
+import com.ibm.jbatch.container.util.PartitionsBuilderConfig;
+import com.ibm.jbatch.jsl.model.JSLJob;
+import com.ibm.jbatch.spi.BatchJobUtil;
+import com.ibm.jbatch.spi.BatchSPIManager;
+import com.ibm.jbatch.spi.BatchSecurityHelper;
+import com.ibm.jbatch.spi.services.IBatchConfig;
+import com.ibm.jbatch.spi.services.IBatchThreadPoolService;
+import com.ibm.jbatch.spi.services.ParallelTaskResult;
+
+public class BatchKernelImpl implements IBatchKernelService {
+
+ private final static String sourceClass = BatchKernelImpl.class.getName();
+ private final static Logger logger = Logger.getLogger(sourceClass);
+
+ private Map<Long, IThreadRootController> executionId2jobControllerMap = new ConcurrentHashMap<Long, IThreadRootController>();
+ private Set<Long> instanceIdExecutingSet = new HashSet<Long>();
+
+ ServicesManager servicesManager = ServicesManagerImpl.getInstance();
+
+ private IBatchThreadPoolService executorService = null;
+
+ private IJobEndCallbackService callbackService = null;
+
+ private IPersistenceManagerService persistenceService = null;
+
+ private BatchSecurityHelper batchSecurity = null;
+
+ private BatchJobUtil batchJobUtil = null;
+
+ /** Resolves the thread pool, job-end callback and persistence services from the services manager. */
+ public BatchKernelImpl() {
+ executorService = servicesManager.getThreadPoolService();
+ callbackService = servicesManager.getJobCallbackService();
+ persistenceService = servicesManager.getPersistenceManagerService();
+ // registering our implementation of the util class used to purge by apptag
+ batchJobUtil = new RuntimeBatchJobUtil();
+ BatchSPIManager.getInstance().registerBatchJobUtil(batchJobUtil);
+ }
+
+ /** Returns the helper registered with the SPI manager, or a no-op helper when none is registered. */
+ public BatchSecurityHelper getBatchSecurityHelper() {
+     BatchSecurityHelper registered = BatchSPIManager.getInstance().getBatchSecurityHelper();
+     // The field always mirrors the helper resolved by the most recent call.
+     batchSecurity = (registered != null) ? registered : new NoOpBatchSecurityHelper();
+     return batchSecurity;
+ }
+
+ public void init(IBatchConfig pgcConfig) throws BatchContainerServiceException {
+ }
+
+ @Override
+ public void shutdown() throws BatchContainerServiceException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public IJobExecution startJob(String jobXML) throws JobStartException {
+ return startJob(jobXML, null);
+ }
+
+ /**
+  * Creates a job execution from the job XML, registers it as executing and hands it to the
+  * thread pool service; the returned view is obtained via getJobOperatorJobExecution().
+  */
+ @Override
+ public IJobExecution startJob(String jobXML, Properties jobParameters) throws JobStartException {
+ String method = "startJob";
+
+ if (logger.isLoggable(Level.FINER)) {
+ logger.entering(sourceClass, method, new Object[] { jobXML, jobParameters != null ? jobParameters : "<null>" });
+ }
+
+ RuntimeJobExecution jobExecution = JobExecutionHelper.startJob(jobXML, jobParameters);
+ // TODO - register with status manager
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("JobExecution constructed: " + jobExecution);
+ }
+
+ BatchWorkUnit batchWork = new BatchWorkUnit(this, jobExecution);
+ // Registration throws IllegalStateException if the execution or its instance is already tracked.
+ registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+ executorService.executeTask(batchWork, null);
+ if (logger.isLoggable(Level.FINER)) {
+ logger.exiting(sourceClass, method, jobExecution);
+ }
+ return jobExecution.getJobOperatorJobExecution();
+ }
+
+ /** Asks the controller registered for the execution to stop; throws JobExecutionNotRunningException if there is none. */
+ @Override
+ public void stopJob(long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
+     IThreadRootController controller = this.executionId2jobControllerMap.get(executionId);
+     if (controller == null) {
+         String msg = "JobExecution with execution id of " + executionId + " is not running.";
+         logger.warning("stopJob(): " + msg);
+         throw new JobExecutionNotRunningException(msg);
+     }
+     controller.stop();
+ }
+
+ /**
+  * Restarts the given execution without overrides by delegating to the two-argument variant
+  * with an empty {@link Properties} object.
+  */
+ @Override
+ public IJobExecution restartJob(long executionId) throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
+     if (logger.isLoggable(Level.FINER)) {
+         logger.entering(sourceClass, "restartJob");
+     }
+     return this.restartJob(executionId, new Properties());
+ }
+
+
+
+
+ /**
+  * Builds a restart execution for {@code executionId} with the given override properties,
+  * registers it as executing and hands it to the thread pool service.
+  */
+ @Override
+ public IJobExecution restartJob(long executionId, Properties jobOverrideProps) throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
+ String method = "restartJob";
+
+ if (logger.isLoggable(Level.FINER)) {
+ logger.entering(sourceClass, method);
+ }
+
+ RuntimeJobExecution jobExecution =
+ JobExecutionHelper.restartJob(executionId, jobOverrideProps);
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("JobExecution constructed: " + jobExecution);
+ }
+
+ BatchWorkUnit batchWork = new BatchWorkUnit(this, jobExecution);
+ // Registration throws IllegalStateException if the execution or its instance is already tracked.
+ registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+ executorService.executeTask(batchWork, null);
+ if (logger.isLoggable(Level.FINER)) {
+ logger.exiting(sourceClass, method, jobExecution);
+ }
+ return jobExecution.getJobOperatorJobExecution();
+ }
+
+ /** Invokes the job-end callbacks for a finished execution, then unregisters it from this kernel. */
+ @Override
+ public void jobExecutionDone(RuntimeJobExecution jobExecution) {
+     long executionId = jobExecution.getExecutionId();
+     if (logger.isLoggable(Level.FINE)) {
+         logger.fine("JobExecution done with batchStatus: " + jobExecution.getBatchStatus() + " , getting ready to invoke callbacks for JobExecution: " + executionId);
+     }
+     try {
+         callbackService.done(executionId);
+         if (logger.isLoggable(Level.FINE)) {
+             logger.fine("Done invoking callbacks for JobExecution: " + executionId);
+         }
+     } finally {
+         // Unregister even when a callback throws; otherwise the execution would be reported
+         // as running forever and its job instance could never be registered again.
+         this.executionId2jobControllerMap.remove(executionId);
+         this.instanceIdExecutingSet.remove(jobExecution.getInstanceId());
+     }
+     // AJM: ah - purge jobExecution from map here and flush to DB?
+     // edit: no long want a 2 tier for the jobexecution...do want it for step execution
+     // renamed method to flushAndRemoveStepExecution
+ }
+
+ /** Returns the persisted job-operator view of the execution, as looked up by JobExecutionHelper. */
+ public IJobExecution getJobExecution(long executionId) throws NoSuchJobExecutionException {
+ // Keep logging on finest for apps like TCK which do polling.
+ // NOTE(review): both messages are built even when FINEST is off; consider isLoggable guards.
+ logger.finest("Entering " + sourceClass + ".getJobExecution(), executionId = " + executionId);
+ IJobExecution retVal = JobExecutionHelper.getPersistedJobOperatorJobExecution(executionId);
+
+ logger.finest("Exiting " + sourceClass + ".getJobExecution(), retVal = " + retVal);
+ return retVal;
+ }
+
+ /** Submits the work unit to the thread pool service as a parallel task; the submission does not block. */
+ @Override
+ public void startGeneratedJob(BatchWorkUnit batchWork) {
+     final String method = "startGeneratedJob";
+     if (logger.isLoggable(Level.FINER)) {
+         logger.entering(sourceClass, method, new Object[] { batchWork });
+     }
+
+     // Non-blocking call; the ParallelTaskResult it returns is not used here.
+     executorService.executeParallelTask(batchWork, null);
+
+     if (logger.isLoggable(Level.FINER)) {
+         logger.exiting(sourceClass, method, new Object[] { batchWork });
+     }
+ }
+
+ /**
+  * Returns the number of job instances the persistence service reports for the given job name.
+  * @param jobName name passed through unchanged to the persistence service
+  */
+ @Override
+ public int getJobInstanceCount(String jobName) {
+     return persistenceService.jobOperatorGetJobInstanceCount(jobName);
+ }
+
+ @Override
+ public JobInstance getJobInstance(long executionId){
+ return JobExecutionHelper.getJobInstance(executionId);
+ }
+
+
+ /**
+ * Build a list of batch work units and set them up in STARTING state but don't start them yet.
+ * One work unit is built per job model in the config, in order; the i-th model receives the
+ * i-th entry of the partition properties array, or null when no array is configured.
+ */
+ @Override
+ public List<BatchPartitionWorkUnit> buildNewParallelPartitions(PartitionsBuilderConfig config)
+ throws JobRestartException, JobStartException {
+
+ List<JSLJob> jobModels = config.getJobModels();
+ Properties[] partitionPropertiesArray = config.getPartitionProperties();
+
+ List<BatchPartitionWorkUnit> batchWorkUnits = new ArrayList<BatchPartitionWorkUnit>(jobModels.size());
+
+ int instance = 0;
+ for (JSLJob parallelJob : jobModels){
+ Properties partitionProps = (partitionPropertiesArray == null) ? null : partitionPropertiesArray[instance];
+
+ if (logger.isLoggable(Level.FINER)) {
+ logger.finer("Starting execution for jobModel = " + parallelJob.toString());
+ }
+ RuntimeJobExecution jobExecution = JobExecutionHelper.startPartition(parallelJob, partitionProps);
+ jobExecution.setPartitionInstance(instance);
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("JobExecution constructed: " + jobExecution);
+ }
+ BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(this, jobExecution, config);
+ // Registration throws IllegalStateException if the execution or its instance is already tracked.
+ registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+ batchWorkUnits.add(batchWork);
+ instance++;
+ }
+
+ return batchWorkUnits;
+ }
+
+ /**
+  * Builds work units for restarting partitions, one per job model whose most recent execution
+  * can be restarted. Partitions whose execution already completed are logged and left out of
+  * the returned list, while the partition instance counter still advances past them.
+  */
+ @Override
+ public List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(PartitionsBuilderConfig config) throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
+
+ List<JSLJob> jobModels = config.getJobModels();
+ Properties[] partitionProperties = config.getPartitionProperties();
+
+ List<BatchPartitionWorkUnit> batchWorkUnits = new ArrayList<BatchPartitionWorkUnit>(jobModels.size());
+
+ //for now let always use a Properties array. We can add some more convenience methods later for null properties and what not
+ int instance = 0;
+ for (JSLJob parallelJob : jobModels){
+ Properties partitionProps = (partitionProperties == null) ? null : partitionProperties[instance];
+
+ try {
+ long execId = getMostRecentExecutionId(parallelJob);
+ RuntimeJobExecution jobExecution = null;
+ try {
+ jobExecution = JobExecutionHelper.restartPartition(execId, parallelJob, partitionProps);
+ jobExecution.setPartitionInstance(instance);
+ } catch (NoSuchJobExecutionException e) {
+ String errorMsg = "Caught NoSuchJobExecutionException but this is an internal JobExecution so this shouldn't have happened: execId =" + execId;
+ logger.severe(errorMsg);
+ throw new IllegalStateException(errorMsg, e);
+ }
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("JobExecution constructed: " + jobExecution);
+ }
+
+ BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(this, jobExecution, config);
+ registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+ batchWorkUnits.add(batchWork);
+ } catch (JobExecutionAlreadyCompleteException e) {
+ // Skip this partition only; the remaining job models are still processed.
+ logger.fine("This execution already completed: " + parallelJob.getId());
+ }
+
+ instance++;
+ }
+ return batchWorkUnits;
+ }
+
+ /** Submits the work unit to the thread pool service as a parallel task; the submission does not block. */
+ @Override
+ public void restartGeneratedJob(BatchWorkUnit batchWork) throws JobRestartException {
+     final String method = "restartGeneratedJob";
+
+     if (logger.isLoggable(Level.FINER)) {
+         logger.entering(sourceClass, method, new Object[] { batchWork });
+     }
+
+     // Non-blocking call; the ParallelTaskResult it returns is not used here.
+     executorService.executeParallelTask(batchWork, null);
+
+     if (logger.isLoggable(Level.FINER)) {
+         logger.exiting(sourceClass, method, batchWork);
+     }
+ }
+
+ /** Starts a flow-in-split execution for the config's job model and registers its work unit as executing. */
+ @Override
+ public BatchFlowInSplitWorkUnit buildNewFlowInSplitWorkUnit(FlowInSplitBuilderConfig config) {
+     RuntimeFlowInSplitExecution flowExecution = JobExecutionHelper.startFlowInSplit(config.getJobModel());
+     if (logger.isLoggable(Level.FINE)) {
+         logger.fine("JobExecution constructed: " + flowExecution);
+     }
+
+     BatchFlowInSplitWorkUnit workUnit = new BatchFlowInSplitWorkUnit(this, flowExecution, config);
+     // Registration throws IllegalStateException if this execution or its instance is already tracked.
+     registerCurrentInstanceAndExecution(flowExecution, workUnit.getController());
+
+     return workUnit;
+ }
+
+ /**
+  * Returns the highest execution id recorded for the single job instance of the given sub-job model.
+  * @throws IllegalStateException if the persistence service does not report exactly one instance
+  */
+ private long getMostRecentExecutionId(JSLJob jobModel) {
+     //There can only be one instance associated with a subjob's id since it is generated from an unique
+     //job instance id. So there should be no way to directly start a subjob with particular
+     List<Long> instanceIds = persistenceService.jobOperatorGetJobInstanceIds(jobModel.getId(), 0, 2);
+     // Zero instances is just as unexpected as several, and would otherwise surface below as a
+     // bare IndexOutOfBoundsException from get(0), so fail with the same descriptive error.
+     if (instanceIds.size() != 1) {
+         String errorMsg = "Found " + instanceIds.size() + " entries for instance id = " + jobModel.getId() + ", which should not have happened. Blowing up.";
+         logger.severe(errorMsg);
+         throw new IllegalStateException(errorMsg);
+     }
+     // Stays at Long.MIN_VALUE when no executions are returned for the instance.
+     long execId = Long.MIN_VALUE;
+     for (IJobExecution partitionExec : persistenceService.jobOperatorGetJobExecutions(instanceIds.get(0))) {
+         if (partitionExec.getExecutionId() > execId) {
+             execId = partitionExec.getExecutionId();
+         }
+     }
+     return execId;
+ }
+
+ /**
+  * Builds a work unit that restarts the flow-in-split for the config's job model, based on the
+  * most recent execution id found for that model, and registers it as executing.
+  */
+ @Override
+ public BatchFlowInSplitWorkUnit buildOnRestartFlowInSplitWorkUnit(FlowInSplitBuilderConfig config)
+ throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
+ String method = "buildOnRestartFlowInSplitWorkUnit";
+ JSLJob jobModel = config.getJobModel();
+
+ if (logger.isLoggable(Level.FINER)) {
+ logger.entering(sourceClass, method, jobModel);
+ }
+
+ long execId = getMostRecentExecutionId(jobModel);
+ RuntimeFlowInSplitExecution jobExecution = null;
+ try {
+ jobExecution = JobExecutionHelper.restartFlowInSplit(execId, jobModel);
+ } catch (NoSuchJobExecutionException e) {
+ // The execution id came from getMostRecentExecutionId(), so a miss here is reported as an internal error.
+ String errorMsg = "Caught NoSuchJobExecutionException but this is an internal JobExecution so this shouldn't have happened: execId =" + execId;
+ logger.severe(errorMsg);
+ throw new IllegalStateException(errorMsg, e);
+ }
+
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("JobExecution constructed: " + jobExecution);
+ }
+ BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(this, jobExecution, config);
+ registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
+ return batchWork;
+ }
+
+ /**
+  * Records the execution and its job instance as currently executing.
+  * @throws IllegalStateException if the execution id or the instance id is already registered
+  */
+ private void registerCurrentInstanceAndExecution(RuntimeJobExecution jobExecution, IThreadRootController controller) {
+     long execId = jobExecution.getExecutionId();
+     long instanceId = jobExecution.getInstanceId();
+     String conflict = (executionId2jobControllerMap.get(execId) != null) ? "but executionId is already currently executing."
+             : instanceIdExecutingSet.contains(instanceId) ? "but another execution with this instanceId is already currently executing." : null;
+     if (conflict != null) {
+         String errorMsg = "Tried to execute with Job executionId = " + execId + " and instanceId = " + instanceId + " " + conflict;
+         logger.warning(errorMsg);
+         throw new IllegalStateException(errorMsg);
+     }
+     instanceIdExecutingSet.add(instanceId);
+     executionId2jobControllerMap.put(execId, controller);
+ }
+
+ /** Reports whether a controller is currently registered for the given execution id. */
+ @Override
+ public boolean isExecutionRunning(long executionId) {
+ return executionId2jobControllerMap.containsKey(executionId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchletStepControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchletStepControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchletStepControllerImpl.java
new file mode 100755
index 0000000..2129c95
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchletStepControllerImpl.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.batch.runtime.BatchStatus;
+
+
+import com.ibm.jbatch.container.artifact.proxy.BatchletProxy;
+import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
+import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
+import com.ibm.jbatch.container.context.impl.StepContextImpl;
+import com.ibm.jbatch.container.exception.BatchContainerServiceException;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.util.PartitionDataWrapper;
+import com.ibm.jbatch.container.validation.ArtifactValidationException;
+import com.ibm.jbatch.jsl.model.Batchlet;
+import com.ibm.jbatch.jsl.model.Partition;
+import com.ibm.jbatch.jsl.model.Property;
+import com.ibm.jbatch.jsl.model.Step;
+
+public class BatchletStepControllerImpl extends SingleThreadedStepControllerImpl {
+
+ private final static String sourceClass = BatchletStepControllerImpl.class.getName();
+ private final static Logger logger = Logger.getLogger(sourceClass);
+
+ private BatchletProxy batchletProxy;
+
+ public BatchletStepControllerImpl(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
+ super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+ }
+
+ /**
+  * Loads the batchlet artifact referenced by {@code batchlet} and, unless a stop was already
+  * requested, runs its process() method and records the return value on the step context.
+  *
+  * @throws BatchContainerServiceException if the batchlet proxy cannot be created
+  */
+ private void invokeBatchlet(Batchlet batchlet) throws BatchContainerServiceException {
+     String sourceMethod = "invokeBatchlet";
+     String batchletId = batchlet.getRef();
+     List<Property> propList = (batchlet.getProperties() == null) ? null : batchlet.getProperties().getPropertyList();
+
+     if (logger.isLoggable(Level.FINER)) {
+         logger.entering(sourceClass, sourceMethod, batchletId);
+     }
+
+     InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
+     try {
+         batchletProxy = ProxyFactory.createBatchletProxy(batchletId, injectionRef, stepContext);
+     } catch (ArtifactValidationException e) {
+         throw new BatchContainerServiceException("Cannot create the batchlet [" + batchletId + "]", e);
+     }
+     if (logger.isLoggable(Level.FINE)) {
+         logger.fine("Batchlet is loaded and validated: " + batchletProxy);
+     }
+     if (wasStopIssued()) {
+         logger.fine("Exit without executing batchlet since stop() request has been received.");
+     } else {
+         String processRetVal = batchletProxy.process();
+         logger.fine("Set process() return value = " + processRetVal + " for possible use as exitStatus");
+         stepContext.setBatchletProcessRetVal(processRetVal);
+     }
+     // Trace the exit on both paths so every entering() record has a matching exiting() record.
+     if (logger.isLoggable(Level.FINER)) {
+         logger.exiting(sourceClass, sourceMethod);
+     }
+ }
+
+ /** Reports whether the job context is STOPPING, propagating that status to the step context if so. */
+ protected synchronized boolean wasStopIssued() {
+     // Might only be set to stopping at the job level. Use the lock for this object on this
+     // method along with the stop() method
+     boolean stopping = jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING);
+     if (stopping) {
+         stepContext.setBatchStatus(BatchStatus.STOPPING);
+     }
+     return stopping;
+ }
+ /** Runs the step's batchlet; invokeCollectorIfPresent() is called afterwards even if the batchlet throws. */
+ @Override
+ protected void invokeCoreStep() throws BatchContainerServiceException {
+ //TODO If this step is partitioned create partition artifacts
+ Partition partition = step.getPartition();
+ if (partition != null) {
+ //partition.getConcurrencyElements();
+ }
+ try {
+ invokeBatchlet(step.getBatchlet());
+ } finally {
+ invokeCollectorIfPresent();
+ }
+
+ }
+
+ /**
+  * Requests that the batchlet stop. Only acts while the step context is STARTING or STARTED.
+  */
+ @Override
+ public synchronized void stop() {
+     BatchStatus current = stepContext.getBatchStatus();
+     if (!BatchStatus.STARTING.equals(current) && !BatchStatus.STARTED.equals(current)) {
+         //TODO do we need to throw an error if the batchlet is already stopping/stopped
+         //a stop gets issued twice
+         return;
+     }
+     stepContext.setBatchStatus(BatchStatus.STOPPING);
+     // It is possible for stop() to be issued before process(), so the proxy may not exist yet
+     if (batchletProxy != null) {
+         batchletProxy.stop();
+     }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkHelper.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkHelper.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkHelper.java
new file mode 100755
index 0000000..8cda7da
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkHelper.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package com.ibm.jbatch.container.impl;
+
+import com.ibm.jbatch.jsl.model.Chunk;
+
+/**
+ * Static helpers that read {@link Chunk} attributes, applying defaults where the attribute is unset.
+ */
+public class ChunkHelper {
+
+ /** Resolves item-count (default 10 when null or empty) and stores the resolved value back on the chunk. */
+ public static int getItemCount(Chunk chunk) {
+     String configured = chunk.getItemCount();
+     boolean unset = configured == null || configured.isEmpty();
+     int size = unset ? 10 : Integer.parseInt(configured);
+     chunk.setItemCount(Integer.toString(size));
+     return size;
+ }
+
+ /** Resolves time-limit in seconds (default 0, i.e. no time limit) and stores the resolved value back. */
+ public static int getTimeLimit(Chunk chunk) {
+     String configured = chunk.getTimeLimit();
+     boolean unset = configured == null || configured.isEmpty();
+     int timeLimit = unset ? 0 : Integer.parseInt(configured);
+     chunk.setTimeLimit(Integer.toString(timeLimit));
+     return timeLimit;
+ }
+
+ /**
+  * Resolves checkpoint-policy (default "item") and stores the resolved value back on the chunk.
+  *
+  * @throws IllegalArgumentException if a non-empty value other than "item" or "custom" is configured
+  */
+ public static String getCheckpointPolicy(Chunk chunk) {
+     String checkpointPolicy = chunk.getCheckpointPolicy();
+     if (checkpointPolicy == null || checkpointPolicy.isEmpty()) {
+         checkpointPolicy = "item";
+     } else if (!checkpointPolicy.equals("item") && !checkpointPolicy.equals("custom")) {
+         throw new IllegalArgumentException("The only supported attributed values for 'checkpoint-policy' are 'item' and 'custom'.");
+     }
+     chunk.setCheckpointPolicy(checkpointPolicy);
+     return checkpointPolicy;
+ }
+
+ /** Parses the chunk's skip-limit; a null or non-numeric value raises NumberFormatException. */
+ public static int getSkipLimit(Chunk chunk) {
+     return Integer.parseInt(chunk.getSkipLimit());
+ }
+
+ /** Parses the chunk's retry-limit; a null or non-numeric value raises NumberFormatException. */
+ public static int getRetryLimit(Chunk chunk) {
+     return Integer.parseInt(chunk.getRetryLimit());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkStepControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkStepControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkStepControllerImpl.java
new file mode 100755
index 0000000..34dcffc
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkStepControllerImpl.java
@@ -0,0 +1,1045 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.batch.api.chunk.CheckpointAlgorithm;
+import javax.batch.runtime.BatchStatus;
+
+import com.ibm.jbatch.container.artifact.proxy.CheckpointAlgorithmProxy;
+import com.ibm.jbatch.container.artifact.proxy.ChunkListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
+import com.ibm.jbatch.container.artifact.proxy.ItemProcessListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.ItemProcessorProxy;
+import com.ibm.jbatch.container.artifact.proxy.ItemReadListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.ItemReaderProxy;
+import com.ibm.jbatch.container.artifact.proxy.ItemWriteListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.ItemWriterProxy;
+import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
+import com.ibm.jbatch.container.artifact.proxy.RetryProcessListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.RetryReadListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.RetryWriteListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.SkipProcessListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.SkipReadListenerProxy;
+import com.ibm.jbatch.container.artifact.proxy.SkipWriteListenerProxy;
+import com.ibm.jbatch.container.context.impl.MetricImpl;
+import com.ibm.jbatch.container.context.impl.StepContextImpl;
+import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
+import com.ibm.jbatch.container.exception.BatchContainerServiceException;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.persistence.CheckpointAlgorithmFactory;
+import com.ibm.jbatch.container.persistence.CheckpointData;
+import com.ibm.jbatch.container.persistence.CheckpointDataKey;
+import com.ibm.jbatch.container.persistence.CheckpointManager;
+import com.ibm.jbatch.container.persistence.ItemCheckpointAlgorithm;
+import com.ibm.jbatch.container.services.IPersistenceManagerService;
+import com.ibm.jbatch.container.servicesmanager.ServicesManager;
+import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
+import com.ibm.jbatch.container.util.PartitionDataWrapper;
+import com.ibm.jbatch.container.util.TCCLObjectInputStream;
+import com.ibm.jbatch.container.validation.ArtifactValidationException;
+import com.ibm.jbatch.jsl.model.Chunk;
+import com.ibm.jbatch.jsl.model.ItemProcessor;
+import com.ibm.jbatch.jsl.model.ItemReader;
+import com.ibm.jbatch.jsl.model.ItemWriter;
+import com.ibm.jbatch.jsl.model.Property;
+import com.ibm.jbatch.jsl.model.Step;
+
+public class ChunkStepControllerImpl extends SingleThreadedStepControllerImpl {
+
+ // Class name used both as the logger name and as the source class in entering/exiting traces.
+ private final static String sourceClass = ChunkStepControllerImpl.class.getName();
+ private final static Logger logger = Logger.getLogger(sourceClass);
+
+ // JSL chunk model; assigned from step.getChunk() in invokeCoreStep().
+ private Chunk chunk = null;
+ // Artifact proxies created in initializeChunkArtifacts(). processorProxy stays null
+ // when the chunk declares no processor, in which case processItem() passes items through.
+ private ItemReaderProxy readerProxy = null;
+ private ItemProcessorProxy processorProxy = null;
+ private ItemWriterProxy writerProxy = null;
+ private CheckpointAlgorithmProxy checkpointProxy = null;
+ // Algorithm handed to the CheckpointManager: an ItemCheckpointAlgorithm for the "item"
+ // checkpoint type, otherwise the checkpointProxy itself.
+ private CheckpointAlgorithm chkptAlg = null;
+ // Created in initializeChunkArtifacts() and re-created by invokeChunk() after a rollback.
+ private CheckpointManager checkpointManager;
+ private ServicesManager servicesManager = ServicesManagerImpl.getInstance();
+ // Looked up from servicesManager each time reader/writer checkpoint data is loaded.
+ private IPersistenceManagerService _persistenceManagerService = null;
+ private SkipHandler skipHandler = null;
+ // Keys under which the reader ("READER") and writer ("WRITER") checkpoints are stored.
+ // Only writerChkptDK has an explicit initializer here; both start out null.
+ CheckpointDataKey readerChkptDK, writerChkptDK = null;
+ // NOTE(review): these two fields are never assigned in this class; openReaderAndWriter()
+ // declares locals with the same names that shadow them.
+ CheckpointData readerChkptData = null;
+ CheckpointData writerChkptData = null;
+ // Listener proxies, all obtained from the job's listener factory in initializeChunkArtifacts().
+ List<ChunkListenerProxy> chunkListeners = null;
+ List<SkipProcessListenerProxy> skipProcessListeners = null;
+ List<SkipReadListenerProxy> skipReadListeners = null;
+ List<SkipWriteListenerProxy> skipWriteListeners = null;
+ List<RetryProcessListenerProxy> retryProcessListeners = null;
+ List<RetryReadListenerProxy> retryReadListeners = null;
+ List<RetryWriteListenerProxy> retryWriteListeners = null;
+ List<ItemReadListenerProxy> itemReadListeners = null;
+ List<ItemProcessListenerProxy> itemProcessListeners = null;
+ List<ItemWriteListenerProxy> itemWriteListeners = null;
+ private RetryHandler retryHandler;
+
+ // metrics
+ // NOTE(review): the five counters below are not read or written by any method of this
+ // class (metrics are recorded through stepContext.getMetric(...)); they are
+ // package-private, so confirm no other class in the package uses them before removing.
+ long readCount = 0;
+ long writeCount = 0;
+ long readSkipCount = 0;
+ long processSkipCount = 0;
+ long writeSkipCount = 0;
+ // Set to true the first time a retryable exception demands a rollback and never reset
+ // in this class. While true, the read/process/write error paths consult the skip
+ // handler before the retry handler instead of the other way round.
+ boolean rollbackRetry = false;
+
+ /**
+  * Creates a chunk step controller. Every argument is handed unchanged to
+  * the {@link SingleThreadedStepControllerImpl} constructor; the chunk
+  * artifacts themselves are created later, in {@code invokeCoreStep()}.
+  */
+ public ChunkStepControllerImpl(
+         RuntimeJobExecution jobExecutionImpl,
+         Step step,
+         StepContextImpl stepContext,
+         long rootJobExecutionId,
+         BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
+     super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+ }
+
+ /**
+  * Mutable set of flags describing the outcome of one pass through the
+  * read-process-write loop (or of a single item within it). All flags start
+  * out {@code false}.
+  * <p>
+  * Declared {@code static} because it never touches the enclosing
+  * controller, so instances need not carry a reference to it.
+  */
+ private static class ItemStatus {
+
+     /** The item was skipped after a skippable exception. */
+     private boolean skipped = false;
+     /** The processor returned null for the item. */
+     private boolean filtered = false;
+     /** The reader returned null (no more input) or a stop was requested. */
+     private boolean finished = false;
+     /** The checkpoint policy asked for a checkpoint. */
+     private boolean checkPointed = false;
+     /** A write is being retried. */
+     private boolean retry = false;
+     /** A retryable exception requires the current transaction to be rolled back. */
+     private boolean rollback = false;
+
+     public boolean isSkipped() {
+         return skipped;
+     }
+
+     public void setSkipped(boolean skipped) {
+         this.skipped = skipped;
+     }
+
+     public boolean isFiltered() {
+         return filtered;
+     }
+
+     public void setFiltered(boolean filtered) {
+         this.filtered = filtered;
+     }
+
+     public boolean isCheckPointed() {
+         return checkPointed;
+     }
+
+     public void setCheckPointed(boolean checkPointed) {
+         this.checkPointed = checkPointed;
+     }
+
+     public boolean isFinished() {
+         return finished;
+     }
+
+     public void setFinished(boolean finished) {
+         this.finished = finished;
+     }
+
+     public boolean isRetry() {
+         return retry;
+     }
+
+     public void setRetry(boolean retry) {
+         this.retry = retry;
+     }
+
+     public boolean isRollback() {
+         return rollback;
+     }
+
+     public void setRollback(boolean rollback) {
+         this.rollback = rollback;
+     }
+
+ }
+
+ /**
+  * Reads and processes items one at a time, collecting the processed items
+  * that are to be written together. The loop ends when any of the following
+  * holds: a read or process step requested a rollback, the number of
+  * collected items reached {@code chunkSize} (ignored for the "custom"
+  * checkpoint type), the checkpoint policy asked for a checkpoint, the
+  * reader returned no item, or the step is in STOPPING state.
+  *
+  * @param chunkSize
+  *            write buffer size
+  * @param theStatus
+  *            receives the rollback, finished and checkpoint flags for the
+  *            caller
+  * @return the processed items to write; skipped and filtered items are
+  *         not included
+  */
+ private List<Object> readAndProcess(int chunkSize, ItemStatus theStatus) {
+     logger.entering(sourceClass, "readAndProcess", new Object[] { chunkSize, theStatus });
+
+     List<Object> chunkToWrite = new ArrayList<Object>();
+     int readProcessedCount = 0;
+
+     while (true) {
+         ItemStatus status = new ItemStatus();
+         Object itemRead = readItem(status);
+
+         if (status.isRollback()) {
+             theStatus.setRollback(true);
+             // inc rollbackCount
+             // NOTE(review): readItem() also increments ROLLBACK_COUNT when it
+             // sets the rollback flag, so a rollback is counted twice here.
+             stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+             break;
+         }
+
+         if (!status.isSkipped() && !status.isFinished()) {
+             Object itemProcessed = processItem(itemRead, status);
+
+             if (status.isRollback()) {
+                 theStatus.setRollback(true);
+                 // inc rollbackCount
+                 // NOTE(review): processItem() increments ROLLBACK_COUNT as well.
+                 stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+                 break;
+             }
+
+             if (!status.isSkipped() && !status.isFiltered()) {
+                 chunkToWrite.add(itemProcessed);
+                 readProcessedCount++;
+             }
+         }
+
+         theStatus.setFinished(status.isFinished());
+         theStatus.setCheckPointed(checkpointManager.ApplyCheckPointPolicy());
+
+         // This will force the current item to finish processing on a stop
+         // request
+         if (stepContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
+             theStatus.setFinished(true);
+         }
+
+         // write buffer size reached; compare the type by value, since
+         // reference comparison only works for interned strings
+         if ((readProcessedCount == chunkSize) && !"custom".equals(checkpointProxy.getCheckpointType())) {
+             break;
+         }
+
+         // checkpoint reached
+         if (theStatus.isCheckPointed()) {
+             break;
+         }
+
+         // last record in readerProxy reached
+         if (theStatus.isFinished()) {
+             break;
+         }
+
+     }
+     logger.exiting(sourceClass, "readAndProcess", chunkToWrite);
+     return chunkToWrite;
+ }
+
+ /**
+  * Reads one item from the reader, invoking the item read listeners before
+  * and after the read.
+  * <p>
+  * When the read (or a listener) throws an {@link Exception}, the exception
+  * is stored on the step context and every read listener's
+  * {@code onReadError} is called exactly once. The retry and skip handlers
+  * are then consulted: retry first, then skip, unless a rollback retry is
+  * already in progress, in which case skip is consulted first. A retryable
+  * exception either re-reads immediately or, if it is a rollback exception,
+  * flags {@code status} for rollback.
+  *
+  * @param status
+  *            receives the finished, skipped and rollback flags for this read
+  * @return the item read, or {@code null} when the reader is exhausted, the
+  *         item was skipped, or a rollback was requested
+  * @throws BatchContainerRuntimeException
+  *             wrapping any failure that is neither retryable nor skippable
+  */
+ private Object readItem(ItemStatus status) {
+     logger.entering(sourceClass, "readItem", status);
+     Object itemRead = null;
+
+     try {
+         // call read listeners before and after the actual read
+         for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+             readListenerProxy.beforeRead();
+         }
+
+         itemRead = readerProxy.readItem();
+
+         for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+             readListenerProxy.afterRead(itemRead);
+         }
+
+         // itemRead == null means we reached the end of
+         // the readerProxy "resultset"
+         status.setFinished(itemRead == null);
+         if (!status.isFinished()) {
+             stepContext.getMetric(MetricImpl.MetricType.READ_COUNT).incValue();
+         }
+     } catch (Exception e) {
+         stepContext.setException(e);
+         // Notify the listeners once per failure; the retry branch below must
+         // not notify them again for the same exception.
+         for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+             readListenerProxy.onReadError(e);
+         }
+         if (!rollbackRetry) {
+             if (retryReadException(e)) {
+                 // if not a rollback exception, just retry the current item
+                 if (!retryHandler.isRollbackException(e)) {
+                     itemRead = readItem(status);
+                 } else {
+                     status.setRollback(true);
+                     rollbackRetry = true;
+                     // inc rollbackCount
+                     stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+                 }
+             } else if (skipReadException(e)) {
+                 status.setSkipped(true);
+                 stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();
+             } else {
+                 throw new BatchContainerRuntimeException(e);
+             }
+         } else {
+             // coming from a rollback retry
+             if (skipReadException(e)) {
+                 status.setSkipped(true);
+                 stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();
+             } else if (retryReadException(e)) {
+                 if (!retryHandler.isRollbackException(e)) {
+                     itemRead = readItem(status);
+                 } else {
+                     status.setRollback(true);
+                     // inc rollbackCount
+                     stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+                 }
+             } else {
+                 throw new BatchContainerRuntimeException(e);
+             }
+         }
+
+     } catch (Throwable e) {
+         throw new BatchContainerRuntimeException(e);
+     }
+
+     logger.exiting(sourceClass, "readItem", itemRead == null ? "<null>" : itemRead);
+     return itemRead;
+ }
+
+ /**
+  * Runs the item processor on an item previously read by the reader,
+  * invoking the item process listeners before and after the call. When the
+  * chunk has no processor the item is returned unchanged and no listener is
+  * called.
+  * <p>
+  * A {@code null} result from the processor marks the item as filtered and
+  * increments the filter count. When processing throws an {@link Exception},
+  * every process listener's {@code onProcessError} is called with the item
+  * that was being processed, and the retry and skip handlers are consulted
+  * (retry first, or skip first while a rollback retry is in progress). A
+  * non-rollback retry simply re-invokes this method, which takes care of
+  * the listener calls and filter accounting for the retried attempt.
+  *
+  * @param itemRead
+  *            the item read
+  * @param status
+  *            receives the filtered, skipped and rollback flags
+  * @return the processed item; {@code null} when the item was filtered
+  * @throws BatchContainerRuntimeException
+  *             wrapping any failure that is neither retryable nor skippable
+  */
+ private Object processItem(Object itemRead, ItemStatus status) {
+     logger.entering(sourceClass, "processItem", new Object[] { itemRead, status });
+     Object processedItem = null;
+
+     // if no processor defined for this chunk
+     if (processorProxy == null) {
+         return itemRead;
+     }
+
+     try {
+
+         // call process listeners before and after the actual process call
+         for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+             processListenerProxy.beforeProcess(itemRead);
+         }
+
+         processedItem = processorProxy.processItem(itemRead);
+
+         if (processedItem == null) {
+             // inc filterCount
+             stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();
+             status.setFiltered(true);
+         }
+
+         for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+             processListenerProxy.afterProcess(itemRead, processedItem);
+         }
+     } catch (Exception e) {
+         // Report the item that failed, not the (usually still null) result.
+         for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+             processListenerProxy.onProcessError(itemRead, e);
+         }
+         if (!rollbackRetry) {
+             if (retryProcessException(e, itemRead)) {
+                 if (!retryHandler.isRollbackException(e)) {
+                     // The recursive call runs the before/after listeners and
+                     // the filter accounting itself; repeating them here would
+                     // notify listeners twice and double-count filtered items.
+                     processedItem = processItem(itemRead, status);
+                 } else {
+                     status.setRollback(true);
+                     rollbackRetry = true;
+                     // inc rollbackCount
+                     stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+                 }
+             } else if (skipProcessException(e, itemRead)) {
+                 status.setSkipped(true);
+                 stepContext.getMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT).incValue();
+             } else {
+                 throw new BatchContainerRuntimeException(e);
+             }
+         } else {
+             // coming from a rollback retry: skip is consulted before retry
+             if (skipProcessException(e, itemRead)) {
+                 status.setSkipped(true);
+                 stepContext.getMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT).incValue();
+             } else if (retryProcessException(e, itemRead)) {
+                 if (!retryHandler.isRollbackException(e)) {
+                     // see above: the recursive call does its own listener
+                     // notification and filter accounting
+                     processedItem = processItem(itemRead, status);
+                 } else {
+                     status.setRollback(true);
+                     rollbackRetry = true;
+                     // inc rollbackCount
+                     stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+                 }
+             } else {
+                 throw new BatchContainerRuntimeException(e);
+             }
+         }
+
+     } catch (Throwable e) {
+         throw new BatchContainerRuntimeException(e);
+     }
+
+     logger.exiting(sourceClass, "processItem", processedItem == null ? "<null>" : processedItem);
+     return processedItem;
+ }
+
+ /**
+ * Writes the collected items through the writer, invoking the item write
+ * listeners before and after the write. An empty list is not written and no
+ * listener is called for it.
+ *
+ * On an Exception the exception is stored on the step context, every write
+ * listener's onWriteError is called, and the retry and skip handlers are
+ * consulted: retry first, then skip, unless a rollback retry is already in
+ * progress, in which case skip is consulted first. A non-rollback retry
+ * re-invokes this method with the same list.
+ *
+ * @param theChunk
+ * the array list with all items processed ready to be written
+ * @param status
+ * receives the rollback flag (and the retry flag on the
+ * rollback-retry path)
+ */
+ private void writeChunk(List<Object> theChunk, ItemStatus status) {
+ logger.entering(sourceClass, "writeChunk", theChunk);
+ if (!theChunk.isEmpty()) {
+ try {
+
+ // call write listeners before and after the actual write
+ for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
+ writeListenerProxy.beforeWrite(theChunk);
+ }
+
+ writerProxy.writeItems(theChunk);
+
+ for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
+ writeListenerProxy.afterWrite(theChunk);
+ }
+ // the write count grows by the number of items handed to the writer
+ stepContext.getMetric(MetricImpl.MetricType.WRITE_COUNT).incValueBy(theChunk.size());
+ } catch (Exception e) {
+ this.stepContext.setException(e);
+ for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
+ writeListenerProxy.onWriteError(theChunk, e);
+ }
+ // normal path: the retry handler is asked before the skip handler
+ if(!rollbackRetry)
+ {
+ if (retryWriteException(e, theChunk)) {
+ if (!retryHandler.isRollbackException(e)) {
+ writeChunk(theChunk, status);
+ } else {
+ rollbackRetry = true;
+ status.setRollback(true);
+ // inc rollbackCount
+ stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ }
+ } else if (skipWriteException(e, theChunk)) {
+ // a skipped write counts once, regardless of how many items the list holds
+ stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);
+ } else {
+ throw new BatchContainerRuntimeException(e);
+ }
+
+ }
+ // rollback-retry path: the skip handler is asked before the retry handler
+ else {
+ if (skipWriteException(e, theChunk)) {
+ stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);
+ } else if (retryWriteException(e, theChunk)) {
+ if (!retryHandler.isRollbackException(e)) {
+ status.setRetry(true);
+ writeChunk(theChunk, status);
+ } else {
+ rollbackRetry = true;
+ status.setRollback(true);
+ // inc rollbackCount
+ stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ }
+ } else {
+ throw new BatchContainerRuntimeException(e);
+ }
+ }
+
+ } catch (Throwable e) {
+ // anything that is not an Exception (i.e. an Error) is wrapped and rethrown
+ throw new BatchContainerRuntimeException(e);
+ }
+ }
+ logger.exiting(sourceClass, "writeChunk");
+ }
+
+ /**
+  * Main Read-Process-Write loop.
+  * <p>
+  * Opens the reader and writer in their own transaction, then repeatedly
+  * reads/processes a group of items and writes it. A new transaction is
+  * begun at the first iteration, after every checkpoint commit and after
+  * every rollback. When a read, process or write step requests a rollback,
+  * the reader and writer are closed, the transaction is rolled back, both
+  * are re-opened at the last checkpoint and the loop continues with an item
+  * count of 1 (the item count is not restored afterwards). The loop ends
+  * once the final group has been checkpointed and committed.
+  *
+  * @throws BatchContainerRuntimeException
+  *             wrapping any Throwable raised inside the loop, after the
+  *             reader and writer have been closed and the transaction
+  *             rolled back
+  */
+ private void invokeChunk() {
+     logger.entering(sourceClass, "invokeChunk");
+
+     int itemCount = ChunkHelper.getItemCount(chunk);
+     int timeInterval = ChunkHelper.getTimeLimit(chunk);
+     boolean checkPointed = true;
+     boolean rollback = false;
+     Throwable caughtThrowable = null;
+
+     try {
+         transactionManager.begin();
+         this.openReaderAndWriter();
+         transactionManager.commit();
+
+         while (true) {
+
+             // begin new transaction at first iteration, after a checkpoint
+             // commit, or after a rollback
+             if (checkPointed || rollback) {
+                 // compare by value: reference comparison only works for
+                 // interned strings
+                 if ("custom".equals(this.checkpointProxy.getCheckpointType())) {
+                     int newtimeOut = this.checkpointManager.checkpointTimeout();
+                     transactionManager.setTransactionTimeout(newtimeOut);
+                 }
+                 transactionManager.begin();
+                 for (ChunkListenerProxy chunkProxy : chunkListeners) {
+                     chunkProxy.beforeChunk();
+                 }
+
+                 if (rollback) {
+                     positionReaderAtCheckpoint();
+                     positionWriterAtCheckpoint();
+                     checkpointManager = new CheckpointManager(readerProxy, writerProxy,
+                             getCheckpointAlgorithm(itemCount, timeInterval), jobExecutionImpl.getExecutionId(), jobExecutionImpl
+                                     .getJobInstance().getInstanceId(), step.getId());
+                 }
+             }
+
+             ItemStatus status = new ItemStatus();
+             rollback = false;
+
+             List<Object> chunkToWrite = readAndProcess(itemCount, status);
+
+             if (status.isRollback()) {
+                 itemCount = 1;
+                 rollback = true;
+
+                 readerProxy.close();
+                 writerProxy.close();
+
+                 transactionManager.rollback();
+
+                 continue;
+             }
+
+             writeChunk(chunkToWrite, status);
+
+             if (status.isRollback()) {
+                 itemCount = 1;
+                 rollback = true;
+
+                 readerProxy.close();
+                 writerProxy.close();
+
+                 transactionManager.rollback();
+
+                 continue;
+             }
+             checkPointed = status.isCheckPointed();
+
+             // we could finish the chunk in 3 conditions: buffer is full,
+             // checkpoint, not more input
+             if (status.isCheckPointed() || status.isFinished()) {
+                 // TODO: missing before checkpoint listeners
+                 // 1.- check if spec list proper steps for before checkpoint
+                 // 2.- ask Andy about retry
+                 // 3.- when do we stop?
+
+                 checkpointManager.checkpoint();
+
+                 for (ChunkListenerProxy chunkProxy : chunkListeners) {
+                     chunkProxy.afterChunk();
+                 }
+
+                 this.persistUserData();
+
+                 this.chkptAlg.beginCheckpoint();
+
+                 transactionManager.commit();
+
+                 this.chkptAlg.endCheckpoint();
+
+                 invokeCollectorIfPresent();
+
+                 // exit loop when last record is written
+                 if (status.isFinished()) {
+                     transactionManager.begin();
+
+                     readerProxy.close();
+                     writerProxy.close();
+
+                     transactionManager.commit();
+                     // increment commitCount
+                     stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();
+                     break;
+                 } else {
+                     // increment commitCount
+                     stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();
+                 }
+
+             }
+
+         }
+     } catch (Exception e) {
+         caughtThrowable = e;
+         logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", e);
+         // Only try to call onError() if we have an Exception, but not an Error.
+         for (ChunkListenerProxy chunkProxy : chunkListeners) {
+             try {
+                 chunkProxy.onError(e);
+             } catch (Exception e1) {
+                 StringWriter sw = new StringWriter();
+                 PrintWriter pw = new PrintWriter(sw);
+                 e1.printStackTrace(pw);
+                 logger.warning("Caught secondary exception when calling chunk listener onError() with stack trace: " + sw.toString() + "\n. Will continue to remaining chunk listeners (if any) and rethrow wrapping the primary exception.");
+             }
+         }
+     } catch (Throwable t) {
+         caughtThrowable = t;
+         logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", t);
+     } finally {
+         if (caughtThrowable != null) {
+             transactionManager.setRollbackOnly();
+             logger.warning("Caught throwable in chunk processing. Attempting to close all readers and writers.");
+             readerProxy.close();
+             writerProxy.close();
+             transactionManager.rollback();
+             logger.exiting(sourceClass, "invokeChunk");
+             throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", caughtThrowable);
+         } else {
+             logger.finest("Exiting normally");
+             logger.exiting(sourceClass, "invokeChunk");
+         }
+     }
+ }
+
+ /**
+  * Runs the chunk step: takes the chunk definition from the step, creates
+  * the chunk artifacts for it, then drives the read-process-write loop.
+  *
+  * @throws BatchContainerServiceException
+  *             if one of the chunk artifacts cannot be created
+  */
+ protected void invokeCoreStep() throws BatchContainerServiceException {
+     chunk = step.getChunk();
+     initializeChunkArtifacts();
+     invokeChunk();
+ }
+
+ /**
+  * Builds the checkpoint algorithm matching the checkpoint proxy's type.
+  *
+  * @param itemCount
+  *            item threshold passed to the item checkpoint algorithm
+  * @param timeInterval
+  *            time threshold passed to the item checkpoint algorithm
+  * @return a new {@link ItemCheckpointAlgorithm} configured with both
+  *         thresholds when the type is "item"; otherwise the checkpoint
+  *         proxy itself (the custom algorithm)
+  */
+ private CheckpointAlgorithm getCheckpointAlgorithm(int itemCount, int timeInterval) {
+     // Compare by value: reference comparison only works for interned strings.
+     if ("item".equals(checkpointProxy.getCheckpointType())) {
+         ItemCheckpointAlgorithm itemAlgorithm = new ItemCheckpointAlgorithm();
+         itemAlgorithm.setThresholds(itemCount, timeInterval);
+         return itemAlgorithm;
+     }
+
+     // custom chkpt alg
+     return (CheckpointAlgorithm) checkpointProxy;
+ }
+
+ /**
+  * Creates everything the read-process-write loop needs for the current
+  * chunk: the reader, optional processor and writer proxies, the checkpoint
+  * algorithm proxy, all chunk/item/skip/retry listener lists, the
+  * checkpoint manager, and the skip and retry handlers.
+  *
+  * @throws BatchContainerServiceException
+  *             if the reader, processor, writer or checkpoint algorithm
+  *             artifact fails validation
+  */
+ private void initializeChunkArtifacts() {
+     String sourceMethod = "initializeChunkArtifacts";
+     if (logger.isLoggable(Level.FINE)) {
+         logger.entering(sourceClass, sourceMethod);
+     }
+
+     int itemCount = ChunkHelper.getItemCount(chunk);
+     int timeInterval = ChunkHelper.getTimeLimit(chunk);
+     String checkpointPolicy = ChunkHelper.getCheckpointPolicy(chunk);
+
+     // reader
+     ItemReader itemReader = chunk.getReader();
+     List<Property> itemReaderProps = itemReader.getProperties() == null ? null : itemReader.getProperties().getPropertyList();
+     try {
+         InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext,
+                 itemReaderProps);
+
+         readerProxy = ProxyFactory.createItemReaderProxy(itemReader.getRef(), injectionRef, stepContext);
+
+         if (logger.isLoggable(Level.FINE)) {
+             logger.fine("Created ItemReaderProxy for " + itemReader.getRef());
+         }
+     } catch (ArtifactValidationException e) {
+         throw new BatchContainerServiceException("Cannot create the ItemReader [" + itemReader.getRef() + "]", e);
+     }
+
+     // processor (optional)
+     ItemProcessor itemProcessor = chunk.getProcessor();
+     if (itemProcessor != null) {
+         List<Property> itemProcessorProps = itemProcessor.getProperties() == null ? null : itemProcessor.getProperties().getPropertyList();
+         try {
+
+             InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext,
+                     itemProcessorProps);
+
+             processorProxy = ProxyFactory.createItemProcessorProxy(itemProcessor.getRef(), injectionRef, stepContext);
+             if (logger.isLoggable(Level.FINE)) {
+                 logger.fine("Created ItemProcessorProxy for " + itemProcessor.getRef());
+             }
+         } catch (ArtifactValidationException e) {
+             throw new BatchContainerServiceException("Cannot create the ItemProcessor [" + itemProcessor.getRef() + "]", e);
+         }
+     }
+
+     // writer
+     ItemWriter itemWriter = chunk.getWriter();
+     List<Property> itemWriterProps = itemWriter.getProperties() == null ? null : itemWriter.getProperties().getPropertyList();
+     try {
+         InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext,
+                 itemWriterProps);
+
+         writerProxy = ProxyFactory.createItemWriterProxy(itemWriter.getRef(), injectionRef, stepContext);
+         if (logger.isLoggable(Level.FINE)) {
+             logger.fine("Created ItemWriterProxy for " + itemWriter.getRef());
+         }
+     } catch (ArtifactValidationException e) {
+         throw new BatchContainerServiceException("Cannot create the ItemWriter [" + itemWriter.getRef() + "]", e);
+     }
+
+     // checkpoint algorithm
+     try {
+         List<Property> propList = null;
+
+         if (chunk.getCheckpointAlgorithm() != null) {
+
+             propList = (chunk.getCheckpointAlgorithm().getProperties() == null) ? null : chunk.getCheckpointAlgorithm().getProperties()
+                     .getPropertyList();
+         }
+
+         InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext,
+                 propList);
+
+         checkpointProxy = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(step, injectionRef, stepContext);
+         if (logger.isLoggable(Level.FINE)) {
+             logger.fine("Created CheckpointAlgorithmProxy for policy [" + checkpointPolicy + "]");
+         }
+     } catch (ArtifactValidationException e) {
+         throw new BatchContainerServiceException("Cannot create the CheckpointAlgorithm for policy [" + chunk.getCheckpointPolicy()
+                 + "]", e);
+     }
+
+     // listeners: all created with an injection reference that carries no properties
+     InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext,
+             null);
+
+     this.chunkListeners = jobExecutionImpl.getListenerFactory().getChunkListeners(step, injectionRef, stepContext);
+     this.skipProcessListeners = jobExecutionImpl.getListenerFactory().getSkipProcessListeners(step, injectionRef, stepContext);
+     this.skipReadListeners = jobExecutionImpl.getListenerFactory().getSkipReadListeners(step, injectionRef, stepContext);
+     this.skipWriteListeners = jobExecutionImpl.getListenerFactory().getSkipWriteListeners(step, injectionRef, stepContext);
+     this.retryProcessListeners = jobExecutionImpl.getListenerFactory().getRetryProcessListeners(step, injectionRef, stepContext);
+     this.retryReadListeners = jobExecutionImpl.getListenerFactory().getRetryReadListeners(step, injectionRef, stepContext);
+     this.retryWriteListeners = jobExecutionImpl.getListenerFactory().getRetryWriteListeners(step, injectionRef, stepContext);
+     this.itemReadListeners = jobExecutionImpl.getListenerFactory().getItemReadListeners(step, injectionRef, stepContext);
+     this.itemProcessListeners = jobExecutionImpl.getListenerFactory().getItemProcessListeners(step, injectionRef, stepContext);
+     this.itemWriteListeners = jobExecutionImpl.getListenerFactory().getItemWriteListeners(step, injectionRef, stepContext);
+
+     // Compare the type by value: reference comparison only works for
+     // interned strings.
+     if ("item".equals(checkpointProxy.getCheckpointType())) {
+         chkptAlg = new ItemCheckpointAlgorithm();
+         ((ItemCheckpointAlgorithm) chkptAlg).setThresholds(itemCount, timeInterval);
+     } else { // custom chkpt alg
+         chkptAlg = (CheckpointAlgorithm) checkpointProxy;
+     }
+
+     if (logger.isLoggable(Level.FINE)) {
+         logger.fine("Setting contexts for chunk artifacts");
+     }
+
+     // Both messages belong under the same guard; without braces only the
+     // first one was covered by the level check.
+     if (logger.isLoggable(Level.FINE)) {
+         logger.fine("Initialize checkpoint manager with item-count=" + itemCount);
+         logger.fine("Initialize checkpoint manager with time-interval=" + timeInterval);
+     }
+
+     checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getExecutionId(), jobExecutionImpl
+             .getJobInstance().getInstanceId(), step.getId());
+
+     skipHandler = new SkipHandler(chunk, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId());
+     skipHandler.addSkipProcessListener(skipProcessListeners);
+     skipHandler.addSkipReadListener(skipReadListeners);
+     skipHandler.addSkipWriteListener(skipWriteListeners);
+
+     retryHandler = new RetryHandler(chunk, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId());
+
+     retryHandler.addRetryProcessListener(retryProcessListeners);
+     retryHandler.addRetryReadListener(retryReadListeners);
+     retryHandler.addRetryWriteListener(retryWriteListeners);
+
+     if (logger.isLoggable(Level.FINE)) {
+         logger.exiting(sourceClass, sourceMethod);
+     }
+ }
+
+ private void openReaderAndWriter() {
+ String sourceMethod = "openReaderAndWriter";
+
+ if (logger.isLoggable(Level.FINE))
+ logger.entering(sourceClass, sourceMethod);
+
+ _persistenceManagerService = servicesManager.getPersistenceManagerService();
+ readerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), "READER");
+ CheckpointData readerChkptData = _persistenceManagerService.getCheckpointData(readerChkptDK);
+ try {
+
+ // check for data in backing store
+ if (readerChkptData != null) {
+
+ byte[] readertoken = readerChkptData.getRestartToken();
+ ByteArrayInputStream readerChkptBA = new ByteArrayInputStream(readertoken);
+ TCCLObjectInputStream readerOIS = null;
+ try {
+ readerOIS = new TCCLObjectInputStream(readerChkptBA);
+ readerProxy.open((Serializable) readerOIS.readObject());
+ readerOIS.close();
+ } catch (Exception ex) {
+ // is this what I should be throwing here?
+ throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+ }
+ } else {
+ // no chkpt data exists in the backing store
+ readerChkptData = null;
+ readerProxy.open(null);
+ }
+ } catch (ClassCastException e) {
+ logger.warning("Expected CheckpointData but found" + readerChkptData );
+ throw new IllegalStateException("Expected CheckpointData but found" + readerChkptData );
+ }
+
+ writerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), "WRITER");
+ CheckpointData writerChkptData = _persistenceManagerService.getCheckpointData(writerChkptDK);
+
+ try {
+ // check for data in backing store
+ if (writerChkptData != null) {
+ byte[] writertoken = writerChkptData.getRestartToken();
+ ByteArrayInputStream writerChkptBA = new ByteArrayInputStream(writertoken);
+ TCCLObjectInputStream writerOIS = null;
+ try {
+ writerOIS = new TCCLObjectInputStream(writerChkptBA);
+ writerProxy.open((Serializable) writerOIS.readObject());
+ writerOIS.close();
+ } catch (Exception ex) {
+ // is this what I should be throwing here?
+ throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+ }
+ } else {
+ // no chkpt data exists in the backing store
+ writerChkptData = null;
+ writerProxy.open(null);
+ }
+ } catch (ClassCastException e) {
+ logger.warning("Expected Checkpoint but found" + writerChkptData);
+ throw new IllegalStateException("Expected Checkpoint but found" + writerChkptData);
+ }
+
+ // set up metrics
+ // stepContext.addMetric(MetricImpl.Counter.valueOf("READ_COUNT"), 0);
+ // stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_COUNT"), 0);
+ // stepContext.addMetric(MetricImpl.Counter.valueOf("READ_SKIP_COUNT"),
+ // 0);
+ // stepContext.addMetric(MetricImpl.Counter.valueOf("PROCESS_SKIP_COUNT"),
+ // 0);
+ // stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_SKIP_COUNT"),
+ // 0);
+
+ if (logger.isLoggable(Level.FINE))
+ logger.exiting(sourceClass, sourceMethod);
+ }
+
+ /**
+  * Requests a stop by moving the step context to {@code STOPPING}.
+  * <p>
+  * Nothing is called on the chunk artifacts: the read-process loop checks
+  * the batch status after every item and finishes the current chunk when
+  * it sees {@code STOPPING}.
+  */
+ @Override
+ public void stop() {
+     stepContext.setBatchStatus(BatchStatus.STOPPING);
+ }
+
+ /**
+  * Hands a read failure to the skip handler.
+  *
+  * @param e the exception raised while reading
+  * @return {@code true} if the skip handler accepted the exception,
+  *         {@code false} if it answered with a BatchContainerRuntimeException
+  */
+ boolean skipReadException(Exception e) {
+     try {
+         skipHandler.handleExceptionRead(e);
+         return true;
+     } catch (BatchContainerRuntimeException notSkippable) {
+         return false;
+     }
+ }
+
+ /**
+  * Hands a read failure to the retry handler.
+  *
+  * @param e the exception raised while reading
+  * @return {@code true} if the retry handler accepted the exception,
+  *         {@code false} if it answered with a BatchContainerRuntimeException
+  */
+ boolean retryReadException(Exception e) {
+     try {
+         retryHandler.handleExceptionRead(e);
+         return true;
+     } catch (BatchContainerRuntimeException notRetryable) {
+         return false;
+     }
+ }
+
+ /**
+  * Hands a processing failure to the skip handler.
+  *
+  * @param e      the exception raised while processing
+  * @param record the item that was being processed
+  * @return {@code true} if the skip handler accepted the exception,
+  *         {@code false} if it answered with a BatchContainerRuntimeException
+  */
+ boolean skipProcessException(Exception e, Object record) {
+     try {
+         skipHandler.handleExceptionWithRecordProcess(e, record);
+         return true;
+     } catch (BatchContainerRuntimeException notSkippable) {
+         return false;
+     }
+ }
+
+ /**
+  * Hands a processing failure to the retry handler.
+  *
+  * @param e      the exception raised while processing
+  * @param record the item that was being processed
+  * @return {@code true} if the retry handler accepted the exception,
+  *         {@code false} if it answered with a BatchContainerRuntimeException
+  */
+ boolean retryProcessException(Exception e, Object record) {
+     try {
+         retryHandler.handleExceptionProcess(e, record);
+         return true;
+     } catch (BatchContainerRuntimeException notRetryable) {
+         return false;
+     }
+ }
+
+ /**
+  * Hands a write failure to the skip handler.
+  *
+  * @param e            the exception raised while writing
+  * @param chunkToWrite the items that were being written
+  * @return {@code true} if the skip handler accepted the exception,
+  *         {@code false} if it answered with a BatchContainerRuntimeException
+  */
+ boolean skipWriteException(Exception e, List<Object> chunkToWrite) {
+     try {
+         skipHandler.handleExceptionWithRecordListWrite(e, chunkToWrite);
+         return true;
+     } catch (BatchContainerRuntimeException notSkippable) {
+         return false;
+     }
+ }
+
+ /**
+  * Hands a write failure to the retry handler.
+  *
+  * @param e            the exception raised while writing
+  * @param chunkToWrite the items that were being written
+  * @return {@code true} if the retry handler accepted the exception,
+  *         {@code false} if it answered with a BatchContainerRuntimeException
+  */
+ boolean retryWriteException(Exception e, List<Object> chunkToWrite) {
+     try {
+         retryHandler.handleExceptionWrite(e, chunkToWrite);
+         return true;
+     } catch (BatchContainerRuntimeException notRetryable) {
+         return false;
+     }
+ }
+
+ /**
+  * Re-opens the reader at its last stored checkpoint. The checkpoint is
+  * looked up under the job instance id, the step id and "READER"; when one
+  * exists its restart token is deserialized and passed to {@code open},
+  * otherwise {@code open(null)} is called.
+  *
+  * @throws BatchContainerServiceException
+  *             if deserializing the restart token or opening with it fails
+  * @throws IllegalStateException
+  *             if a ClassCastException escapes while opening
+  */
+ private void positionReaderAtCheckpoint() {
+     _persistenceManagerService = servicesManager.getPersistenceManagerService();
+     readerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), "READER");
+
+     CheckpointData storedData = _persistenceManagerService.getCheckpointData(readerChkptDK);
+     try {
+         if (storedData == null) {
+             // no chkpt data exists in the backing store
+             readerProxy.open(null);
+         } else {
+             byte[] restartToken = storedData.getRestartToken();
+             ByteArrayInputStream tokenBytes = new ByteArrayInputStream(restartToken);
+             try {
+                 TCCLObjectInputStream tokenStream = new TCCLObjectInputStream(tokenBytes);
+                 readerProxy.open((Serializable) tokenStream.readObject());
+                 tokenStream.close();
+             } catch (Exception ex) {
+                 // is this what I should be throwing here?
+                 throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+             }
+         }
+     } catch (ClassCastException e) {
+         throw new IllegalStateException("Expected CheckpointData but found" + storedData);
+     }
+ }
+
+ /**
+  * Re-opens the writer at its last stored checkpoint. The checkpoint is
+  * looked up under the job instance id, the step id and "WRITER"; when one
+  * exists its restart token is deserialized and passed to {@code open},
+  * otherwise {@code open(null)} is called.
+  *
+  * @throws BatchContainerServiceException
+  *             if deserializing the restart token or opening with it fails
+  * @throws IllegalStateException
+  *             if a ClassCastException escapes while opening
+  */
+ private void positionWriterAtCheckpoint() {
+     _persistenceManagerService = servicesManager.getPersistenceManagerService();
+     writerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), "WRITER");
+
+     CheckpointData storedData = _persistenceManagerService.getCheckpointData(writerChkptDK);
+
+     try {
+         if (storedData == null) {
+             // no chkpt data exists in the backing store
+             writerProxy.open(null);
+         } else {
+             byte[] restartToken = storedData.getRestartToken();
+             ByteArrayInputStream tokenBytes = new ByteArrayInputStream(restartToken);
+             try {
+                 TCCLObjectInputStream tokenStream = new TCCLObjectInputStream(tokenBytes);
+                 writerProxy.open((Serializable) tokenStream.readObject());
+                 tokenStream.close();
+             } catch (Exception ex) {
+                 // is this what I should be throwing here?
+                 throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+             }
+         }
+     } catch (ClassCastException e) {
+         throw new IllegalStateException("Expected CheckpointData but found" + storedData);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/DecisionControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/DecisionControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/DecisionControllerImpl.java
new file mode 100755
index 0000000..0595c48
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/DecisionControllerImpl.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.util.List;
+import java.util.logging.Logger;
+
+import javax.batch.runtime.StepExecution;
+
+import com.ibm.jbatch.container.IExecutionElementController;
+import com.ibm.jbatch.container.artifact.proxy.DeciderProxy;
+import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
+import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
+import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
+import com.ibm.jbatch.container.exception.BatchContainerServiceException;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.jsl.ExecutionElement;
+import com.ibm.jbatch.container.services.IPersistenceManagerService;
+import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
+import com.ibm.jbatch.container.status.ExecutionStatus;
+import com.ibm.jbatch.container.status.ExtendedBatchStatus;
+import com.ibm.jbatch.container.validation.ArtifactValidationException;
+import com.ibm.jbatch.jsl.model.Decision;
+import com.ibm.jbatch.jsl.model.Property;
+
+/**
+ * Controller that runs a JSL decision element.
+ *
+ * It creates the decider artifact referenced by the decision, calls it with the step
+ * executions produced by the previously executed element, and records the returned value
+ * as the job-level exit status.
+ */
+public class DecisionControllerImpl implements IExecutionElementController {
+
+    // Logger is named after this class (it was previously named after SplitControllerImpl).
+    private final static String sourceClass = DecisionControllerImpl.class.getName();
+    private final static Logger logger = Logger.getLogger(sourceClass);
+
+    private RuntimeJobExecution jobExecution;
+
+    private Decision decision;
+
+    // Step executions handed to the decider; populated by setPreviousStepExecutions(..).
+    private StepExecution[] previousStepExecutions = null;
+
+    private IPersistenceManagerService persistenceService = null;
+
+    /**
+     * @param jobExecution the job execution this decision runs within
+     * @param decision     the JSL decision model element to execute
+     */
+    public DecisionControllerImpl(RuntimeJobExecution jobExecution, Decision decision) {
+        this.jobExecution = jobExecution;
+        this.decision = decision;
+        persistenceService = ServicesManagerImpl.getInstance().getPersistenceManagerService();
+    }
+
+    /**
+     * Creates the decider proxy, invokes it, and stores its result as the job exit status.
+     *
+     * @return a NORMAL_COMPLETION status carrying the exit status returned by the decider
+     * @throws BatchContainerServiceException if the decider artifact cannot be created
+     */
+    @Override
+    public ExecutionStatus execute() {
+
+        String deciderId = decision.getRef();
+        List<Property> propList = (decision.getProperties() == null) ? null : decision.getProperties().getPropertyList();
+
+        DeciderProxy deciderProxy;
+
+        // Create a decider proxy and inject the associated properties.
+        // The job context is always in scope; the second argument is passed as null
+        // (presumably the step context, which does not apply to a decision - confirm
+        // against InjectionReferences).
+        InjectionReferences injectionRef = new InjectionReferences(jobExecution.getJobContext(), null, propList);
+
+        try {
+            deciderProxy = ProxyFactory.createDeciderProxy(deciderId, injectionRef);
+        } catch (ArtifactValidationException e) {
+            throw new BatchContainerServiceException("Cannot create the decider [" + deciderId + "]", e);
+        }
+
+        String exitStatus = deciderProxy.decide(this.previousStepExecutions);
+
+        logger.fine("Decider exiting and setting job-level exit status to " + exitStatus);
+
+        //Set the value returned from the decider as the job context exit status.
+        this.jobExecution.getJobContext().setExitStatus(exitStatus);
+
+        return new ExecutionStatus(ExtendedBatchStatus.NORMAL_COMPLETION, exitStatus);
+    }
+
+    /**
+     * Resolves the step executions of the previously executed element so they can be passed
+     * to the decider.
+     *
+     * When there is no previous controller, or it reports no step execution ids, the decider
+     * receives an empty array (only the job context is available to it) instead of this
+     * method failing with a NullPointerException.
+     *
+     * @param previousExecutionElement  the element executed just before this decision; may be null
+     * @param previousElementController the controller that executed it; may be null
+     * @throws BatchContainerRuntimeException if the previous element is itself a decision
+     */
+    protected void setPreviousStepExecutions(ExecutionElement previousExecutionElement, IExecutionElementController previousElementController) {
+        // 'instanceof' is false for null, so a missing previous element falls through.
+        if (previousExecutionElement instanceof Decision) {
+            throw new BatchContainerRuntimeException("A decision cannot precede another decision.");
+        }
+
+        List<Long> previousStepExecsIds =
+                (previousElementController == null) ? null : previousElementController.getLastRunStepExecutions();
+
+        if (previousStepExecsIds == null) {
+            // only job context is available to the decider
+            this.previousStepExecutions = new StepExecution[0];
+            return;
+        }
+
+        StepExecution[] stepExecArray = new StepExecution[previousStepExecsIds.size()];
+
+        for (int i = 0; i < stepExecArray.length; i++) {
+            stepExecArray[i] = persistenceService.getStepExecutionByStepExecutionId(previousStepExecsIds.get(i));
+        }
+
+        this.previousStepExecutions = stepExecArray;
+    }
+
+    /** A decision cannot be stopped; this is intentionally a no-op. */
+    @Override
+    public void stop() {
+        // no-op
+    }
+
+    /**
+     * Not implemented for decisions.
+     *
+     * @return always {@code null}; callers must be prepared for that
+     */
+    @Override
+    public List<Long> getLastRunStepExecutions() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionElementControllerFactory.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionElementControllerFactory.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionElementControllerFactory.java
new file mode 100755
index 0000000..ad504b7
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionElementControllerFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+import com.ibm.jbatch.container.context.impl.StepContextImpl;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.util.PartitionDataWrapper;
+import com.ibm.jbatch.jsl.model.Batchlet;
+import com.ibm.jbatch.jsl.model.Chunk;
+import com.ibm.jbatch.jsl.model.Decision;
+import com.ibm.jbatch.jsl.model.Flow;
+import com.ibm.jbatch.jsl.model.Partition;
+import com.ibm.jbatch.jsl.model.Split;
+import com.ibm.jbatch.jsl.model.Step;
+
+public class ExecutionElementControllerFactory {
+
+ private final static String CLASSNAME = ExecutionElementControllerFactory.class.getName();
+ private final static Logger logger = Logger.getLogger(CLASSNAME);
+
+ public static BaseStepControllerImpl getStepController(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+
+ String methodName = "getStepController";
+
+ if(logger.isLoggable(Level.FINER)) { logger.logp (Level.FINER, CLASSNAME, methodName, "Get StepController for", step.getId());}
+
+ Partition partition = step.getPartition();
+ if (partition != null) {
+
+ if (partition.getMapper() != null ) {
+ if (logger.isLoggable(Level.FINER)) {
+ logger.logp(Level.FINER, CLASSNAME, methodName, "Found partitioned step with mapper" , step);
+ }
+ return new PartitionedStepControllerImpl(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+ }
+
+ if (partition.getPlan() != null) {
+ if (partition.getPlan().getPartitions() != null) {
+ if (logger.isLoggable(Level.FINER)) {
+ logger.logp(Level.FINER, CLASSNAME, methodName, "Found partitioned step with plan", step);
+ }
+ return new PartitionedStepControllerImpl(jobExecutionImpl, step, stepContext, rootJobExecutionId);
+ }
+ }
+ }
+
+ Batchlet batchlet = step.getBatchlet();
+ if (batchlet != null) {
+ if(logger.isLoggable(Level.FINER)) {
+ logger.finer("Found batchlet: " + batchlet + ", with ref= " + batchlet.getRef());
+ }
+ if (step.getChunk() != null) {
+ throw new IllegalArgumentException("Step contains both a batchlet and a chunk. Aborting.");
+ }
+ return new BatchletStepControllerImpl(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue);
+ } else {
+ Chunk chunk = step.getChunk();
+ if(logger.isLoggable(Level.FINER)) {
+ logger.finer("Found chunk: " + chunk);
+ }
+ if (chunk == null) {
+ throw new IllegalArgumentException("Step does not contain either a batchlet or a chunk. Aborting.");
+ }
+ return new ChunkStepControllerImpl(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue);
+ }
+ }
+
+ public static DecisionControllerImpl getDecisionController(RuntimeJobExecution jobExecutionImpl, Decision decision) {
+ return new DecisionControllerImpl(jobExecutionImpl, decision);
+ }
+
+ public static FlowControllerImpl getFlowController(RuntimeJobExecution jobExecutionImpl, Flow flow, long rootJobExecutionId) {
+ return new FlowControllerImpl(jobExecutionImpl, flow, rootJobExecutionId);
+ }
+
+ public static SplitControllerImpl getSplitController(RuntimeJobExecution jobExecutionImpl, Split split, long rootJobExecutionId) {
+ return new SplitControllerImpl(jobExecutionImpl, split, rootJobExecutionId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionTransitioner.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionTransitioner.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionTransitioner.java
new file mode 100755
index 0000000..665f6e4
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionTransitioner.java
@@ -0,0 +1,275 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.impl;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Logger;
+
+import javax.batch.runtime.BatchStatus;
+
+import com.ibm.jbatch.container.IController;
+import com.ibm.jbatch.container.IExecutionElementController;
+import com.ibm.jbatch.container.context.impl.JobContextImpl;
+import com.ibm.jbatch.container.context.impl.StepContextImpl;
+import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.jsl.ExecutionElement;
+import com.ibm.jbatch.container.jsl.IllegalTransitionException;
+import com.ibm.jbatch.container.jsl.Transition;
+import com.ibm.jbatch.container.jsl.TransitionElement;
+import com.ibm.jbatch.container.navigator.ModelNavigator;
+import com.ibm.jbatch.container.status.ExtendedBatchStatus;
+import com.ibm.jbatch.container.status.ExecutionStatus;
+import com.ibm.jbatch.container.util.PartitionDataWrapper;
+import com.ibm.jbatch.jsl.model.Decision;
+import com.ibm.jbatch.jsl.model.End;
+import com.ibm.jbatch.jsl.model.Fail;
+import com.ibm.jbatch.jsl.model.Flow;
+import com.ibm.jbatch.jsl.model.JSLJob;
+import com.ibm.jbatch.jsl.model.Split;
+import com.ibm.jbatch.jsl.model.Step;
+import com.ibm.jbatch.jsl.model.Stop;
+
+/**
+ * Drives execution of the elements (steps, decisions, flows, splits) of a job or flow:
+ * it runs the current element through its controller, asks the {@link ModelNavigator} for
+ * the next transition, and repeats until a terminating condition is reached.
+ */
+public class ExecutionTransitioner {
+
+    private final static String CLASSNAME = ExecutionTransitioner.class.getName();
+    private final static Logger logger = Logger.getLogger(CLASSNAME);
+
+    private RuntimeJobExecution jobExecution;
+    private long rootJobExecutionId;
+    private ModelNavigator<?> modelNavigator;
+
+    // 'volatile' since it receives stop on separate thread.
+    private volatile IExecutionElementController currentStoppableElementController;
+    private IExecutionElementController previousElementController;
+    private ExecutionElement currentExecutionElement = null;
+    private ExecutionElement previousExecutionElement = null;
+
+    private JobContextImpl jobContext;
+    private BlockingQueue<PartitionDataWrapper> analyzerQueue = null;
+
+    // Step execution ids of the last element run; only set when the loop finishes transitioning normally.
+    private List<Long> stepExecIds;
+
+    /**
+     * @param jobExecution       the job execution being driven
+     * @param rootJobExecutionId id of the root job execution
+     * @param modelNavigator     navigator used to find the first element and each next transition
+     */
+    public ExecutionTransitioner(RuntimeJobExecution jobExecution, long rootJobExecutionId, ModelNavigator<?> modelNavigator) {
+        this.jobExecution = jobExecution;
+        this.rootJobExecutionId = rootJobExecutionId;
+        this.modelNavigator = modelNavigator;
+        this.jobContext = jobExecution.getJobContext();
+    }
+
+    /**
+     * Same as the three-argument constructor, additionally supplying the analyzer queue
+     * that is forwarded to step controllers.
+     */
+    public ExecutionTransitioner(RuntimeJobExecution jobExecution, long rootJobExecutionId, ModelNavigator<JSLJob> jobNavigator, BlockingQueue<PartitionDataWrapper> analyzerQueue) {
+        this(jobExecution, rootJobExecutionId, jobNavigator);
+        this.analyzerQueue = analyzerQueue;
+    }
+
+    /**
+     * Used for job and flow.
+     *
+     * Executes elements one after another, starting at the navigator's first element
+     * (honouring the execution's restartOn value), until the job is stopping, a flow/split
+     * ends abnormally, no further transition exists, or a terminating transition element
+     * (stop, end, fail) matches.
+     *
+     * @return the status describing why the loop ended
+     * @throws IllegalArgumentException if the first execution element cannot be determined
+     * @throws BatchContainerRuntimeException if the job context reports FAILED after an element
+     *         ran, or the next transition cannot be determined
+     * @throws IllegalStateException if an element or transition is in an unexpected state
+     */
+    public ExecutionStatus doExecutionLoop() {
+
+        final String methodName = "doExecutionLoop";
+
+        try {
+            currentExecutionElement = modelNavigator.getFirstExecutionElement(jobExecution.getRestartOn());
+        } catch (IllegalTransitionException e) {
+            String errorMsg = "Could not transition to first execution element within job.";
+            logger.warning(errorMsg);
+            throw new IllegalArgumentException(errorMsg, e);
+        }
+
+        logger.fine("First execution element = " + currentExecutionElement.getId());
+
+        while (true) {
+
+            if (jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
+                logger.fine(methodName + " Exiting execution loop as job is now in stopping state.");
+                return new ExecutionStatus(ExtendedBatchStatus.JOB_OPERATOR_STOPPING);
+            }
+
+            IExecutionElementController currentElementController = getNextElementController();
+            currentStoppableElementController = currentElementController;
+
+            ExecutionStatus status;
+            try {
+                status = currentElementController.execute();
+            } finally {
+                // Clear the stoppable controller on every exit path (normal return, early
+                // return below, or exception) so stop is never called on an element that
+                // has already finished executing.
+                currentStoppableElementController = null;
+            }
+
+            // Nothing special for decision or step except to get exit status.  For flow and split we want to bubble up though.
+            if ((currentExecutionElement instanceof Split) || (currentExecutionElement instanceof Flow)) {
+                // Exit status and restartOn should both be in the job context.
+                if (!status.getExtendedBatchStatus().equals(ExtendedBatchStatus.NORMAL_COMPLETION)) {
+                    logger.fine("Breaking out of loop with return status = " + status.getExtendedBatchStatus().name());
+                    return status;
+                }
+            }
+
+            // Seems like this should only happen if an Error is thrown at the step level, since normally a step-level
+            // exception is caught and the fact that it was thrown captured in the ExecutionStatus
+            if (jobContext.getBatchStatus().equals(BatchStatus.FAILED)) {
+                String errorMsg = "Sub-execution returned its own BatchStatus of FAILED.  Deal with this by throwing exception to the next layer.";
+                logger.warning(errorMsg);
+                throw new BatchContainerRuntimeException(errorMsg);
+            }
+
+            logger.fine("Done executing element=" + currentExecutionElement.getId() + ", exitStatus=" + status.getExitStatus());
+
+            if (jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
+                logger.fine(methodName + " Exiting as job has been stopped");
+                return new ExecutionStatus(ExtendedBatchStatus.JOB_OPERATOR_STOPPING);
+            }
+
+            Transition nextTransition = null;
+            try {
+                nextTransition = modelNavigator.getNextTransition(currentExecutionElement, status);
+            } catch (IllegalTransitionException e) {
+                String errorMsg = "Problem transitioning to next execution element.";
+                logger.warning(errorMsg);
+                throw new BatchContainerRuntimeException(errorMsg, e);
+            }
+
+            //
+            // We will find ourselves in one of four states now.
+            //
+            // 1. Finished transitioning after a normal execution, but nothing to do 'next'.
+            // 2. We just executed a step which threw an exception, but didn't match a transition element.
+            // 3. We are going to 'next' to another execution element (and jump back to the top of this
+            //    'while'-loop).
+            // 4. We matched a terminating transition element (<end>, <stop> or <fail>).
+            //
+
+            // 1.
+            if (nextTransition.isFinishedTransitioning()) {
+                logger.fine(methodName + " No next execution element, and no transition element found either.  Looks like we're done and ready for COMPLETED state.");
+                this.stepExecIds = currentElementController.getLastRunStepExecutions();
+                // Consider just passing the last 'status' back, but let's unwrap the exit status and pass a new NORMAL_COMPLETION
+                // status back instead.
+                return new ExecutionStatus(ExtendedBatchStatus.NORMAL_COMPLETION, status.getExitStatus());
+            // 2.
+            } else if (nextTransition.noTransitionElementMatchedAfterException()) {
+                return new ExecutionStatus(ExtendedBatchStatus.EXCEPTION_THROWN, status.getExitStatus());
+            // 3.
+            } else if (nextTransition.getNextExecutionElement() != null) {
+                // hold on to the previous execution element for the decider
+                // we need it because we need to inject the context of the
+                // previous execution element into the decider
+                previousExecutionElement = currentExecutionElement;
+                previousElementController = currentElementController;
+                currentExecutionElement = nextTransition.getNextExecutionElement();
+            // 4.
+            } else if (nextTransition.getTransitionElement() != null) {
+                ExecutionStatus terminatingStatus = handleTerminatingTransitionElement(nextTransition.getTransitionElement());
+                logger.finer(methodName + " , Breaking out of execution loop after processing terminating transition element.");
+                return terminatingStatus;
+            } else {
+                throw new IllegalStateException("Not sure how we'd end up in this state...aborting rather than looping.");
+            }
+        }
+    }
+
+    /**
+     * Builds the controller for the current execution element. A decision controller is
+     * additionally given the previous element and its controller.
+     *
+     * @return the controller for {@code currentExecutionElement}; never null
+     * @throws IllegalStateException if the current element is not a decision, flow, split or step
+     */
+    private IExecutionElementController getNextElementController() {
+        IExecutionElementController elementController;
+
+        if (currentExecutionElement instanceof Decision) {
+            Decision decision = (Decision) currentExecutionElement;
+            DecisionControllerImpl decisionController = ExecutionElementControllerFactory.getDecisionController(jobExecution, decision);
+            decisionController.setPreviousStepExecutions(previousExecutionElement, previousElementController);
+            elementController = decisionController;
+        } else if (currentExecutionElement instanceof Flow) {
+            Flow flow = (Flow) currentExecutionElement;
+            elementController = ExecutionElementControllerFactory.getFlowController(jobExecution, flow, rootJobExecutionId);
+        } else if (currentExecutionElement instanceof Split) {
+            Split split = (Split) currentExecutionElement;
+            elementController = ExecutionElementControllerFactory.getSplitController(jobExecution, split, rootJobExecutionId);
+        } else if (currentExecutionElement instanceof Step) {
+            Step step = (Step) currentExecutionElement;
+            StepContextImpl stepContext = new StepContextImpl(step.getId());
+            elementController = ExecutionElementControllerFactory.getStepController(jobExecution, step, stepContext, rootJobExecutionId, analyzerQueue);
+        } else {
+            // Fail with a descriptive error instead of returning null and letting the
+            // caller hit a NullPointerException on execute().
+            throw new IllegalStateException("Unsupported execution element type: " + currentExecutionElement);
+        }
+        logger.fine("Next execution element controller = " + elementController);
+        return elementController;
+    }
+
+    /**
+     * Translates a terminating transition element (stop, end or fail) into an
+     * {@link ExecutionStatus}. An exit status given in the JSL is copied to both the job
+     * context and the returned status; for a stop, so is its restart value.
+     *
+     * @param transitionElement the matched terminating transition element
+     * @return JSL_STOP, JSL_END or JSL_FAIL status matching the element type
+     * @throws IllegalStateException if the element is none of stop, end or fail
+     */
+    private ExecutionStatus handleTerminatingTransitionElement(TransitionElement transitionElement) {
+
+        ExecutionStatus retVal;
+
+        logger.fine("Found terminating transition element (stop, end, or fail).");
+
+        if (transitionElement instanceof Stop) {
+
+            Stop stopElement = (Stop) transitionElement;
+            String restartOn = stopElement.getRestart();
+            String exitStatusFromJSL = stopElement.getExitStatus();
+            logger.fine("Next transition element is a <stop> : " + transitionElement + " with restartOn=" + restartOn +
+                    " , and JSL exit status = " + exitStatusFromJSL);
+
+            retVal = new ExecutionStatus(ExtendedBatchStatus.JSL_STOP);
+
+            if (exitStatusFromJSL != null) {
+                jobContext.setExitStatus(exitStatusFromJSL);
+                retVal.setExitStatus(exitStatusFromJSL);
+            }
+            if (restartOn != null) {
+                jobContext.setRestartOn(restartOn);
+                retVal.setRestartOn(restartOn);
+            }
+        } else if (transitionElement instanceof End) {
+
+            End endElement = (End) transitionElement;
+            String exitStatusFromJSL = endElement.getExitStatus();
+            logger.fine("Next transition element is an <end> : " + transitionElement +
+                    " with JSL exit status = " + exitStatusFromJSL);
+            retVal = new ExecutionStatus(ExtendedBatchStatus.JSL_END);
+            if (exitStatusFromJSL != null) {
+                jobContext.setExitStatus(exitStatusFromJSL);
+                retVal.setExitStatus(exitStatusFromJSL);
+            }
+        } else if (transitionElement instanceof Fail) {
+
+            Fail failElement = (Fail) transitionElement;
+            String exitStatusFromJSL = failElement.getExitStatus();
+            logger.fine("Next transition element is a <fail> : " + transitionElement +
+                    " with JSL exit status = " + exitStatusFromJSL);
+            retVal = new ExecutionStatus(ExtendedBatchStatus.JSL_FAIL);
+            if (exitStatusFromJSL != null) {
+                jobContext.setExitStatus(exitStatusFromJSL);
+                retVal.setExitStatus(exitStatusFromJSL);
+            }
+        } else {
+            throw new IllegalStateException("Not sure how we'd get here...aborting.");
+        }
+        return retVal;
+    }
+
+    /**
+     * @return the controller of the element currently executing, or {@code null} when no
+     *         element is executing
+     */
+    public IController getCurrentStoppableElementController() {
+        return currentStoppableElementController;
+    }
+
+    /**
+     * @return the step execution ids reported by the last element, set only when the loop
+     *         ended because there was nothing left to transition to; otherwise {@code null}
+     */
+    public List<Long> getStepExecIds() {
+        return stepExecIds;
+    }
+
+}
\ No newline at end of file