You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:39:30 UTC
[53/62] importing batchee from github - a fork from the IBm RI
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkStepControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkStepControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkStepControllerImpl.java
deleted file mode 100755
index 34dcffc..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkStepControllerImpl.java
+++ /dev/null
@@ -1,1045 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.PrintWriter;
-import java.io.Serializable;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.batch.api.chunk.CheckpointAlgorithm;
-import javax.batch.runtime.BatchStatus;
-
-import com.ibm.jbatch.container.artifact.proxy.CheckpointAlgorithmProxy;
-import com.ibm.jbatch.container.artifact.proxy.ChunkListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
-import com.ibm.jbatch.container.artifact.proxy.ItemProcessListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.ItemProcessorProxy;
-import com.ibm.jbatch.container.artifact.proxy.ItemReadListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.ItemReaderProxy;
-import com.ibm.jbatch.container.artifact.proxy.ItemWriteListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.ItemWriterProxy;
-import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
-import com.ibm.jbatch.container.artifact.proxy.RetryProcessListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.RetryReadListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.RetryWriteListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.SkipProcessListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.SkipReadListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.SkipWriteListenerProxy;
-import com.ibm.jbatch.container.context.impl.MetricImpl;
-import com.ibm.jbatch.container.context.impl.StepContextImpl;
-import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
-import com.ibm.jbatch.container.exception.BatchContainerServiceException;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.persistence.CheckpointAlgorithmFactory;
-import com.ibm.jbatch.container.persistence.CheckpointData;
-import com.ibm.jbatch.container.persistence.CheckpointDataKey;
-import com.ibm.jbatch.container.persistence.CheckpointManager;
-import com.ibm.jbatch.container.persistence.ItemCheckpointAlgorithm;
-import com.ibm.jbatch.container.services.IPersistenceManagerService;
-import com.ibm.jbatch.container.servicesmanager.ServicesManager;
-import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
-import com.ibm.jbatch.container.util.PartitionDataWrapper;
-import com.ibm.jbatch.container.util.TCCLObjectInputStream;
-import com.ibm.jbatch.container.validation.ArtifactValidationException;
-import com.ibm.jbatch.jsl.model.Chunk;
-import com.ibm.jbatch.jsl.model.ItemProcessor;
-import com.ibm.jbatch.jsl.model.ItemReader;
-import com.ibm.jbatch.jsl.model.ItemWriter;
-import com.ibm.jbatch.jsl.model.Property;
-import com.ibm.jbatch.jsl.model.Step;
-
-public class ChunkStepControllerImpl extends SingleThreadedStepControllerImpl {
-
- // Class name used as the source-class argument for entering/exiting trace calls.
- private final static String sourceClass = ChunkStepControllerImpl.class.getName();
- private final static Logger logger = Logger.getLogger(sourceClass);
-
- // JSL chunk definition; assigned from step.getChunk() in invokeCoreStep().
- private Chunk chunk = null;
- // Artifact proxies created in initializeChunkArtifacts(). processorProxy stays
- // null when the chunk declares no processor.
- private ItemReaderProxy readerProxy = null;
- private ItemProcessorProxy processorProxy = null;
- private ItemWriterProxy writerProxy = null;
- private CheckpointAlgorithmProxy checkpointProxy = null;
- // Algorithm actually driven at checkpoint time: an ItemCheckpointAlgorithm for
- // the "item" policy, otherwise the checkpointProxy itself.
- private CheckpointAlgorithm chkptAlg = null;
- // Created in initializeChunkArtifacts() and re-created in invokeChunk() after a rollback.
- private CheckpointManager checkpointManager;
- private ServicesManager servicesManager = ServicesManagerImpl.getInstance();
- // Looked up from servicesManager each time reader/writer are (re)positioned.
- private IPersistenceManagerService _persistenceManagerService = null;
- private SkipHandler skipHandler = null;
- // Persistence keys for the reader/writer checkpoints. Only writerChkptDK carries
- // the explicit "= null" initializer; both are null until first assigned.
- CheckpointDataKey readerChkptDK, writerChkptDK = null;
- // NOTE(review): these two fields are not assigned anywhere in this class - confirm
- // whether other classes in the package use them before removing.
- CheckpointData readerChkptData = null;
- CheckpointData writerChkptData = null;
- // Listener proxies obtained from the job's listener factory in initializeChunkArtifacts().
- List<ChunkListenerProxy> chunkListeners = null;
- List<SkipProcessListenerProxy> skipProcessListeners = null;
- List<SkipReadListenerProxy> skipReadListeners = null;
- List<SkipWriteListenerProxy> skipWriteListeners = null;
- List<RetryProcessListenerProxy> retryProcessListeners = null;
- List<RetryReadListenerProxy> retryReadListeners = null;
- List<RetryWriteListenerProxy> retryWriteListeners = null;
- List<ItemReadListenerProxy> itemReadListeners = null;
- List<ItemProcessListenerProxy> itemProcessListeners = null;
- List<ItemWriteListenerProxy> itemWriteListeners = null;
- private RetryHandler retryHandler;
-
- // metrics
- // NOTE(review): the five counters below are not referenced elsewhere in this
- // class; metrics are recorded through stepContext.getMetric(...) instead.
- long readCount = 0;
- long writeCount = 0;
- long readSkipCount = 0;
- long processSkipCount = 0;
- long writeSkipCount = 0;
- // Becomes true the first time a retryable exception requires a rollback and is
- // never reset in this class. While true, skip handling is consulted before
- // retry handling in readItem/processItem/writeChunk.
- boolean rollbackRetry = false;
-
- /**
-  * Builds the chunk step controller by handing every argument straight to the
-  * superclass constructor; no chunk-specific state is set up here.
-  */
- public ChunkStepControllerImpl(
-         RuntimeJobExecution jobExecutionImpl,
-         Step step,
-         StepContextImpl stepContext,
-         long rootJobExecutionId,
-         BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
-     super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
- }
-
- /**
-  * Mutable bag of flags describing the outcome of one pass through the
-  * read-process-write loop. Every flag starts out {@code false}.
-  */
- private class ItemStatus {
-
-     private boolean skipped = false;
-     private boolean filtered = false;
-     private boolean finished = false;
-     private boolean checkPointed = false;
-     private boolean retry = false;
-     private boolean rollback = false;
-
-     // --- skipped ---
-     public boolean isSkipped() {
-         return this.skipped;
-     }
-
-     public void setSkipped(boolean skipped) {
-         this.skipped = skipped;
-     }
-
-     // --- filtered ---
-     public boolean isFiltered() {
-         return this.filtered;
-     }
-
-     public void setFiltered(boolean filtered) {
-         this.filtered = filtered;
-     }
-
-     // --- checkpointed ---
-     public boolean isCheckPointed() {
-         return this.checkPointed;
-     }
-
-     public void setCheckPointed(boolean checkPointed) {
-         this.checkPointed = checkPointed;
-     }
-
-     // --- finished ---
-     public boolean isFinished() {
-         return this.finished;
-     }
-
-     public void setFinished(boolean finished) {
-         this.finished = finished;
-     }
-
-     // --- retry ---
-     public boolean isRetry() {
-         return this.retry;
-     }
-
-     public void setRetry(boolean retry) {
-         this.retry = retry;
-     }
-
-     // --- rollback ---
-     public boolean isRollback() {
-         return this.rollback;
-     }
-
-     public void setRollback(boolean rollback) {
-         this.rollback = rollback;
-     }
- }
-
- /**
-  * Reads and processes one item at a time, buffering the processed items so
-  * they can be written as a group. The loop ends when the reader is exhausted,
-  * the buffer holds {@code chunkSize} items (non-custom checkpoint types only),
-  * the checkpoint policy fires, a stop was requested, or a rollback is needed.
-  *
-  * @param chunkSize
-  *            write buffer size
-  * @param theStatus
-  *            receives the finished / checkpointed / rollback flags for the
-  *            caller
-  * @return the processed items to write (skipped and filtered items excluded)
-  */
- private List<Object> readAndProcess(int chunkSize, ItemStatus theStatus) {
-     logger.entering(sourceClass, "readAndProcess", new Object[] { chunkSize, theStatus });
-
-     List<Object> chunkToWrite = new ArrayList<Object>();
-     int readProcessedCount = 0;
-
-     while (true) {
-         ItemStatus status = new ItemStatus();
-         Object itemRead = readItem(status);
-
-         if (status.isRollback()) {
-             theStatus.setRollback(true);
-             // NOTE(review): readItem() also increments ROLLBACK_COUNT when it
-             // flags a rollback - confirm whether counting twice is intended.
-             stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
-             break;
-         }
-
-         if (!status.isSkipped() && !status.isFinished()) {
-             Object itemProcessed = processItem(itemRead, status);
-
-             if (status.isRollback()) {
-                 theStatus.setRollback(true);
-                 // NOTE(review): processItem() increments ROLLBACK_COUNT as well.
-                 stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
-                 break;
-             }
-
-             if (!status.isSkipped() && !status.isFiltered()) {
-                 chunkToWrite.add(itemProcessed);
-                 readProcessedCount++;
-             }
-         }
-
-         theStatus.setFinished(status.isFinished());
-         theStatus.setCheckPointed(checkpointManager.ApplyCheckPointPolicy());
-
-         // A stop request lets the current item finish and then ends the loop.
-         if (stepContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
-             theStatus.setFinished(true);
-         }
-
-         // Write buffer full. Compare by value: '!=' on Strings only tests
-         // reference identity and depends on the literal being interned.
-         if ((readProcessedCount == chunkSize) && !"custom".equals(checkpointProxy.getCheckpointType())) {
-             break;
-         }
-
-         // checkpoint reached
-         if (theStatus.isCheckPointed()) {
-             break;
-         }
-
-         // reader exhausted (or stop requested)
-         if (theStatus.isFinished()) {
-             break;
-         }
-     }
-
-     logger.exiting(sourceClass, "readAndProcess", chunkToWrite);
-     return chunkToWrite;
- }
-
- /**
-  * Reads a single item from the reader, invoking the item-read listeners
-  * around the call and applying retry / skip handling to any exception.
-  *
-  * @param status
-  *            receives the finished, skipped and rollback flags for this read
-  * @return the item read, or {@code null} when the reader is exhausted, the
-  *         read was skipped, or a rollback was flagged
-  * @throws BatchContainerRuntimeException
-  *             if the exception is neither retryable nor skippable, or a
-  *             non-Exception Throwable is raised
-  */
- private Object readItem(ItemStatus status) {
-     logger.entering(sourceClass, "readItem", status);
-     Object itemRead = null;
-
-     try {
-         // call read listeners before and after the actual read
-         for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
-             readListenerProxy.beforeRead();
-         }
-
-         itemRead = readerProxy.readItem();
-
-         for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
-             readListenerProxy.afterRead(itemRead);
-         }
-
-         // a null item means the reader has no more input
-         status.setFinished(itemRead == null);
-         if (!status.isFinished()) {
-             stepContext.getMetric(MetricImpl.MetricType.READ_COUNT).incValue();
-         }
-     } catch (Exception e) {
-         stepContext.setException(e);
-         // Notify the read listeners exactly once per failure. The retry
-         // branch used to fire onReadError a second time for the same exception.
-         for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
-             readListenerProxy.onReadError(e);
-         }
-         if (!rollbackRetry) {
-             // normal mode: retry takes precedence over skip
-             if (retryReadException(e)) {
-                 if (!retryHandler.isRollbackException(e)) {
-                     // no rollback needed: simply retry the read
-                     itemRead = readItem(status);
-                 } else {
-                     status.setRollback(true);
-                     rollbackRetry = true;
-                     stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
-                 }
-             } else if (skipReadException(e)) {
-                 status.setSkipped(true);
-                 stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();
-             } else {
-                 throw new BatchContainerRuntimeException(e);
-             }
-         } else {
-             // after a rollback retry: skip takes precedence over retry
-             if (skipReadException(e)) {
-                 status.setSkipped(true);
-                 stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();
-             } else if (retryReadException(e)) {
-                 if (!retryHandler.isRollbackException(e)) {
-                     itemRead = readItem(status);
-                 } else {
-                     status.setRollback(true);
-                     stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
-                 }
-             } else {
-                 throw new BatchContainerRuntimeException(e);
-             }
-         }
-     } catch (Throwable e) {
-         throw new BatchContainerRuntimeException(e);
-     }
-
-     logger.exiting(sourceClass, "readItem", itemRead == null ? "<null>" : itemRead);
-     return itemRead;
- }
-
- /**
-  * Runs one item through the processor, invoking the item-process listeners
-  * around the call and applying retry / skip handling to any exception. When
-  * the chunk has no processor the item is returned unchanged.
-  *
-  * @param itemRead
-  *            the item read
-  * @param status
-  *            receives the filtered, skipped and rollback flags
-  * @return the processed item; {@code null} when the item was filtered,
-  *         skipped, or a rollback was flagged
-  * @throws BatchContainerRuntimeException
-  *             if the exception is neither retryable nor skippable, or a
-  *             non-Exception Throwable is raised
-  */
- private Object processItem(Object itemRead, ItemStatus status) {
-     logger.entering(sourceClass, "processItem", new Object[] { itemRead, status });
-     Object processedItem = null;
-
-     // if no processor defined for this chunk
-     if (processorProxy == null) {
-         return itemRead;
-     }
-
-     try {
-         // call process listeners before and after the actual process call
-         for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
-             processListenerProxy.beforeProcess(itemRead);
-         }
-
-         processedItem = processorProxy.processItem(itemRead);
-
-         if (processedItem == null) {
-             // a null result means the processor filtered the item out
-             stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();
-             status.setFiltered(true);
-         }
-
-         for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
-             processListenerProxy.afterProcess(itemRead, processedItem);
-         }
-     } catch (Exception e) {
-         // Report the INPUT item: the processed item is normally still null
-         // when the processor throws.
-         for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
-             processListenerProxy.onProcessError(itemRead, e);
-         }
-         if (!rollbackRetry) {
-             // normal mode: retry takes precedence over skip
-             if (retryProcessException(e, itemRead)) {
-                 if (!retryHandler.isRollbackException(e)) {
-                     // The recursive call already runs the before/after
-                     // listeners and the filter bookkeeping; repeating them
-                     // here would notify listeners twice and double-count.
-                     processedItem = processItem(itemRead, status);
-                 } else {
-                     status.setRollback(true);
-                     rollbackRetry = true;
-                     stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
-                 }
-             } else if (skipProcessException(e, itemRead)) {
-                 status.setSkipped(true);
-                 stepContext.getMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT).incValue();
-             } else {
-                 throw new BatchContainerRuntimeException(e);
-             }
-         } else {
-             // after a rollback retry: skip takes precedence over retry
-             if (skipProcessException(e, itemRead)) {
-                 status.setSkipped(true);
-                 stepContext.getMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT).incValue();
-             } else if (retryProcessException(e, itemRead)) {
-                 if (!retryHandler.isRollbackException(e)) {
-                     processedItem = processItem(itemRead, status);
-                 } else {
-                     status.setRollback(true);
-                     rollbackRetry = true;
-                     stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
-                 }
-             } else {
-                 throw new BatchContainerRuntimeException(e);
-             }
-         }
-     } catch (Throwable e) {
-         throw new BatchContainerRuntimeException(e);
-     }
-
-     logger.exiting(sourceClass, "processItem", processedItem == null ? "<null>" : processedItem);
-     return processedItem;
- }
-
- /**
-  * Hands a buffered chunk to the writer, invoking the item-write listeners
-  * around the call. An empty chunk is a no-op apart from tracing. Failures go
-  * through retry / skip handling; which one is consulted first depends on
-  * whether a rollback retry has already happened.
-  *
-  * @param theChunk
-  *            the processed items ready to be written
-  * @param status
-  *            receives the retry / rollback flags
-  */
- private void writeChunk(List<Object> theChunk, ItemStatus status) {
-     logger.entering(sourceClass, "writeChunk", theChunk);
-
-     // nothing buffered: nothing to write
-     if (theChunk.isEmpty()) {
-         logger.exiting(sourceClass, "writeChunk");
-         return;
-     }
-
-     try {
-         // write listeners run before and after the actual write
-         for (ItemWriteListenerProxy listener : itemWriteListeners) {
-             listener.beforeWrite(theChunk);
-         }
-
-         writerProxy.writeItems(theChunk);
-
-         for (ItemWriteListenerProxy listener : itemWriteListeners) {
-             listener.afterWrite(theChunk);
-         }
-         stepContext.getMetric(MetricImpl.MetricType.WRITE_COUNT).incValueBy(theChunk.size());
-     } catch (Exception failure) {
-         this.stepContext.setException(failure);
-         for (ItemWriteListenerProxy listener : itemWriteListeners) {
-             listener.onWriteError(theChunk, failure);
-         }
-
-         if (rollbackRetry) {
-             // after a rollback retry: skip is consulted before retry
-             if (skipWriteException(failure, theChunk)) {
-                 stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);
-             } else if (retryWriteException(failure, theChunk)) {
-                 if (retryHandler.isRollbackException(failure)) {
-                     rollbackRetry = true;
-                     status.setRollback(true);
-                     // inc rollbackCount
-                     stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
-                 } else {
-                     status.setRetry(true);
-                     writeChunk(theChunk, status);
-                 }
-             } else {
-                 throw new BatchContainerRuntimeException(failure);
-             }
-         } else {
-             // normal mode: retry is consulted before skip
-             if (retryWriteException(failure, theChunk)) {
-                 if (retryHandler.isRollbackException(failure)) {
-                     rollbackRetry = true;
-                     status.setRollback(true);
-                     // inc rollbackCount
-                     stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
-                 } else {
-                     writeChunk(theChunk, status);
-                 }
-             } else if (skipWriteException(failure, theChunk)) {
-                 stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);
-             } else {
-                 throw new BatchContainerRuntimeException(failure);
-             }
-         }
-     } catch (Throwable fatal) {
-         throw new BatchContainerRuntimeException(fatal);
-     }
-
-     logger.exiting(sourceClass, "writeChunk");
- }
-
- /**
-  * Main read-process-write loop. Opens the reader and writer in their own
-  * transaction, then repeatedly reads/processes a chunk, writes it and
-  * checkpoints until the reader is exhausted or a stop is requested. When a
-  * retryable exception demands a rollback, the transaction is rolled back,
-  * reader and writer are repositioned at the last checkpoint and processing
-  * continues with an item count of 1.
-  *
-  * @throws BatchContainerRuntimeException
-  *             wrapping any Throwable raised in the loop, after the chunk
-  *             listeners were notified (Exceptions only), the reader and
-  *             writer were closed and the transaction was rolled back
-  */
- private void invokeChunk() {
-     // Trace under the real method name so entering/exiting records pair up.
-     logger.entering(sourceClass, "invokeChunk");
-
-     int itemCount = ChunkHelper.getItemCount(chunk);
-     int timeInterval = ChunkHelper.getTimeLimit(chunk);
-     boolean checkPointed = true;
-     boolean rollback = false;
-     Throwable caughtThrowable = null;
-
-     try {
-         transactionManager.begin();
-         this.openReaderAndWriter();
-         transactionManager.commit();
-
-         while (true) {
-
-             // begin a new transaction on the first iteration, after a
-             // checkpoint commit, or after a rollback
-             if (checkPointed || rollback) {
-                 // compare by value, not by reference
-                 if ("custom".equals(this.checkpointProxy.getCheckpointType())) {
-                     int newtimeOut = this.checkpointManager.checkpointTimeout();
-                     transactionManager.setTransactionTimeout(newtimeOut);
-                 }
-                 transactionManager.begin();
-                 for (ChunkListenerProxy chunkProxy : chunkListeners) {
-                     chunkProxy.beforeChunk();
-                 }
-
-                 if (rollback) {
-                     positionReaderAtCheckpoint();
-                     positionWriterAtCheckpoint();
-                     checkpointManager = new CheckpointManager(readerProxy, writerProxy,
-                             getCheckpointAlgorithm(itemCount, timeInterval), jobExecutionImpl.getExecutionId(),
-                             jobExecutionImpl.getJobInstance().getInstanceId(), step.getId());
-                 }
-             }
-
-             ItemStatus status = new ItemStatus();
-             rollback = false;
-
-             List<Object> chunkToWrite = readAndProcess(itemCount, status);
-
-             if (status.isRollback()) {
-                 // retry the failed chunk one item at a time
-                 itemCount = 1;
-                 rollback = true;
-
-                 readerProxy.close();
-                 writerProxy.close();
-
-                 transactionManager.rollback();
-
-                 continue;
-             }
-
-             writeChunk(chunkToWrite, status);
-
-             if (status.isRollback()) {
-                 itemCount = 1;
-                 rollback = true;
-
-                 readerProxy.close();
-                 writerProxy.close();
-
-                 transactionManager.rollback();
-
-                 continue;
-             }
-             checkPointed = status.isCheckPointed();
-
-             // the chunk ends on a checkpoint or when there is no more input
-             if (status.isCheckPointed() || status.isFinished()) {
-                 // TODO: missing before checkpoint listeners
-                 // 1.- check if spec list proper steps for before checkpoint
-                 // 2.- ask Andy about retry
-                 // 3.- when do we stop?
-
-                 checkpointManager.checkpoint();
-
-                 for (ChunkListenerProxy chunkProxy : chunkListeners) {
-                     chunkProxy.afterChunk();
-                 }
-
-                 this.persistUserData();
-
-                 this.chkptAlg.beginCheckpoint();
-
-                 transactionManager.commit();
-
-                 this.chkptAlg.endCheckpoint();
-
-                 invokeCollectorIfPresent();
-
-                 // exit loop when last record is written
-                 if (status.isFinished()) {
-                     transactionManager.begin();
-
-                     readerProxy.close();
-                     writerProxy.close();
-
-                     transactionManager.commit();
-                     // increment commitCount
-                     stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();
-                     break;
-                 } else {
-                     // increment commitCount
-                     stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();
-                 }
-             }
-         }
-     } catch (Exception e) {
-         caughtThrowable = e;
-         logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", e);
-         // Only try to call onError() if we have an Exception, but not an Error.
-         for (ChunkListenerProxy chunkProxy : chunkListeners) {
-             try {
-                 chunkProxy.onError(e);
-             } catch (Exception e1) {
-                 StringWriter sw = new StringWriter();
-                 PrintWriter pw = new PrintWriter(sw);
-                 e1.printStackTrace(pw);
-                 logger.warning("Caught secondary exception when calling chunk listener onError() with stack trace: " + sw.toString() + "\n. Will continue to remaining chunk listeners (if any) and rethrow wrapping the primary exception.");
-             }
-         }
-     } catch (Throwable t) {
-         caughtThrowable = t;
-         logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", t);
-     } finally {
-         if (caughtThrowable != null) {
-             transactionManager.setRollbackOnly();
-             logger.warning("Caught throwable in chunk processing. Attempting to close all readers and writers.");
-             readerProxy.close();
-             writerProxy.close();
-             transactionManager.rollback();
-             logger.exiting(sourceClass, "invokeChunk");
-             throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", caughtThrowable);
-         } else {
-             logger.finest("Exiting normally");
-             logger.exiting(sourceClass, "invokeChunk");
-         }
-     }
- }
-
- /**
-  * Step entry point: captures the step's chunk definition, builds the chunk
-  * artifacts from it and then runs the read-process-write loop.
-  */
- protected void invokeCoreStep() throws BatchContainerServiceException {
-     // the chunk definition must be in place before the artifacts are created
-     chunk = step.getChunk();
-     initializeChunkArtifacts();
-     invokeChunk();
- }
-
- /**
-  * Selects the checkpoint algorithm for the current checkpoint type.
-  *
-  * @param itemCount
-  *            item threshold passed to the item-based algorithm
-  * @param timeInterval
-  *            time threshold passed to the item-based algorithm
-  * @return a new {@link ItemCheckpointAlgorithm} configured with the given
-  *         thresholds when the checkpoint type is "item"; otherwise the
-  *         checkpoint proxy itself (custom algorithm)
-  */
- private CheckpointAlgorithm getCheckpointAlgorithm(int itemCount, int timeInterval) {
-     // Compare by value: '==' on Strings only tests reference identity.
-     if ("item".equals(checkpointProxy.getCheckpointType())) {
-         ItemCheckpointAlgorithm itemAlg = new ItemCheckpointAlgorithm();
-         itemAlg.setThresholds(itemCount, timeInterval);
-         return itemAlg;
-     }
-
-     // custom chkpt alg
-     return (CheckpointAlgorithm) checkpointProxy;
- }
-
- /**
-  * Creates the chunk's artifacts: reader, optional processor and writer
-  * proxies, the checkpoint algorithm proxy, all chunk-level listeners, the
-  * checkpoint manager and the skip / retry handlers.
-  *
-  * @throws BatchContainerServiceException
-  *             if the reader, processor, writer or checkpoint algorithm proxy
-  *             cannot be created
-  */
- private void initializeChunkArtifacts() {
-     String sourceMethod = "initializeChunkArtifacts";
-     if (logger.isLoggable(Level.FINE)) {
-         logger.entering(sourceClass, sourceMethod);
-     }
-
-     int itemCount = ChunkHelper.getItemCount(chunk);
-     int timeInterval = ChunkHelper.getTimeLimit(chunk);
-     String checkpointPolicy = ChunkHelper.getCheckpointPolicy(chunk);
-
-     // --- reader ---
-     ItemReader itemReader = chunk.getReader();
-     List<Property> itemReaderProps = itemReader.getProperties() == null ? null : itemReader.getProperties().getPropertyList();
-     try {
-         InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext,
-                 itemReaderProps);
-
-         readerProxy = ProxyFactory.createItemReaderProxy(itemReader.getRef(), injectionRef, stepContext);
-
-         if (logger.isLoggable(Level.FINE)) {
-             logger.fine("Created ItemReaderProxy for " + itemReader.getRef());
-         }
-     } catch (ArtifactValidationException e) {
-         throw new BatchContainerServiceException("Cannot create the ItemReader [" + itemReader.getRef() + "]", e);
-     }
-
-     // --- processor (optional) ---
-     ItemProcessor itemProcessor = chunk.getProcessor();
-     if (itemProcessor != null) {
-         List<Property> itemProcessorProps = itemProcessor.getProperties() == null ? null : itemProcessor.getProperties().getPropertyList();
-         try {
-             InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext,
-                     itemProcessorProps);
-
-             processorProxy = ProxyFactory.createItemProcessorProxy(itemProcessor.getRef(), injectionRef, stepContext);
-             if (logger.isLoggable(Level.FINE)) {
-                 logger.fine("Created ItemProcessorProxy for " + itemProcessor.getRef());
-             }
-         } catch (ArtifactValidationException e) {
-             throw new BatchContainerServiceException("Cannot create the ItemProcessor [" + itemProcessor.getRef() + "]", e);
-         }
-     }
-
-     // --- writer ---
-     ItemWriter itemWriter = chunk.getWriter();
-     List<Property> itemWriterProps = itemWriter.getProperties() == null ? null : itemWriter.getProperties().getPropertyList();
-     try {
-         InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext,
-                 itemWriterProps);
-
-         writerProxy = ProxyFactory.createItemWriterProxy(itemWriter.getRef(), injectionRef, stepContext);
-         if (logger.isLoggable(Level.FINE)) {
-             logger.fine("Created ItemWriterProxy for " + itemWriter.getRef());
-         }
-     } catch (ArtifactValidationException e) {
-         throw new BatchContainerServiceException("Cannot create the ItemWriter [" + itemWriter.getRef() + "]", e);
-     }
-
-     // --- checkpoint algorithm proxy ---
-     try {
-         List<Property> propList = null;
-
-         if (chunk.getCheckpointAlgorithm() != null) {
-             propList = (chunk.getCheckpointAlgorithm().getProperties() == null) ? null : chunk.getCheckpointAlgorithm().getProperties()
-                     .getPropertyList();
-         }
-
-         InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext,
-                 propList);
-
-         checkpointProxy = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(step, injectionRef, stepContext);
-         if (logger.isLoggable(Level.FINE)) {
-             logger.fine("Created CheckpointAlgorithmProxy for policy [" + checkpointPolicy + "]");
-         }
-     } catch (ArtifactValidationException e) {
-         throw new BatchContainerServiceException("Cannot create the CheckpointAlgorithm for policy [" + chunk.getCheckpointPolicy()
-                 + "]", e);
-     }
-
-     // --- listeners (created without artifact-specific properties) ---
-     InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext,
-             null);
-
-     this.chunkListeners = jobExecutionImpl.getListenerFactory().getChunkListeners(step, injectionRef, stepContext);
-     this.skipProcessListeners = jobExecutionImpl.getListenerFactory().getSkipProcessListeners(step, injectionRef, stepContext);
-     this.skipReadListeners = jobExecutionImpl.getListenerFactory().getSkipReadListeners(step, injectionRef, stepContext);
-     this.skipWriteListeners = jobExecutionImpl.getListenerFactory().getSkipWriteListeners(step, injectionRef, stepContext);
-     this.retryProcessListeners = jobExecutionImpl.getListenerFactory().getRetryProcessListeners(step, injectionRef, stepContext);
-     this.retryReadListeners = jobExecutionImpl.getListenerFactory().getRetryReadListeners(step, injectionRef, stepContext);
-     this.retryWriteListeners = jobExecutionImpl.getListenerFactory().getRetryWriteListeners(step, injectionRef, stepContext);
-     this.itemReadListeners = jobExecutionImpl.getListenerFactory().getItemReadListeners(step, injectionRef, stepContext);
-     this.itemProcessListeners = jobExecutionImpl.getListenerFactory().getItemProcessListeners(step, injectionRef, stepContext);
-     this.itemWriteListeners = jobExecutionImpl.getListenerFactory().getItemWriteListeners(step, injectionRef, stepContext);
-
-     // Compare by value: '==' on Strings only tests reference identity.
-     if ("item".equals(checkpointProxy.getCheckpointType())) {
-         chkptAlg = new ItemCheckpointAlgorithm();
-         ((ItemCheckpointAlgorithm) chkptAlg).setThresholds(itemCount, timeInterval);
-     } else { // custom chkpt alg
-         chkptAlg = (CheckpointAlgorithm) checkpointProxy;
-     }
-
-     if (logger.isLoggable(Level.FINE)) {
-         logger.fine("Setting contexts for chunk artifacts");
-     }
-
-     // Both messages belong under the guard; the unbraced 'if' used to cover
-     // only the first one.
-     if (logger.isLoggable(Level.FINE)) {
-         logger.fine("Initialize checkpoint manager with item-count=" + itemCount);
-         logger.fine("Initialize checkpoint manager with time-interval=" + timeInterval);
-     }
-
-     checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getExecutionId(), jobExecutionImpl
-             .getJobInstance().getInstanceId(), step.getId());
-
-     skipHandler = new SkipHandler(chunk, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId());
-     skipHandler.addSkipProcessListener(skipProcessListeners);
-     skipHandler.addSkipReadListener(skipReadListeners);
-     skipHandler.addSkipWriteListener(skipWriteListeners);
-
-     retryHandler = new RetryHandler(chunk, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId());
-
-     retryHandler.addRetryProcessListener(retryProcessListeners);
-     retryHandler.addRetryReadListener(retryReadListeners);
-     retryHandler.addRetryWriteListener(retryWriteListeners);
-
-     if (logger.isLoggable(Level.FINE)) {
-         logger.exiting(sourceClass, sourceMethod);
-     }
- }
-
- /**
-  * Opens the reader and then the writer, each positioned at its persisted
-  * checkpoint when one exists and opened with {@code null} otherwise.
-  *
-  * The work is delegated to {@link #positionReaderAtCheckpoint()} and
-  * {@link #positionWriterAtCheckpoint()}, which perform the same lookup,
-  * deserialization and open sequence this method used to duplicate inline
-  * (using locals that shadowed the checkpoint-data fields).
-  *
-  * @throws BatchContainerServiceException
-  *             if a persisted restart token cannot be deserialized or the
-  *             artifact fails to open with it
-  */
- private void openReaderAndWriter() {
-     String sourceMethod = "openReaderAndWriter";
-
-     if (logger.isLoggable(Level.FINE)) {
-         logger.entering(sourceClass, sourceMethod);
-     }
-
-     // reader first, then writer - same order as before
-     positionReaderAtCheckpoint();
-     positionWriterAtCheckpoint();
-
-     if (logger.isLoggable(Level.FINE)) {
-         logger.exiting(sourceClass, sourceMethod);
-     }
- }
-
- /**
-  * Requests a stop by flagging the step context as STOPPING.
-  *
-  * Nothing is forwarded to the chunk artifacts: the read loop checks the batch
-  * status after every item and ends the chunk once it sees STOPPING.
-  */
- @Override
- public void stop() {
-     stepContext.setBatchStatus(BatchStatus.STOPPING);
- }
-
- /**
-  * Passes a read failure to the skip handler.
-  *
-  * @param e
-  *            exception raised while reading
-  * @return {@code false} when the skip handler answers with a
-  *         BatchContainerRuntimeException, {@code true} otherwise
-  */
- boolean skipReadException(Exception e) {
-     boolean skippable = true;
-     try {
-         skipHandler.handleExceptionRead(e);
-     } catch (BatchContainerRuntimeException rejected) {
-         skippable = false;
-     }
-     return skippable;
- }
-
- /**
-  * Passes a read failure to the retry handler.
-  *
-  * @param e
-  *            exception raised while reading
-  * @return {@code false} when the retry handler answers with a
-  *         BatchContainerRuntimeException, {@code true} otherwise
-  */
- boolean retryReadException(Exception e) {
-     boolean retryable = true;
-     try {
-         retryHandler.handleExceptionRead(e);
-     } catch (BatchContainerRuntimeException rejected) {
-         retryable = false;
-     }
-     return retryable;
- }
-
- /**
-  * Passes a processing failure and the affected record to the skip handler.
-  *
-  * @param e
-  *            exception raised while processing
-  * @param record
-  *            the item that was being processed
-  * @return {@code false} when the skip handler answers with a
-  *         BatchContainerRuntimeException, {@code true} otherwise
-  */
- boolean skipProcessException(Exception e, Object record) {
-     boolean skippable = true;
-     try {
-         skipHandler.handleExceptionWithRecordProcess(e, record);
-     } catch (BatchContainerRuntimeException rejected) {
-         skippable = false;
-     }
-     return skippable;
- }
-
- /**
-  * Passes a processing failure and the affected record to the retry handler.
-  *
-  * @param e
-  *            exception raised while processing
-  * @param record
-  *            the item that was being processed
-  * @return {@code false} when the retry handler answers with a
-  *         BatchContainerRuntimeException, {@code true} otherwise
-  */
- boolean retryProcessException(Exception e, Object record) {
-     boolean retryable = true;
-     try {
-         retryHandler.handleExceptionProcess(e, record);
-     } catch (BatchContainerRuntimeException rejected) {
-         retryable = false;
-     }
-     return retryable;
- }
-
- /**
-  * Passes a write failure and the affected chunk to the skip handler.
-  *
-  * @param e
-  *            exception raised while writing
-  * @param chunkToWrite
-  *            the items that were being written
-  * @return {@code false} when the skip handler answers with a
-  *         BatchContainerRuntimeException, {@code true} otherwise
-  */
- boolean skipWriteException(Exception e, List<Object> chunkToWrite) {
-     boolean skippable = true;
-     try {
-         skipHandler.handleExceptionWithRecordListWrite(e, chunkToWrite);
-     } catch (BatchContainerRuntimeException rejected) {
-         skippable = false;
-     }
-     return skippable;
- }
-
- /**
-  * Passes a write failure and the affected chunk to the retry handler.
-  *
-  * @param e
-  *            exception raised while writing
-  * @param chunkToWrite
-  *            the items that were being written
-  * @return {@code false} when the retry handler answers with a
-  *         BatchContainerRuntimeException, {@code true} otherwise
-  */
- boolean retryWriteException(Exception e, List<Object> chunkToWrite) {
-     boolean retryable = true;
-     try {
-         retryHandler.handleExceptionWrite(e, chunkToWrite);
-     } catch (BatchContainerRuntimeException rejected) {
-         retryable = false;
-     }
-     return retryable;
- }
-
- /**
-  * Opens the reader at its last persisted checkpoint. The checkpoint is looked
-  * up under the (job instance id, step id, "READER") key; when none exists the
-  * reader is opened with {@code null}.
-  *
-  * @throws BatchContainerServiceException
-  *             if the persisted restart token cannot be deserialized or the
-  *             reader fails to open with it
-  * @throws IllegalStateException
-  *             if a ClassCastException escapes the lookup/open sequence
-  */
- private void positionReaderAtCheckpoint() {
-     _persistenceManagerService = servicesManager.getPersistenceManagerService();
-     readerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), "READER");
-
-     CheckpointData readerData = _persistenceManagerService.getCheckpointData(readerChkptDK);
-     try {
-         if (readerData == null) {
-             // no chkpt data exists in the backing store
-             readerProxy.open(null);
-             return;
-         }
-
-         byte[] readertoken = readerData.getRestartToken();
-         ByteArrayInputStream readerChkptBA = new ByteArrayInputStream(readertoken);
-         try {
-             TCCLObjectInputStream readerOIS = new TCCLObjectInputStream(readerChkptBA);
-             readerProxy.open((Serializable) readerOIS.readObject());
-             readerOIS.close();
-         } catch (Exception ex) {
-             // This path restores a checkpoint; the old message claimed a
-             // failure to persist one.
-             throw new BatchContainerServiceException("Cannot restore the reader checkpoint data for [" + step.getId() + "]", ex);
-         }
-     } catch (ClassCastException e) {
-         // keep the cause so the offending cast is visible in the stack trace
-         throw new IllegalStateException("Expected CheckpointData but found " + readerData, e);
-     }
- }
-
- /**
-  * Opens the writer at its last persisted checkpoint. The checkpoint is looked
-  * up under the (job instance id, step id, "WRITER") key; when none exists the
-  * writer is opened with {@code null}.
-  *
-  * @throws BatchContainerServiceException
-  *             if the persisted restart token cannot be deserialized or the
-  *             writer fails to open with it
-  * @throws IllegalStateException
-  *             if a ClassCastException escapes the lookup/open sequence
-  */
- private void positionWriterAtCheckpoint() {
-     _persistenceManagerService = servicesManager.getPersistenceManagerService();
-     writerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), "WRITER");
-
-     CheckpointData writerData = _persistenceManagerService.getCheckpointData(writerChkptDK);
-     try {
-         if (writerData == null) {
-             // no chkpt data exists in the backing store
-             writerProxy.open(null);
-             return;
-         }
-
-         byte[] writertoken = writerData.getRestartToken();
-         ByteArrayInputStream writerChkptBA = new ByteArrayInputStream(writertoken);
-         try {
-             TCCLObjectInputStream writerOIS = new TCCLObjectInputStream(writerChkptBA);
-             writerProxy.open((Serializable) writerOIS.readObject());
-             writerOIS.close();
-         } catch (Exception ex) {
-             // This path restores a checkpoint; the old message claimed a
-             // failure to persist one.
-             throw new BatchContainerServiceException("Cannot restore the writer checkpoint data for [" + step.getId() + "]", ex);
-         }
-     } catch (ClassCastException e) {
-         // keep the cause so the offending cast is visible in the stack trace
-         throw new IllegalStateException("Expected CheckpointData but found " + writerData, e);
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/DecisionControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/DecisionControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/DecisionControllerImpl.java
deleted file mode 100755
index 0595c48..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/DecisionControllerImpl.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.util.List;
-import java.util.logging.Logger;
-
-import javax.batch.runtime.StepExecution;
-
-import com.ibm.jbatch.container.IExecutionElementController;
-import com.ibm.jbatch.container.artifact.proxy.DeciderProxy;
-import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
-import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
-import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
-import com.ibm.jbatch.container.exception.BatchContainerServiceException;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.jsl.ExecutionElement;
-import com.ibm.jbatch.container.services.IPersistenceManagerService;
-import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
-import com.ibm.jbatch.container.status.ExecutionStatus;
-import com.ibm.jbatch.container.status.ExtendedBatchStatus;
-import com.ibm.jbatch.container.validation.ArtifactValidationException;
-import com.ibm.jbatch.jsl.model.Decision;
-import com.ibm.jbatch.jsl.model.Property;
-
-/**
- * Controller for a JSL decision element.
- *
- * It creates the decider artifact referenced by the decision, passes it the step executions
- * recorded for the previously run element, and stores the decider's return value as the
- * exit status on the job context.
- */
-public class DecisionControllerImpl implements IExecutionElementController {
-
-    private final static String sourceClass = DecisionControllerImpl.class.getName();
-    private final static Logger logger = Logger.getLogger(sourceClass);
-
-    private final RuntimeJobExecution jobExecution;
-
-    private final Decision decision;
-
-    // Stays null until setPreviousStepExecutions(...) has been called.
-    private StepExecution[] previousStepExecutions = null;
-
-    private final IPersistenceManagerService persistenceService;
-
-    /**
-     * @param jobExecution the job execution this decision belongs to
-     * @param decision     the JSL decision element to evaluate
-     */
-    public DecisionControllerImpl(RuntimeJobExecution jobExecution, Decision decision) {
-        this.jobExecution = jobExecution;
-        this.decision = decision;
-        persistenceService = ServicesManagerImpl.getInstance().getPersistenceManagerService();
-    }
-
-    /**
-     * Creates the decider, invokes it and records its answer on the job context.
-     *
-     * @return a NORMAL_COMPLETION status carrying the exit status returned by the decider
-     * @throws BatchContainerServiceException if the decider artifact cannot be created
-     */
-    @Override
-    public ExecutionStatus execute() {
-
-        String deciderId = decision.getRef();
-        List<Property> propList = (decision.getProperties() == null) ? null : decision.getProperties().getPropertyList();
-
-        // The job context is always supplied; the second reference is deliberately left null
-        // (presumably the step context -- confirm against InjectionReferences).
-        InjectionReferences injectionRef = new InjectionReferences(jobExecution.getJobContext(), null, propList);
-
-        DeciderProxy deciderProxy;
-        try {
-            deciderProxy = ProxyFactory.createDeciderProxy(deciderId, injectionRef);
-        } catch (ArtifactValidationException e) {
-            throw new BatchContainerServiceException("Cannot create the decider [" + deciderId + "]", e);
-        }
-
-        String exitStatus = deciderProxy.decide(this.previousStepExecutions);
-
-        logger.fine("Decider exiting and setting job-level exit status to " + exitStatus);
-
-        // Set the value returned from the decider as the job context exit status.
-        this.jobExecution.getJobContext().setExitStatus(exitStatus);
-
-        return new ExecutionStatus(ExtendedBatchStatus.NORMAL_COMPLETION, exitStatus);
-    }
-
-    /**
-     * Loads the step executions produced by the element that ran immediately before this
-     * decision so that they can be passed to the decider.
-     *
-     * When there is no previous controller, or it reports no step execution ids, the decider
-     * is given an empty array.
-     *
-     * @param previousExecutionElement  the element executed before this decision, may be null
-     * @param previousElementController the controller that ran that element, may be null
-     * @throws BatchContainerRuntimeException if the previous element is itself a decision
-     */
-    protected void setPreviousStepExecutions(ExecutionElement previousExecutionElement, IExecutionElementController previousElementController) {
-        if (previousExecutionElement instanceof Decision) {
-            throw new BatchContainerRuntimeException("A decision cannot precede another decision.");
-        }
-
-        // With no previous controller only the job context is available to the decider.
-        List<Long> previousStepExecsIds =
-                (previousElementController == null) ? null : previousElementController.getLastRunStepExecutions();
-
-        if (previousStepExecsIds == null) {
-            this.previousStepExecutions = new StepExecution[0];
-            return;
-        }
-
-        StepExecution[] stepExecArray = new StepExecution[previousStepExecsIds.size()];
-        for (int i = 0; i < stepExecArray.length; i++) {
-            stepExecArray[i] = persistenceService.getStepExecutionByStepExecutionId(previousStepExecsIds.get(i));
-        }
-
-        this.previousStepExecutions = stepExecArray;
-    }
-
-    /** A decision has nothing to interrupt, so stop requests are ignored. */
-    @Override
-    public void stop() {
-        // no-op
-    }
-
-    /**
-     * @return always null; a decision does not record step executions of its own
-     */
-    @Override
-    public List<Long> getLastRunStepExecutions() {
-        return null;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionElementControllerFactory.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionElementControllerFactory.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionElementControllerFactory.java
deleted file mode 100755
index ad504b7..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionElementControllerFactory.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-
-import com.ibm.jbatch.container.context.impl.StepContextImpl;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.util.PartitionDataWrapper;
-import com.ibm.jbatch.jsl.model.Batchlet;
-import com.ibm.jbatch.jsl.model.Chunk;
-import com.ibm.jbatch.jsl.model.Decision;
-import com.ibm.jbatch.jsl.model.Flow;
-import com.ibm.jbatch.jsl.model.Partition;
-import com.ibm.jbatch.jsl.model.Split;
-import com.ibm.jbatch.jsl.model.Step;
-
-/**
- * Static factory that picks and instantiates the controller matching a JSL execution element.
- */
-public class ExecutionElementControllerFactory {
-
-    private final static String CLASSNAME = ExecutionElementControllerFactory.class.getName();
-    private final static Logger logger = Logger.getLogger(CLASSNAME);
-
-    /**
-     * Chooses the step controller for the given step.
-     *
-     * A step whose partition declares a mapper, or a plan with a partitions value, gets a
-     * partitioned controller. Otherwise a batchlet controller or a chunk controller is
-     * returned, depending on which of the two the step defines.
-     *
-     * @throws IllegalArgumentException if the step defines both a batchlet and a chunk,
-     *         or neither of them
-     */
-    public static BaseStepControllerImpl getStepController(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerQueue) {
-
-        final String methodName = "getStepController";
-
-        if (logger.isLoggable(Level.FINER)) {
-            logger.logp(Level.FINER, CLASSNAME, methodName, "Get StepController for", step.getId());
-        }
-
-        // Partitioned steps are recognised first; everything else falls through.
-        String partitionedTrace = describePartitioning(step.getPartition());
-        if (partitionedTrace != null) {
-            if (logger.isLoggable(Level.FINER)) {
-                logger.logp(Level.FINER, CLASSNAME, methodName, partitionedTrace, step);
-            }
-            return new PartitionedStepControllerImpl(jobExecutionImpl, step, stepContext, rootJobExecutionId);
-        }
-
-        Batchlet batchlet = step.getBatchlet();
-        if (batchlet == null) {
-            Chunk chunk = step.getChunk();
-            if (logger.isLoggable(Level.FINER)) {
-                logger.finer("Found chunk: " + chunk);
-            }
-            if (chunk == null) {
-                throw new IllegalArgumentException("Step does not contain either a batchlet or a chunk.  Aborting.");
-            }
-            return new ChunkStepControllerImpl(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue);
-        }
-
-        if (logger.isLoggable(Level.FINER)) {
-            logger.finer("Found batchlet: " + batchlet + ", with ref= " + batchlet.getRef());
-        }
-        if (step.getChunk() != null) {
-            throw new IllegalArgumentException("Step contains both a batchlet and a chunk.  Aborting.");
-        }
-        return new BatchletStepControllerImpl(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerQueue);
-    }
-
-    /**
-     * Returns the trace message for a partition that calls for a partitioned controller,
-     * or null when the step should be handled as a plain batchlet/chunk step.
-     */
-    private static String describePartitioning(Partition partition) {
-        if (partition == null) {
-            return null;
-        }
-        if (partition.getMapper() != null) {
-            return "Found partitioned step with mapper";
-        }
-        if (partition.getPlan() != null && partition.getPlan().getPartitions() != null) {
-            return "Found partitioned step with plan";
-        }
-        return null;
-    }
-
-    /** Builds the controller for a decision element. */
-    public static DecisionControllerImpl getDecisionController(RuntimeJobExecution jobExecutionImpl, Decision decision) {
-        return new DecisionControllerImpl(jobExecutionImpl, decision);
-    }
-
-    /** Builds the controller for a flow element. */
-    public static FlowControllerImpl getFlowController(RuntimeJobExecution jobExecutionImpl, Flow flow, long rootJobExecutionId) {
-        return new FlowControllerImpl(jobExecutionImpl, flow, rootJobExecutionId);
-    }
-
-    /** Builds the controller for a split element. */
-    public static SplitControllerImpl getSplitController(RuntimeJobExecution jobExecutionImpl, Split split, long rootJobExecutionId) {
-        return new SplitControllerImpl(jobExecutionImpl, split, rootJobExecutionId);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionTransitioner.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionTransitioner.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionTransitioner.java
deleted file mode 100755
index 665f6e4..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ExecutionTransitioner.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.logging.Logger;
-
-import javax.batch.runtime.BatchStatus;
-
-import com.ibm.jbatch.container.IController;
-import com.ibm.jbatch.container.IExecutionElementController;
-import com.ibm.jbatch.container.context.impl.JobContextImpl;
-import com.ibm.jbatch.container.context.impl.StepContextImpl;
-import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.jsl.ExecutionElement;
-import com.ibm.jbatch.container.jsl.IllegalTransitionException;
-import com.ibm.jbatch.container.jsl.Transition;
-import com.ibm.jbatch.container.jsl.TransitionElement;
-import com.ibm.jbatch.container.navigator.ModelNavigator;
-import com.ibm.jbatch.container.status.ExtendedBatchStatus;
-import com.ibm.jbatch.container.status.ExecutionStatus;
-import com.ibm.jbatch.container.util.PartitionDataWrapper;
-import com.ibm.jbatch.jsl.model.Decision;
-import com.ibm.jbatch.jsl.model.End;
-import com.ibm.jbatch.jsl.model.Fail;
-import com.ibm.jbatch.jsl.model.Flow;
-import com.ibm.jbatch.jsl.model.JSLJob;
-import com.ibm.jbatch.jsl.model.Split;
-import com.ibm.jbatch.jsl.model.Step;
-import com.ibm.jbatch.jsl.model.Stop;
-
-/**
- * Runs the execution elements handed out by a ModelNavigator one after another, using each
- * element's outcome to select the next transition, until the navigator reports that
- * transitioning is finished, a terminating transition element is matched, or the job is
- * found to be stopping.
- */
-public class ExecutionTransitioner {
-
-    private final static String CLASSNAME = ExecutionTransitioner.class.getName();
-    private final static Logger logger = Logger.getLogger(CLASSNAME);
-
-    private RuntimeJobExecution jobExecution;
-    private long rootJobExecutionId;
-    private ModelNavigator<?> modelNavigator;
-
-    // 'volatile' since it receives stop on separate thread.
-    private volatile IExecutionElementController currentStoppableElementController;
-    private IExecutionElementController previousElementController;
-    private ExecutionElement currentExecutionElement = null;
-    private ExecutionElement previousExecutionElement = null;
-
-    private JobContextImpl jobContext;
-    private BlockingQueue<PartitionDataWrapper> analyzerQueue = null;
-
-    // Step execution ids reported by the last controller run before a normal completion.
-    private List<Long> stepExecIds;
-
-    /**
-     * Creates a transitioner without an analyzer queue.
-     */
-    public ExecutionTransitioner(RuntimeJobExecution jobExecution, long rootJobExecutionId, ModelNavigator<?> modelNavigator) {
-        this.jobExecution = jobExecution;
-        this.rootJobExecutionId = rootJobExecutionId;
-        this.modelNavigator = modelNavigator;
-        this.jobContext = jobExecution.getJobContext();
-    }
-
-    /**
-     * Creates a transitioner for a job navigator; the analyzer queue is passed on to every
-     * step controller this transitioner creates.
-     */
-    public ExecutionTransitioner(RuntimeJobExecution jobExecution, long rootJobExecutionId, ModelNavigator<JSLJob> jobNavigator, BlockingQueue<PartitionDataWrapper> analyzerQueue) {
-        this.jobExecution = jobExecution;
-        this.rootJobExecutionId = rootJobExecutionId;
-        this.modelNavigator = jobNavigator;
-        this.jobContext = jobExecution.getJobContext();
-        this.analyzerQueue = analyzerQueue;
-    }
-
-    /**
-     * Executes elements in sequence, starting from the navigator's first element. Used for job and flow.
-     *
-     * @return the status describing why the loop ended: operator stop, normal completion,
-     *         an unmatched exception, a terminating transition element, or the status
-     *         bubbled up from a flow or split that did not complete normally
-     * @throws IllegalArgumentException if the first element cannot be determined
-     * @throws BatchContainerRuntimeException if the job context reports FAILED after an
-     *         element ran, or the next transition cannot be determined
-     * @throws IllegalStateException if an element or transition is in an unexpected state
-     */
-    public ExecutionStatus doExecutionLoop() {
-
-        final String methodName = "doExecutionLoop";
-
-        try {
-            currentExecutionElement = modelNavigator.getFirstExecutionElement(jobExecution.getRestartOn());
-        } catch (IllegalTransitionException e) {
-            String errorMsg = "Could not transition to first execution element within job.";
-            logger.warning(errorMsg);
-            throw new IllegalArgumentException(errorMsg, e);
-        }
-
-        logger.fine("First execution element = " + currentExecutionElement.getId());
-
-        while (true) {
-
-            if (jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
-                logger.fine(methodName + " Exiting execution loop as job is now in stopping state.");
-                return new ExecutionStatus(ExtendedBatchStatus.JOB_OPERATOR_STOPPING);
-            }
-
-            IExecutionElementController currentElementController = getNextElementController();
-            currentStoppableElementController = currentElementController;
-
-            ExecutionStatus status = currentElementController.execute();
-
-            // Nothing special for decision or step except to get exit status.  For flow and split we want to bubble up though.
-            if ((currentExecutionElement instanceof Split) || (currentExecutionElement instanceof Flow)) {
-                // Exit status and restartOn should both be in the job context.
-                if (!status.getExtendedBatchStatus().equals(ExtendedBatchStatus.NORMAL_COMPLETION)) {
-                    logger.fine("Breaking out of loop with return status = " + status.getExtendedBatchStatus().name());
-                    return status;
-                }
-            }
-
-            // Seems like this should only happen if an Error is thrown at the step level, since normally a step-level
-            // exception is caught and the fact that it was thrown captured in the ExecutionStatus
-            if (jobContext.getBatchStatus().equals(BatchStatus.FAILED)) {
-                String errorMsg = "Sub-execution returned its own BatchStatus of FAILED.  Deal with this by throwing exception to the next layer.";
-                logger.warning(errorMsg);
-                throw new BatchContainerRuntimeException(errorMsg);
-            }
-
-            // set the execution element controller to null so we don't try to call stop on it after the element has finished executing
-            currentStoppableElementController = null;
-
-            logger.fine("Done executing element=" + currentExecutionElement.getId() + ", exitStatus=" + status.getExitStatus());
-
-            if (jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
-                logger.fine(methodName + " Exiting as job has been stopped");
-                return new ExecutionStatus(ExtendedBatchStatus.JOB_OPERATOR_STOPPING);
-            }
-
-            Transition nextTransition = null;
-            try {
-                nextTransition = modelNavigator.getNextTransition(currentExecutionElement, status);
-            } catch (IllegalTransitionException e) {
-                String errorMsg = "Problem transitioning to next execution element.";
-                logger.warning(errorMsg);
-                throw new BatchContainerRuntimeException(errorMsg, e);
-            }
-
-            //
-            // We will find ourselves in one of four states now.
-            //
-            // 1. Finished transitioning after a normal execution, but nothing to do 'next'.
-            // 2. We just executed a step which threw an exception, but didn't match a transition element.
-            // 3. We are going to 'next' to another execution element (and jump back to the top of this
-            //    'while'-loop).
-            // 4. We matched a terminating transition element (<end>, <stop> or <fail>).
-            //
-
-            // 1.
-            if (nextTransition.isFinishedTransitioning()) {
-                logger.fine(methodName + " No next execution element, and no transition element found either.  Looks like we're done and ready for COMPLETED state.");
-                this.stepExecIds = currentElementController.getLastRunStepExecutions();
-                // Consider just passing the last 'status' back, but let's unwrap the exit status and pass a new NORMAL_COMPLETION
-                // status back instead.
-                return new ExecutionStatus(ExtendedBatchStatus.NORMAL_COMPLETION, status.getExitStatus());
-            // 2.
-            } else if (nextTransition.noTransitionElementMatchedAfterException()) {
-                return new ExecutionStatus(ExtendedBatchStatus.EXCEPTION_THROWN, status.getExitStatus());
-            // 3.
-            } else if (nextTransition.getNextExecutionElement() != null) {
-                // hold on to the previous execution element for the decider
-                // we need it because we need to inject the context of the
-                // previous execution element into the decider
-                previousExecutionElement = currentExecutionElement;
-                previousElementController = currentElementController;
-                currentExecutionElement = nextTransition.getNextExecutionElement();
-            // 4.
-            } else if (nextTransition.getTransitionElement() != null) {
-                ExecutionStatus terminatingStatus = handleTerminatingTransitionElement(nextTransition.getTransitionElement());
-                logger.finer(methodName + " , Breaking out of execution loop after processing terminating transition element.");
-                return terminatingStatus;
-            } else {
-                throw new IllegalStateException("Not sure how we'd end up in this state...aborting rather than looping.");
-            }
-        }
-    }
-
-    /**
-     * Builds the controller for the current execution element. A decision controller is
-     * additionally given the previously executed element and its controller.
-     *
-     * @return the controller for the current element, never null
-     * @throws IllegalStateException if the current element is not a decision, flow, split or step
-     */
-    private IExecutionElementController getNextElementController() {
-        IExecutionElementController elementController = null;
-
-        if (currentExecutionElement instanceof Decision) {
-            Decision decision = (Decision) currentExecutionElement;
-            elementController = ExecutionElementControllerFactory.getDecisionController(jobExecution, decision);
-            DecisionControllerImpl decisionController = (DecisionControllerImpl) elementController;
-            decisionController.setPreviousStepExecutions(previousExecutionElement, previousElementController);
-        } else if (currentExecutionElement instanceof Flow) {
-            Flow flow = (Flow) currentExecutionElement;
-            elementController = ExecutionElementControllerFactory.getFlowController(jobExecution, flow, rootJobExecutionId);
-        } else if (currentExecutionElement instanceof Split) {
-            Split split = (Split) currentExecutionElement;
-            elementController = ExecutionElementControllerFactory.getSplitController(jobExecution, split, rootJobExecutionId);
-        } else if (currentExecutionElement instanceof Step) {
-            Step step = (Step) currentExecutionElement;
-            StepContextImpl stepContext = new StepContextImpl(step.getId());
-            elementController = ExecutionElementControllerFactory.getStepController(jobExecution, step, stepContext, rootJobExecutionId, analyzerQueue);
-        } else {
-            // Fail with a descriptive error instead of returning null and letting the caller hit an NPE.
-            throw new IllegalStateException("Unexpected execution element type, cannot create a controller for: " + currentExecutionElement);
-        }
-        logger.fine("Next execution element controller = " + elementController);
-        return elementController;
-    }
-
-    /**
-     * Converts a terminating transition element (stop, end or fail) into the status returned
-     * from the execution loop. An exit status given in the JSL is copied to both the job
-     * context and the returned status; for a stop element the same is done for restartOn.
-     *
-     * @throws IllegalStateException if the element is not a stop, end or fail
-     */
-    private ExecutionStatus handleTerminatingTransitionElement(TransitionElement transitionElement) {
-
-        ExecutionStatus retVal;
-
-        logger.fine("Found terminating transition element (stop, end, or fail).");
-
-        if (transitionElement instanceof Stop) {
-
-            Stop stopElement = (Stop) transitionElement;
-            String restartOn = stopElement.getRestart();
-            String exitStatusFromJSL = stopElement.getExitStatus();
-            logger.fine("Next transition element is a <stop> : " + transitionElement + " with restartOn=" + restartOn +
-                    " , and JSL exit status = " + exitStatusFromJSL);
-
-            retVal = new ExecutionStatus(ExtendedBatchStatus.JSL_STOP);
-
-            if (exitStatusFromJSL != null) {
-                jobContext.setExitStatus(exitStatusFromJSL);
-                retVal.setExitStatus(exitStatusFromJSL);
-            }
-            if (restartOn != null) {
-                jobContext.setRestartOn(restartOn);
-                retVal.setRestartOn(restartOn);
-            }
-        } else if (transitionElement instanceof End) {
-
-            End endElement = (End) transitionElement;
-            String exitStatusFromJSL = endElement.getExitStatus();
-            logger.fine("Next transition element is an <end> : " + transitionElement +
-                    " with JSL exit status = " + exitStatusFromJSL);
-            retVal = new ExecutionStatus(ExtendedBatchStatus.JSL_END);
-            if (exitStatusFromJSL != null) {
-                jobContext.setExitStatus(exitStatusFromJSL);
-                retVal.setExitStatus(exitStatusFromJSL);
-            }
-        } else if (transitionElement instanceof Fail) {
-
-            Fail failElement = (Fail) transitionElement;
-            String exitStatusFromJSL = failElement.getExitStatus();
-            logger.fine("Next transition element is a <fail> : " + transitionElement +
-                    " with JSL exit status = " + exitStatusFromJSL);
-            retVal = new ExecutionStatus(ExtendedBatchStatus.JSL_FAIL);
-            if (exitStatusFromJSL != null) {
-                jobContext.setExitStatus(exitStatusFromJSL);
-                retVal.setExitStatus(exitStatusFromJSL);
-            }
-        } else {
-            throw new IllegalStateException("Not sure how we'd get here...aborting.");
-        }
-        return retVal;
-    }
-
-    /**
-     * @return the controller of the element currently being executed, or null when no
-     *         element is running
-     */
-    public IController getCurrentStoppableElementController() {
-        return currentStoppableElementController;
-    }
-
-    /**
-     * @return the step execution ids captured when the loop finished transitioning normally;
-     *         null if that has not happened
-     */
-    public List<Long> getStepExecIds() {
-        return stepExecIds;
-    }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowControllerImpl.java
deleted file mode 100755
index 255d871..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowControllerImpl.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.util.List;
-import java.util.logging.Logger;
-
-import javax.batch.runtime.BatchStatus;
-
-import com.ibm.jbatch.container.IController;
-import com.ibm.jbatch.container.IExecutionElementController;
-import com.ibm.jbatch.container.context.impl.JobContextImpl;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.navigator.ModelNavigator;
-import com.ibm.jbatch.container.navigator.NavigatorFactory;
-import com.ibm.jbatch.container.services.IPersistenceManagerService;
-import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
-import com.ibm.jbatch.container.status.ExtendedBatchStatus;
-import com.ibm.jbatch.container.status.ExecutionStatus;
-import com.ibm.jbatch.jsl.model.Flow;
-
-/**
- * Controller for a JSL flow. Execution is delegated to an ExecutionTransitioner built over
- * the flow's navigator, and stop requests are forwarded to whichever element that
- * transitioner is currently running.
- */
-public class FlowControllerImpl implements IExecutionElementController {
-
-    private final static String CLASSNAME = FlowControllerImpl.class.getName();
-    private final static Logger logger = Logger.getLogger(CLASSNAME);
-
-    private final RuntimeJobExecution jobExecution;
-    private final JobContextImpl jobContext;
-
-    protected ModelNavigator<Flow> flowNavigator;
-
-    protected Flow flow;
-    private long rootJobExecutionId;
-
-    // Assigned by execute() and read by stop(); 'volatile' because stop() may be invoked
-    // from a different thread than the one running execute().
-    private volatile ExecutionTransitioner transitioner;
-
-    private static IPersistenceManagerService _persistenceManagementService = ServicesManagerImpl.getInstance().getPersistenceManagerService();
-
-    /**
-     * @param jobExecution       the job execution the flow runs under
-     * @param flow               the JSL flow to execute
-     * @param rootJobExecutionId id passed through to the transitioner
-     */
-    public FlowControllerImpl(RuntimeJobExecution jobExecution, Flow flow, long rootJobExecutionId) {
-        this.jobExecution = jobExecution;
-        this.jobContext = jobExecution.getJobContext();
-        this.flowNavigator = NavigatorFactory.createFlowNavigator(flow);
-        this.flow = flow;
-        this.rootJobExecutionId = rootJobExecutionId;
-    }
-
-    /**
-     * Runs the flow unless the job is already stopping.
-     *
-     * @return the transitioner's result, or JOB_OPERATOR_STOPPING when the job context is
-     *         already in STOPPING state (in which case no transitioner is created)
-     */
-    @Override
-    public ExecutionStatus execute() {
-        if (!jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
-            transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, flowNavigator);
-            return transitioner.doExecutionLoop();
-        } else {
-            return new ExecutionStatus(ExtendedBatchStatus.JOB_OPERATOR_STOPPING);
-        }
-    }
-
-    /**
-     * Forwards the stop request to the element currently running inside the flow, if any.
-     * Does nothing when execute() has not created a transitioner.
-     */
-    @Override
-    public void stop() {
-        // Since this is not a top-level controller, don't try to filter based on existing status.. just pass
-        // along the stop().
-        ExecutionTransitioner activeTransitioner = transitioner;
-        if (activeTransitioner == null) {
-            // Nothing is running yet (or execute() returned without starting), so nothing to stop.
-            return;
-        }
-        IController stoppableElementController = activeTransitioner.getCurrentStoppableElementController();
-        if (stoppableElementController != null) {
-            stoppableElementController.stop();
-        }
-    }
-
-    /**
-     * @return the step execution ids recorded by the transitioner, or null when the flow
-     *         has not been executed
-     */
-    @Override
-    public List<Long> getLastRunStepExecutions() {
-        ExecutionTransitioner activeTransitioner = this.transitioner;
-        return (activeTransitioner == null) ? null : activeTransitioner.getStepExecIds();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowInSplitThreadRootControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowInSplitThreadRootControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowInSplitThreadRootControllerImpl.java
deleted file mode 100755
index f680586..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/FlowInSplitThreadRootControllerImpl.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import com.ibm.jbatch.container.jobinstance.RuntimeFlowInSplitExecution;
-import com.ibm.jbatch.container.status.ExtendedBatchStatus;
-import com.ibm.jbatch.container.status.ExecutionStatus;
-import com.ibm.jbatch.container.util.FlowInSplitBuilderConfig;
-
-/**
- * Thread-root controller for a flow running inside a split. It behaves like its parent
- * class and additionally records the outcome on the flow-in-split execution.
- */
-public class FlowInSplitThreadRootControllerImpl extends JobThreadRootControllerImpl {
-
-    private final static String CLASSNAME = FlowInSplitThreadRootControllerImpl.class.getName();
-
-    // Careful, we have a separately named reference to the same object in the parent class
-    RuntimeFlowInSplitExecution flowInSplitExecution;
-
-    /**
-     * @param flowInSplitExecution the execution for this flow; also handed to the parent class
-     * @param config               supplies the root job execution id
-     */
-    public FlowInSplitThreadRootControllerImpl(RuntimeFlowInSplitExecution flowInSplitExecution, FlowInSplitBuilderConfig config) {
-        super(flowInSplitExecution, config.getRootJobExecutionId());
-        this.flowInSplitExecution = flowInSplitExecution;
-    }
-
-    /**
-     * Not only are we setting the status correctly at the subjob level, we are also setting it on the execution
-     * so that it is visible by the parent split.
-     */
-    @Override
-    public ExecutionStatus originateExecutionOnThread() {
-        final ExecutionStatus outcome = super.originateExecutionOnThread();
-        flowInSplitExecution.setFlowStatus(outcome);
-        return outcome;
-    }
-
-    /**
-     * After the parent's handling, marks the flow status held by the execution as EXCEPTION_THROWN.
-     */
-    @Override
-    protected void batchStatusFailedFromException() {
-        super.batchStatusFailedFromException();
-        flowInSplitExecution.getFlowStatus().setExtendedBatchStatus(ExtendedBatchStatus.EXCEPTION_THROWN);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobControllerImpl.java
deleted file mode 100755
index 75e1332..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobControllerImpl.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-
-/**
- * Thread-root controller for a top-level job; the job's own execution id is used as the
- * root job execution id.
- */
-public class JobControllerImpl extends JobThreadRootControllerImpl {
-
-    private final static String CLASSNAME = JobControllerImpl.class.getName();
-
-    /**
-     * @param jobExecution the job execution to control; its execution id becomes the root id
-     */
-    public JobControllerImpl(RuntimeJobExecution jobExecution) {
-        this(jobExecution, jobExecution.getExecutionId());
-    }
-
-    // Kept private: callers always go through the single-argument constructor.
-    private JobControllerImpl(RuntimeJobExecution jobExecution, long rootJobExecutionId) {
-        super(jobExecution, rootJobExecutionId);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobThreadRootControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobThreadRootControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobThreadRootControllerImpl.java
deleted file mode 100755
index 3e30ab2..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/JobThreadRootControllerImpl.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.sql.Timestamp;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.batch.runtime.BatchStatus;
-
-import com.ibm.jbatch.container.IController;
-import com.ibm.jbatch.container.IThreadRootController;
-import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
-import com.ibm.jbatch.container.artifact.proxy.JobListenerProxy;
-import com.ibm.jbatch.container.artifact.proxy.ListenerFactory;
-import com.ibm.jbatch.container.context.impl.JobContextImpl;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.navigator.ModelNavigator;
-import com.ibm.jbatch.container.services.IJobStatusManagerService;
-import com.ibm.jbatch.container.services.IPersistenceManagerService;
-import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
-import com.ibm.jbatch.container.status.ExtendedBatchStatus;
-import com.ibm.jbatch.container.status.ExecutionStatus;
-import com.ibm.jbatch.container.util.PartitionDataWrapper;
-import com.ibm.jbatch.jsl.model.JSLJob;
-import com.ibm.jbatch.jsl.model.JSLProperties;
-import com.ibm.jbatch.jsl.model.Property;
-
-public abstract class JobThreadRootControllerImpl implements IThreadRootController {
-
- private final static String CLASSNAME = JobThreadRootControllerImpl.class.getName(); // source class name passed to logger.entering()/exiting()
- private final static Logger logger = Logger.getLogger(CLASSNAME);
-
- protected RuntimeJobExecution jobExecution;
- protected JobContextImpl jobContext; // taken from jobExecution in the constructor
- protected long rootJobExecutionId;
- protected long jobInstanceId; // taken from jobExecution in the constructor
- protected IJobStatusManagerService jobStatusService;
- protected IPersistenceManagerService persistenceService;
- private ListenerFactory listenerFactory = null; // assigned by setupListeners() during construction
-
- private ExecutionTransitioner transitioner; // null until originateExecutionOnThread() creates it
- protected final ModelNavigator<JSLJob> jobNavigator;
- private BlockingQueue<PartitionDataWrapper> analyzerQueue; // only set by the constructors that take a queue; otherwise null
-
- public JobThreadRootControllerImpl(RuntimeJobExecution jobExecution, long rootJobExecutionId) { // base constructor; the other two constructors delegate here
- this.jobExecution = jobExecution;
- this.jobContext = jobExecution.getJobContext();
- this.rootJobExecutionId = rootJobExecutionId;
- this.jobInstanceId = jobExecution.getInstanceId();
- this.jobStatusService = ServicesManagerImpl.getInstance().getJobStatusManagerService();
- this.persistenceService = ServicesManagerImpl.getInstance().getPersistenceManagerService();
- this.jobNavigator = jobExecution.getJobNavigator();
- setupListeners(); // must run after the assignments above: it reads jobExecution and jobContext
- }
-
- public JobThreadRootControllerImpl(RuntimeJobExecution jobExecution, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerQueue) { // same as the two-argument constructor, plus an analyzer queue
- this(jobExecution, rootJobExecutionId);
- this.analyzerQueue = analyzerQueue; // later handed to the ExecutionTransitioner in originateExecutionOnThread()
- }
-
- /*
- * No rootJobExecutionId is passed in: the execution's own id is used as the root id, which "orphans" the subjob execution and makes it not findable from the parent.
- * This is exactly what we want for getStepExecutions()... we don't want it to get extraneous entries for the partitions.
- */
- public JobThreadRootControllerImpl(RuntimeJobExecution jobExecution, BlockingQueue<PartitionDataWrapper> analyzerQueue) {
- this(jobExecution, jobExecution.getExecutionId());
- this.analyzerQueue = analyzerQueue;
- }
-
- /**
- * Runs the job on the calling thread: marks it started, calls the beforeJob() listeners, runs the
- * transition loop, then calls endOfJob().
- *
- * @return the status from the transition loop, or null if the job was already STOPPING on entry or a Throwable was thrown before the loop returned
- */
- @Override
- public ExecutionStatus originateExecutionOnThread() {
- String methodName = "originateExecutionOnThread";
- logger.entering(CLASSNAME, methodName);
-
- ExecutionStatus retVal = null;
- try {
- // Check if we've already gotten the stop() command.
- if (!jobContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
- // Now that we're ready to start invoking artifacts, set the status to 'STARTED'
- markJobStarted();
- jobListenersBeforeJob();
- // The BIG loop transitioning within the job !!!
- transitioner = new ExecutionTransitioner(jobExecution, rootJobExecutionId, jobNavigator, analyzerQueue);
- retVal = transitioner.doExecutionLoop();
- ExtendedBatchStatus extBatchStatus = retVal.getExtendedBatchStatus();
- switch (extBatchStatus) {
- case JSL_STOP : jslStop();
- break;
- case JSL_FAIL : updateJobBatchStatus(BatchStatus.FAILED);
- break;
- case EXCEPTION_THROWN : updateJobBatchStatus(BatchStatus.FAILED);
- break;
- }
- }
- } catch (Throwable t) {
- // We still want to try to call the afterJob() listener and persist the batch and exit
- // status for the failure in an orderly fashion. So catch and continue.
- logWarning("Caught throwable in main execution loop", t);
- batchStatusFailedFromException();
- }
-
- endOfJob();
-
- logger.exiting(CLASSNAME, methodName);
- return retVal;
- }
-
- protected void setContextProperties() { // copies the job-level JSL properties into the job context's Properties
- JSLJob rootJob = jobExecution.getJobNavigator().getRootModelElement();
- JSLProperties declared = rootJob.getProperties();
- if (declared == null) {
- return; // the job model declares no properties
- }
- Properties target = jobContext.getProperties();
- for (Property entry : declared.getPropertyList()) {
- target.setProperty(entry.getName(), entry.getValue());
- }
- }
-
- protected void jslStop() {
- String restartOn = jobContext.getRestartOn();
- logger.fine("Logging JSL stop(): exitStatus = " + jobContext.getExitStatus() + ", restartOn = " +restartOn );
- batchStatusStopping();
- jobStatusService.updateJobStatusFromJSLStop(jobInstanceId, restartOn);
- return;
- }
-
- protected void markJobStarted() {
- updateJobBatchStatus(BatchStatus.STARTED);
- long time = System.currentTimeMillis();
- Timestamp timestamp = new Timestamp(time);
- jobExecution.setLastUpdateTime(timestamp);
- jobExecution.setStartTime(timestamp);
- persistenceService.markJobStarted(jobExecution.getExecutionId(), timestamp);
- }
-
- /*
- * Follow similar pattern for end of step in BaseStepControllerImpl
- *
- * 1. Execute the very last artifacts (jobListener)
- * 2. transition to final batch status
- * 3. default ExitStatus if necessary
- * 4. persist statuses and end time data
- *
- * We don't want to give up on the orderly process of 2,3,4, if we blow up
- * in after job, so catch that and keep on going.
- */
- protected void endOfJob() {
-
-
- // 1. Execute the very last artifacts (jobListener)
- try {
- jobListenersAfterJob();
- } catch (Throwable t) {
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- t.printStackTrace(pw);
- logger.warning("Error invoking jobListener.afterJob(). Stack trace: " + sw.toString());
- batchStatusFailedFromException();
- }
-
- // 2. transition to final batch status
- transitionToFinalBatchStatus();
-
- // 3. default ExitStatus if necessary
- if (jobContext.getExitStatus() == null) {
- logger.fine("No job-level exitStatus set, defaulting to job batch Status = " + jobContext.getBatchStatus());
- jobContext.setExitStatus(jobContext.getBatchStatus().name());
- }
-
- // 4. persist statuses and end time data
- logger.fine("Job complete for job id=" + jobExecution.getJobInstance().getJobName() + ", executionId=" + jobExecution.getExecutionId()
- + ", batchStatus=" + jobContext.getBatchStatus() + ", exitStatus=" + jobContext.getExitStatus());
- persistJobBatchAndExitStatus();
-
- }
-
- private void persistJobBatchAndExitStatus() { // persists the final batch/exit status and the end timestamps
- BatchStatus finalStatus = jobContext.getBatchStatus();
-
- // Take a current timestamp for last updated no matter what the status.
- Timestamp now = new Timestamp(System.currentTimeMillis());
- jobExecution.setLastUpdateTime(now);
-
- // Perhaps these should be coordinated in a tran but probably better still would be
- // rethinking the table design to let the database provide us consistently with a single update.
- jobStatusService.updateJobBatchStatus(jobInstanceId, finalStatus);
- jobStatusService.updateJobExecutionStatus(jobExecution.getInstanceId(), jobContext.getBatchStatus(), jobContext.getExitStatus());
-
- // Only a terminal status is expected from here on. Note that the two status
- // updates above have already been issued by the time this check throws.
- boolean terminal = finalStatus.equals(BatchStatus.COMPLETED) || finalStatus.equals(BatchStatus.STOPPED) || finalStatus.equals(BatchStatus.FAILED);
- if (!terminal) {
- throw new IllegalStateException("Not expected to encounter batchStatus of " + finalStatus +" at this point. Aborting.");
- }
-
- jobExecution.setEndTime(now);
- persistenceService.updateWithFinalExecutionStatusesAndTimestamps(jobExecution.getExecutionId(),
- finalStatus, jobContext.getExitStatus(), now);
- }
-
- /**
- * Maps the current batch status to its final value: STARTED -> COMPLETED, STOPPING -> STOPPED, FAILED stays FAILED.
- *
- * @throws IllegalStateException if the current status is none of STARTED, STOPPING or FAILED
- */
- private void transitionToFinalBatchStatus() {
- BatchStatus currentBatchStatus = jobContext.getBatchStatus();
- if (currentBatchStatus.equals(BatchStatus.STARTED)) {
- updateJobBatchStatus(BatchStatus.COMPLETED);
- } else if (currentBatchStatus.equals(BatchStatus.STOPPING)) {
- updateJobBatchStatus(BatchStatus.STOPPED);
- } else if (currentBatchStatus.equals(BatchStatus.FAILED)) {
- updateJobBatchStatus(BatchStatus.FAILED); // Should have already been done but maybe better for possible code refactoring to have it here.
- } else {
- throw new IllegalStateException("Job batch status should not be in a " + currentBatchStatus.name() + " state");
- }
- }
-
- protected void updateJobBatchStatus(BatchStatus batchStatus) { // sets the status on the job context; no persistence-service call is made here
- logger.fine("Setting job batch status to: " + batchStatus);
- jobContext.setBatchStatus(batchStatus);
- }
-
-
- protected void logWarning(String msg, Throwable t) { // logs msg plus the throwable's message and full stack trace at WARNING level
- StringWriter sw = new StringWriter();
- t.printStackTrace(new PrintWriter(sw)); // renders the stack trace into sw as text
- logger.warning(msg + " with Throwable message: " + t.getMessage() + ", and stack trace: " + sw.toString());
- }
-
- /*
- * The thought here is that while we don't persist all the transitions in batch status (given
- * we plan to persist at the very end), we do persist STOPPING right away, since if we end up
- * "stuck in STOPPING" we at least will have a record in the database.
- */
- protected void batchStatusStopping() {
- updateJobBatchStatus(BatchStatus.STOPPING);
- long time = System.currentTimeMillis();
- Timestamp timestamp = new Timestamp(time);
- jobExecution.setLastUpdateTime(timestamp);
- persistenceService.updateBatchStatusOnly(jobExecution.getExecutionId(), BatchStatus.STOPPING, timestamp);
- }
-
-
-
- private void setupListeners() { // called from the base constructor
- JSLJob jobModel = jobExecution.getJobNavigator().getRootModelElement();
- InjectionReferences injectionRef = new InjectionReferences(jobContext, null, null); // only the job context is supplied; the other two arguments are null
- listenerFactory = new ListenerFactory(jobModel, injectionRef);
- jobExecution.setListenerFactory(listenerFactory); // the same factory is also set on the job execution
- }
-
-
- @Override
- public void stop() { // persists STOPPING, then forwards stop() to the currently running element, if any
- if (jobContext.getBatchStatus().equals(BatchStatus.STARTING) || jobContext.getBatchStatus().equals(BatchStatus.STARTED)) {
- batchStatusStopping();
- // transitioner is only assigned inside originateExecutionOnThread(), so it may still be null when stop() arrives early.
- ExecutionTransitioner current = transitioner;
- IController stoppableElementController = (current == null) ? null : current.getCurrentStoppableElementController();
- if (stoppableElementController != null) {
- stoppableElementController.stop();
- }
- } else {
- logger.info("Stop ignored since batch status for job is already set to: " + jobContext.getBatchStatus());
- }
- }
-
- // Invokes beforeJob() on each job listener from the factory; an exception from one listener propagates and skips the rest.
- protected void jobListenersBeforeJob() {
- for (JobListenerProxy proxy : listenerFactory.getJobListeners()) {
- if (logger.isLoggable(Level.FINE)) {
- Object delegate = proxy.getDelegate();
- logger.fine("Invoking beforeJob() on jobListener: " + delegate + " of type: " + delegate.getClass());
- }
- proxy.beforeJob();
- }
- }
-
- // Invokes afterJob() on each job listener from the factory; an exception from one listener propagates and skips the rest.
- private void jobListenersAfterJob() {
- for (JobListenerProxy proxy : listenerFactory.getJobListeners()) {
- if (logger.isLoggable(Level.FINE)) {
- Object delegate = proxy.getDelegate();
- logger.fine(" Invoking afterJob() on jobListener: " + delegate + " of type: " + delegate.getClass());
- }
- proxy.afterJob();
- }
- }
-
- protected void batchStatusFailedFromException() { // called from the catch blocks in originateExecutionOnThread() and endOfJob()
- updateJobBatchStatus(BatchStatus.FAILED);
- }
-
- @Override
- public List<Long> getLastRunStepExecutions() {
-
- return this.transitioner.getStepExecIds();
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionThreadRootControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionThreadRootControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionThreadRootControllerImpl.java
deleted file mode 100755
index 08d105b..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/PartitionThreadRootControllerImpl.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.util.PartitionsBuilderConfig;
-
-/**
- * Thread-root controller for a partition. Currently there's no special function on top of the
- * subjob required of the partition; the constructor only pulls the analyzer queue out of the config.
- */
-public class PartitionThreadRootControllerImpl extends JobThreadRootControllerImpl {
- /**
- * @param jobExecution the runtime job execution handed to the superclass
- * @param config source of the analyzer queue handed to the superclass
- */
- public PartitionThreadRootControllerImpl(RuntimeJobExecution jobExecution, PartitionsBuilderConfig config) {
- super(jobExecution, config.getAnalyzerQueue());
- }
-}