You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:39:06 UTC
[29/62] importing batchee from github - a fork from the IBm RI
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
new file mode 100755
index 0000000..7c8eec7
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
@@ -0,0 +1,888 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller.chunk;
+
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.container.impl.MetricImpl;
+import org.apache.batchee.container.impl.StepContextImpl;
+import org.apache.batchee.container.impl.controller.SingleThreadedStepController;
+import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
+import org.apache.batchee.container.proxy.CheckpointAlgorithmProxy;
+import org.apache.batchee.container.proxy.ChunkListenerProxy;
+import org.apache.batchee.container.proxy.InjectionReferences;
+import org.apache.batchee.container.proxy.ItemProcessListenerProxy;
+import org.apache.batchee.container.proxy.ItemProcessorProxy;
+import org.apache.batchee.container.proxy.ItemReadListenerProxy;
+import org.apache.batchee.container.proxy.ItemReaderProxy;
+import org.apache.batchee.container.proxy.ItemWriteListenerProxy;
+import org.apache.batchee.container.proxy.ItemWriterProxy;
+import org.apache.batchee.container.proxy.ProxyFactory;
+import org.apache.batchee.container.proxy.RetryProcessListenerProxy;
+import org.apache.batchee.container.proxy.RetryReadListenerProxy;
+import org.apache.batchee.container.proxy.RetryWriteListenerProxy;
+import org.apache.batchee.container.proxy.SkipProcessListenerProxy;
+import org.apache.batchee.container.proxy.SkipReadListenerProxy;
+import org.apache.batchee.container.proxy.SkipWriteListenerProxy;
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.container.util.PartitionDataWrapper;
+import org.apache.batchee.container.util.TCCLObjectInputStream;
+import org.apache.batchee.jaxb.Chunk;
+import org.apache.batchee.jaxb.ItemProcessor;
+import org.apache.batchee.jaxb.ItemReader;
+import org.apache.batchee.jaxb.ItemWriter;
+import org.apache.batchee.jaxb.Property;
+import org.apache.batchee.jaxb.Step;
+import org.apache.batchee.spi.PersistenceManagerService;
+
+import javax.batch.api.chunk.CheckpointAlgorithm;
+import javax.batch.runtime.BatchStatus;
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ChunkStepController extends SingleThreadedStepController {
+
+ private final static String sourceClass = ChunkStepController.class.getName();
+ private final static Logger logger = Logger.getLogger(sourceClass);
+
+ private final PersistenceManagerService persistenceManagerService = ServicesManager.service(PersistenceManagerService.class);
+
+ private Chunk chunk = null;
+ private ItemReaderProxy readerProxy = null;
+ private ItemProcessorProxy processorProxy = null;
+ private ItemWriterProxy writerProxy = null;
+ private CheckpointAlgorithmProxy checkpointProxy = null;
+ private CheckpointAlgorithm chkptAlg = null;
+ private CheckpointManager checkpointManager;
+ private SkipHandler skipHandler = null;
+ private CheckpointDataKey readerChkptDK, writerChkptDK = null;
+ private List<ChunkListenerProxy> chunkListeners = null;
+ private List<ItemReadListenerProxy> itemReadListeners = null;
+ private List<ItemProcessListenerProxy> itemProcessListeners = null;
+ private List<ItemWriteListenerProxy> itemWriteListeners = null;
+ private RetryHandler retryHandler;
+
+ private boolean rollbackRetry = false;
+
+ public ChunkStepController(final RuntimeJobExecution jobExecutionImpl, final Step step, final StepContextImpl stepContext,
+ final long rootJobExecutionId, final BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
+ super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
+ }
+
+ /**
+ * Utility Class to hold statuses at each level of Read-Process-Write loop
+ */
+ private class ItemStatus {
+
+ public boolean isSkipped() {
+ return skipped;
+ }
+
+ public void setSkipped(boolean skipped) {
+ this.skipped = skipped;
+ }
+
+ public boolean isFiltered() {
+ return filtered;
+ }
+
+ public void setFiltered(boolean filtered) {
+ this.filtered = filtered;
+ }
+
+ public boolean isCheckPointed() {
+ return checkPointed;
+ }
+
+ public void setCheckPointed(boolean checkPointed) {
+ this.checkPointed = checkPointed;
+ }
+
+ public boolean isFinished() {
+ return finished;
+ }
+
+ public void setFinished(boolean finished) {
+ this.finished = finished;
+ }
+
+ public void setRetry(boolean ignored) {
+ // no-op
+ }
+
+ public boolean isRollback() {
+ return rollback;
+ }
+
+ public void setRollback(boolean rollback) {
+ this.rollback = rollback;
+ }
+
+ private boolean skipped = false;
+ private boolean filtered = false;
+ private boolean finished = false;
+ private boolean checkPointed = false;
+ private boolean rollback = false;
+
+ }
+
+ /**
+ * We read and process one item at a time but write in chunks (group of
+ * items). So, this method loops until we either reached the end of the
+ * reader (not more items to read), or the writer buffer is full or a
+ * checkpoint is triggered.
+ *
+ * @param chunkSize write buffer size
+ * @param theStatus flags when the read-process reached the last record or a
+ * checkpoint is required
+ * @return an array list of objects to write
+ */
+ private List<Object> readAndProcess(int chunkSize, ItemStatus theStatus) {
+ List<Object> chunkToWrite = new ArrayList<Object>();
+ Object itemRead;
+ Object itemProcessed;
+ int readProcessedCount = 0;
+
+ while (true) {
+ ItemStatus status = new ItemStatus();
+ itemRead = readItem(status);
+
+ if (status.isRollback()) {
+ theStatus.setRollback(true);
+ // inc rollbackCount
+ stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ break;
+ }
+
+ if (!status.isSkipped() && !status.isFinished()) {
+ itemProcessed = processItem(itemRead, status);
+
+ if (status.isRollback()) {
+ theStatus.setRollback(true);
+ // inc rollbackCount
+ stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ break;
+ }
+
+ if (!status.isSkipped() && !status.isFiltered()) {
+ chunkToWrite.add(itemProcessed);
+ readProcessedCount++;
+ }
+ }
+
+ theStatus.setFinished(status.isFinished());
+ theStatus.setCheckPointed(checkpointManager.applyCheckPointPolicy());
+
+ // This will force the current item to finish processing on a stop
+ // request
+ if (stepContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
+ theStatus.setFinished(true);
+ }
+
+ // write buffer size reached
+ if ((readProcessedCount == chunkSize) && !("custom".equals(checkpointProxy.getCheckpointType()))) {
+ break;
+ }
+
+ // checkpoint reached
+ if (theStatus.isCheckPointed()) {
+ break;
+ }
+
+ // last record in readerProxy reached
+ if (theStatus.isFinished()) {
+ break;
+ }
+
+ }
+ return chunkToWrite;
+ }
+
+ /**
+ * Reads an item from the reader
+ *
+ * @param status flags the current read status
+ * @return the item read
+ */
+ private Object readItem(ItemStatus status) {
+ Object itemRead = null;
+
+ try {
+ // call read listeners before and after the actual read
+ for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+ readListenerProxy.beforeRead();
+ }
+
+ itemRead = readerProxy.readItem();
+
+ for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+ readListenerProxy.afterRead(itemRead);
+ }
+
+ // itemRead == null means we reached the end of
+ // the readerProxy "resultset"
+ status.setFinished(itemRead == null);
+ if (!status.isFinished()) {
+ stepContext.getMetric(MetricImpl.MetricType.READ_COUNT).incValue();
+ }
+ } catch (Exception e) {
+ stepContext.setException(e);
+ for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+ readListenerProxy.onReadError(e);
+ }
+ if (!rollbackRetry) {
+ if (retryReadException(e)) {
+ for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+ readListenerProxy.onReadError(e);
+ }
+ // if not a rollback exception, just retry the current item
+ if (!retryHandler.isRollbackException(e)) {
+ itemRead = readItem(status);
+ } else {
+ status.setRollback(true);
+ rollbackRetry = true;
+ // inc rollbackCount
+ stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ }
+ } else if (skipReadException(e)) {
+ status.setSkipped(true);
+ stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();
+
+ } else {
+ throw new BatchContainerRuntimeException(e);
+ }
+ } else {
+ // coming from a rollback retry
+ if (skipReadException(e)) {
+ status.setSkipped(true);
+ stepContext.getMetric(MetricImpl.MetricType.READ_SKIP_COUNT).incValue();
+
+ } else if (retryReadException(e)) {
+ if (!retryHandler.isRollbackException(e)) {
+ itemRead = readItem(status);
+ } else {
+ status.setRollback(true);
+ // inc rollbackCount
+ stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ }
+ } else {
+ throw new BatchContainerRuntimeException(e);
+ }
+ }
+
+ } catch (final Throwable e) {
+ throw new BatchContainerRuntimeException(e);
+ }
+
+ return itemRead;
+ }
+
+ /**
+ * Process an item previously read by the reader
+ *
+ * @param itemRead the item read
+ * @param status flags the current process status
+ * @return the processed item
+ */
+ private Object processItem(final Object itemRead, final ItemStatus status) {
+ Object processedItem = null;
+
+ // if no processor defined for this chunk
+ if (processorProxy == null) {
+ return itemRead;
+ }
+
+ try {
+
+ // call process listeners before and after the actual process call
+ for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+ processListenerProxy.beforeProcess(itemRead);
+ }
+
+ processedItem = processorProxy.processItem(itemRead);
+
+ if (processedItem == null) {
+ // inc filterCount
+ stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();
+ status.setFiltered(true);
+ }
+
+ for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+ processListenerProxy.afterProcess(itemRead, processedItem);
+ }
+ } catch (final Exception e) {
+ for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+ processListenerProxy.onProcessError(processedItem, e);
+ }
+ if (!rollbackRetry) {
+ if (retryProcessException(e, itemRead)) {
+ if (!retryHandler.isRollbackException(e)) {
+ // call process listeners before and after the actual
+ // process call
+ for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+ processListenerProxy.beforeProcess(itemRead);
+ }
+ processedItem = processItem(itemRead, status);
+ if (processedItem == null) {
+ // inc filterCount
+ stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();
+ status.setFiltered(true);
+ }
+
+ for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+ processListenerProxy.afterProcess(itemRead, processedItem);
+ }
+ } else {
+ status.setRollback(true);
+ rollbackRetry = true;
+ // inc rollbackCount
+ stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ }
+ } else if (skipProcessException(e, itemRead)) {
+ status.setSkipped(true);
+ stepContext.getMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT).incValue();
+ } else {
+ throw new BatchContainerRuntimeException(e);
+ }
+ } else {
+ if (skipProcessException(e, itemRead)) {
+ status.setSkipped(true);
+ stepContext.getMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT).incValue();
+ } else if (retryProcessException(e, itemRead)) {
+ if (!retryHandler.isRollbackException(e)) {
+ // call process listeners before and after the actual
+ // process call
+ for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+ processListenerProxy.beforeProcess(itemRead);
+ }
+ processedItem = processItem(itemRead, status);
+ if (processedItem == null) {
+ // inc filterCount
+ stepContext.getMetric(MetricImpl.MetricType.FILTER_COUNT).incValue();
+ status.setFiltered(true);
+ }
+
+ for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+ processListenerProxy.afterProcess(itemRead, processedItem);
+ }
+ } else {
+ status.setRollback(true);
+ rollbackRetry = true;
+ // inc rollbackCount
+ stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ }
+ } else {
+ throw new BatchContainerRuntimeException(e);
+ }
+ }
+
+ } catch (final Throwable e) {
+ throw new BatchContainerRuntimeException(e);
+ }
+
+ return processedItem;
+ }
+
+ /**
+ * Writes items
+ *
+ * @param theChunk the array list with all items processed ready to be written
+ */
+ private void writeChunk(List<Object> theChunk, ItemStatus status) {
+ if (!theChunk.isEmpty()) {
+ try {
+
+ // call read listeners before and after the actual read
+ for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
+ writeListenerProxy.beforeWrite(theChunk);
+ }
+
+ writerProxy.writeItems(theChunk);
+
+ for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
+ writeListenerProxy.afterWrite(theChunk);
+ }
+ stepContext.getMetric(MetricImpl.MetricType.WRITE_COUNT).incValueBy(theChunk.size());
+ } catch (Exception e) {
+ this.stepContext.setException(e);
+ for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
+ writeListenerProxy.onWriteError(theChunk, e);
+ }
+ if (!rollbackRetry) {
+ if (retryWriteException(e, theChunk)) {
+ if (!retryHandler.isRollbackException(e)) {
+ writeChunk(theChunk, status);
+ } else {
+ rollbackRetry = true;
+ status.setRollback(true);
+ // inc rollbackCount
+ stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ }
+ } else if (skipWriteException(e, theChunk)) {
+ stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);
+ } else {
+ throw new BatchContainerRuntimeException(e);
+ }
+
+ } else {
+ if (skipWriteException(e, theChunk)) {
+ stepContext.getMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT).incValueBy(1);
+ } else if (retryWriteException(e, theChunk)) {
+ if (!retryHandler.isRollbackException(e)) {
+ status.setRetry(true);
+ writeChunk(theChunk, status);
+ } else {
+ rollbackRetry = true;
+ status.setRollback(true);
+ // inc rollbackCount
+ stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
+ }
+ } else {
+ throw new BatchContainerRuntimeException(e);
+ }
+ }
+
+ } catch (Throwable e) {
+ throw new BatchContainerRuntimeException(e);
+ }
+ }
+ }
+
+ private void invokeChunk() {
+ int itemCount = ChunkHelper.getItemCount(chunk);
+ int timeInterval = ChunkHelper.getTimeLimit(chunk);
+ boolean checkPointed = true;
+ boolean rollback = false;
+ Throwable caughtThrowable = null;
+
+ // begin new transaction at first iteration or after a checkpoint commit
+
+ try {
+ transactionManager.begin();
+ this.openReaderAndWriter();
+ transactionManager.commit();
+
+ while (true) {
+
+ if (checkPointed || rollback) {
+ if ("custom".equals(checkpointProxy.getCheckpointType())) {
+ int newtimeOut = this.checkpointManager.checkpointTimeout();
+ transactionManager.setTransactionTimeout(newtimeOut);
+ }
+ transactionManager.begin();
+ for (ChunkListenerProxy chunkProxy : chunkListeners) {
+ chunkProxy.beforeChunk();
+ }
+
+ if (rollback) {
+ positionReaderAtCheckpoint();
+ positionWriterAtCheckpoint();
+ checkpointManager = new CheckpointManager(readerProxy, writerProxy,
+ getCheckpointAlgorithm(itemCount, timeInterval), jobExecutionImpl
+ .getJobInstance().getInstanceId(), step.getId());
+ }
+ }
+
+ ItemStatus status = new ItemStatus();
+
+ if (rollback) {
+ rollback = false;
+ }
+
+ final List<Object> chunkToWrite = readAndProcess(itemCount, status);
+
+ if (status.isRollback()) {
+ itemCount = 1;
+ rollback = true;
+
+ readerProxy.close();
+ writerProxy.close();
+
+ transactionManager.rollback();
+
+ continue;
+ }
+
+ writeChunk(chunkToWrite, status);
+
+ if (status.isRollback()) {
+ itemCount = 1;
+ rollback = true;
+
+ readerProxy.close();
+ writerProxy.close();
+
+ transactionManager.rollback();
+
+ continue;
+ }
+ checkPointed = status.isCheckPointed();
+
+ // we could finish the chunk in 3 conditions: buffer is full,
+ // checkpoint, not more input
+ if (status.isCheckPointed() || status.isFinished()) {
+ // TODO: missing before checkpoint listeners
+ // 1.- check if spec list proper steps for before checkpoint
+ // 2.- ask Andy about retry
+ // 3.- when do we stop?
+
+ checkpointManager.checkpoint();
+
+ for (ChunkListenerProxy chunkProxy : chunkListeners) {
+ chunkProxy.afterChunk();
+ }
+
+ this.persistUserData();
+
+ this.chkptAlg.beginCheckpoint();
+
+ transactionManager.commit();
+
+ this.chkptAlg.endCheckpoint();
+
+ invokeCollectorIfPresent();
+
+ // exit loop when last record is written
+ if (status.isFinished()) {
+ transactionManager.begin();
+
+ readerProxy.close();
+ writerProxy.close();
+
+ transactionManager.commit();
+ // increment commitCount
+ stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();
+ break;
+ } else {
+ // increment commitCount
+ stepContext.getMetric(MetricImpl.MetricType.COMMIT_COUNT).incValue();
+ }
+
+ }
+
+ }
+ } catch (final Exception e) {
+ caughtThrowable = e;
+ logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", e);
+ // Only try to call onError() if we have an Exception, but not an Error.
+ for (ChunkListenerProxy chunkProxy : chunkListeners) {
+ try {
+ chunkProxy.onError(e);
+ } catch (final Exception e1) {
+ logger.log(Level.SEVERE, e1.getMessage(), e1);
+ }
+ }
+ } catch (final Throwable t) {
+ caughtThrowable = t;
+ logger.log(Level.SEVERE, t.getMessage(), t);
+ } finally {
+ if (caughtThrowable != null) {
+ transactionManager.setRollbackOnly();
+ readerProxy.close();
+ writerProxy.close();
+ transactionManager.rollback();
+ throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", caughtThrowable);
+ }
+ }
+ }
+
+ protected void invokeCoreStep() throws BatchContainerServiceException {
+
+ this.chunk = step.getChunk();
+
+ initializeChunkArtifacts();
+
+ invokeChunk();
+ }
+
+ private CheckpointAlgorithm getCheckpointAlgorithm(final int itemCount, final int timeInterval) {
+ final CheckpointAlgorithm alg;
+ if ("item".equals(checkpointProxy.getCheckpointType())) {
+ alg = new ItemCheckpointAlgorithm();
+ ((ItemCheckpointAlgorithm) alg).setThresholds(itemCount, timeInterval);
+ } else { // custom chkpt alg
+ alg = checkpointProxy;
+ }
+
+ return alg;
+ }
+
+ /*
+ * Initialize itemreader, itemwriter, and item processor checkpoint
+ */
+ private void initializeChunkArtifacts() {
+ final int itemCount = ChunkHelper.getItemCount(chunk);
+ final int timeInterval = ChunkHelper.getTimeLimit(chunk);
+
+ {
+ final ItemReader itemReader = chunk.getReader();
+ final List<Property> itemReaderProps = itemReader.getProperties() == null ? null : itemReader.getProperties().getPropertyList();
+ final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemReaderProps);
+ readerProxy = ProxyFactory.createItemReaderProxy(itemReader.getRef(), injectionRef, stepContext, jobExecutionImpl);
+ }
+
+ {
+ final ItemProcessor itemProcessor = chunk.getProcessor();
+ if (itemProcessor != null) {
+ final List<Property> itemProcessorProps = itemProcessor.getProperties() == null ? null : itemProcessor.getProperties().getPropertyList();
+ final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemProcessorProps);
+ processorProxy = ProxyFactory.createItemProcessorProxy(itemProcessor.getRef(), injectionRef, stepContext, jobExecutionImpl);
+ }
+ }
+
+ {
+ final ItemWriter itemWriter = chunk.getWriter();
+ final List<Property> itemWriterProps = itemWriter.getProperties() == null ? null : itemWriter.getProperties().getPropertyList();
+ final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemWriterProps);
+ writerProxy = ProxyFactory.createItemWriterProxy(itemWriter.getRef(), injectionRef, stepContext, jobExecutionImpl);
+ }
+
+ {
+ final List<Property> propList;
+ if (chunk.getCheckpointAlgorithm() != null) {
+ propList = (chunk.getCheckpointAlgorithm().getProperties() == null) ? null : chunk.getCheckpointAlgorithm().getProperties().getPropertyList();
+ } else {
+ propList = null;
+ }
+
+ final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
+ checkpointProxy = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(step, injectionRef, stepContext, jobExecutionImpl);
+ }
+
+ {
+ final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, null);
+
+ this.chunkListeners = jobExecutionImpl.getListenerFactory().getChunkListeners(step, injectionRef, stepContext, jobExecutionImpl);
+ this.itemReadListeners = jobExecutionImpl.getListenerFactory().getItemReadListeners(step, injectionRef, stepContext, jobExecutionImpl);
+ this.itemProcessListeners = jobExecutionImpl.getListenerFactory().getItemProcessListeners(step, injectionRef, stepContext, jobExecutionImpl);
+ this.itemWriteListeners = jobExecutionImpl.getListenerFactory().getItemWriteListeners(step, injectionRef, stepContext, jobExecutionImpl);
+ final List<SkipProcessListenerProxy> skipProcessListeners = jobExecutionImpl.getListenerFactory().getSkipProcessListeners(step, injectionRef, stepContext, jobExecutionImpl);
+ final List<SkipReadListenerProxy> skipReadListeners = jobExecutionImpl.getListenerFactory().getSkipReadListeners(step, injectionRef, stepContext, jobExecutionImpl);
+ final List<SkipWriteListenerProxy> skipWriteListeners = jobExecutionImpl.getListenerFactory().getSkipWriteListeners(step, injectionRef, stepContext, jobExecutionImpl);
+ final List<RetryProcessListenerProxy> retryProcessListeners = jobExecutionImpl.getListenerFactory().getRetryProcessListeners(step, injectionRef, stepContext, jobExecutionImpl);
+ final List<RetryReadListenerProxy> retryReadListeners = jobExecutionImpl.getListenerFactory().getRetryReadListeners(step, injectionRef, stepContext, jobExecutionImpl);
+ final List<RetryWriteListenerProxy> retryWriteListeners = jobExecutionImpl.getListenerFactory().getRetryWriteListeners(step, injectionRef, stepContext, jobExecutionImpl);
+
+ if ("item".equals(checkpointProxy.getCheckpointType())) {
+ chkptAlg = new ItemCheckpointAlgorithm();
+ ItemCheckpointAlgorithm.class.cast(chkptAlg).setThresholds(itemCount, timeInterval);
+ } else { // custom chkpt alg
+ chkptAlg = checkpointProxy;
+ }
+
+ checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId());
+
+ skipHandler = new SkipHandler(chunk);
+ skipHandler.addSkipProcessListener(skipProcessListeners);
+ skipHandler.addSkipReadListener(skipReadListeners);
+ skipHandler.addSkipWriteListener(skipWriteListeners);
+
+ retryHandler = new RetryHandler(chunk);
+
+ retryHandler.addRetryProcessListener(retryProcessListeners);
+ retryHandler.addRetryReadListener(retryReadListeners);
+ retryHandler.addRetryWriteListener(retryWriteListeners);
+ }
+ }
+
+ private void openReaderAndWriter() {
+ readerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), CheckpointType.READER);
+ CheckpointData readerChkptData = persistenceManagerService.getCheckpointData(readerChkptDK);
+ try {
+
+ // check for data in backing store
+ if (readerChkptData != null) {
+ final byte[] readertoken = readerChkptData.getRestartToken();
+ final ByteArrayInputStream readerChkptBA = new ByteArrayInputStream(readertoken);
+ TCCLObjectInputStream readerOIS;
+ try {
+ readerOIS = new TCCLObjectInputStream(readerChkptBA);
+ readerProxy.open((Serializable) readerOIS.readObject());
+ readerOIS.close();
+ } catch (final Exception ex) {
+ // is this what I should be throwing here?
+ throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+ }
+ } else {
+ // no chkpt data exists in the backing store
+ readerChkptData = null;
+ readerProxy.open(null);
+ }
+ } catch (final ClassCastException e) {
+ throw new IllegalStateException("Expected CheckpointData but found" + readerChkptData);
+ }
+
+ writerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), CheckpointType.WRITER);
+ CheckpointData writerChkptData = persistenceManagerService.getCheckpointData(writerChkptDK);
+ try {
+ // check for data in backing store
+ if (writerChkptData != null) {
+ final byte[] writertoken = writerChkptData.getRestartToken();
+ final ByteArrayInputStream writerChkptBA = new ByteArrayInputStream(writertoken);
+ TCCLObjectInputStream writerOIS;
+ try {
+ writerOIS = new TCCLObjectInputStream(writerChkptBA);
+ writerProxy.open((Serializable) writerOIS.readObject());
+ writerOIS.close();
+ } catch (final Exception ex) {
+ // is this what I should be throwing here?
+ throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+ }
+ } else {
+ // no chkpt data exists in the backing store
+ writerChkptData = null;
+ writerProxy.open(null);
+ }
+ } catch (final ClassCastException e) {
+ throw new IllegalStateException("Expected Checkpoint but found" + writerChkptData);
+ }
+
+ // set up metrics
+ // stepContext.addMetric(MetricImpl.Counter.valueOf("READ_COUNT"), 0);
+ // stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_COUNT"), 0);
+ // stepContext.addMetric(MetricImpl.Counter.valueOf("READ_SKIP_COUNT"), 0);
+ // stepContext.addMetric(MetricImpl.Counter.valueOf("PROCESS_SKIP_COUNT"), 0);
+ // stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_SKIP_COUNT"), 0);
+ }
+
+ @Override
+ public void stop() {
+ stepContext.setBatchStatus(BatchStatus.STOPPING);
+
+ // we don't need to call stop on the chunk implementation here since a
+ // chunk always returns control to
+ // the batch container after every item.
+
+ }
+
+ private boolean skipReadException(final Exception e) {
+ try {
+ skipHandler.handleExceptionRead(e);
+ } catch (final BatchContainerRuntimeException bcre) {
+ return false;
+ }
+ return true;
+ }
+
+ private boolean retryReadException(final Exception e) {
+ try {
+ retryHandler.handleExceptionRead(e);
+ } catch (final BatchContainerRuntimeException bcre) {
+ return false;
+ }
+ return true;
+
+ }
+
+ private boolean skipProcessException(final Exception e, final Object record) {
+ try {
+ skipHandler.handleExceptionWithRecordProcess(e, record);
+ } catch (BatchContainerRuntimeException bcre) {
+ return false;
+ }
+ return true;
+
+ }
+
+ private boolean retryProcessException(final Exception e, final Object record) {
+ try {
+ retryHandler.handleExceptionProcess(e, record);
+ } catch (BatchContainerRuntimeException bcre) {
+ return false;
+ }
+ return true;
+ }
+
+ private boolean skipWriteException(final Exception e, final List<Object> chunkToWrite) {
+ try {
+ skipHandler.handleExceptionWithRecordListWrite(e, chunkToWrite);
+ } catch (BatchContainerRuntimeException bcre) {
+ return false;
+ }
+ return true;
+ }
+
+ private boolean retryWriteException(final Exception e, final List<Object> chunkToWrite) {
+ try {
+ retryHandler.handleExceptionWrite(e, chunkToWrite);
+ } catch (BatchContainerRuntimeException bcre) {
+ return false;
+ }
+ return true;
+ }
+
+ private void positionReaderAtCheckpoint() {
+ readerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), CheckpointType.READER);
+
+ CheckpointData readerData = persistenceManagerService.getCheckpointData(readerChkptDK);
+ try {
+ // check for data in backing store
+ if (readerData != null) {
+ byte[] readertoken = readerData.getRestartToken();
+ ByteArrayInputStream readerChkptBA = new ByteArrayInputStream(readertoken);
+ TCCLObjectInputStream readerOIS;
+ try {
+ readerOIS = new TCCLObjectInputStream(readerChkptBA);
+ readerProxy.open((Serializable) readerOIS.readObject());
+ readerOIS.close();
+ } catch (Exception ex) {
+ // is this what I should be throwing here?
+ throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+ }
+ } else {
+ // no chkpt data exists in the backing store
+ readerData = null;
+ readerProxy.open(null);
+ }
+ } catch (final ClassCastException e) {
+ throw new IllegalStateException("Expected CheckpointData but found" + readerData);
+ }
+ }
+
+ private void positionWriterAtCheckpoint() {
+ writerChkptDK = new CheckpointDataKey(jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), CheckpointType.WRITER);
+
+ CheckpointData writerData = persistenceManagerService.getCheckpointData(writerChkptDK);
+ try {
+ // check for data in backing store
+ if (writerData != null) {
+ byte[] writertoken = writerData.getRestartToken();
+ ByteArrayInputStream writerChkptBA = new ByteArrayInputStream(writertoken);
+ TCCLObjectInputStream writerOIS;
+ try {
+ writerOIS = new TCCLObjectInputStream(writerChkptBA);
+ writerProxy.open((Serializable) writerOIS.readObject());
+ writerOIS.close();
+ } catch (Exception ex) {
+ // is this what I should be throwing here?
+ throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+ }
+ } else {
+ // no chkpt data exists in the backing store
+ writerData = null;
+ writerProxy.open(null);
+ }
+ } catch (ClassCastException e) {
+ throw new IllegalStateException("Expected CheckpointData but found" + writerData);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ItemCheckpointAlgorithm.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ItemCheckpointAlgorithm.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ItemCheckpointAlgorithm.java
new file mode 100755
index 0000000..f95aef3
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ItemCheckpointAlgorithm.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.impl.controller.chunk;
+
+import javax.batch.api.chunk.CheckpointAlgorithm;
+
+public final class ItemCheckpointAlgorithm implements CheckpointAlgorithm {
+ private long requests = 0;
+ private long checkpointBeginTime = 0;
+
+ private int time;
+ private int item;
+ private long currentTime;
+
+ public ItemCheckpointAlgorithm() {
+ checkpointBeginTime = System.currentTimeMillis();
+ currentTime = checkpointBeginTime;
+ }
+
+ @Override
+ public void endCheckpoint() throws Exception {
+ // no-op
+ }
+
+ public boolean isReadyToCheckpointItem() throws Exception {
+ requests++;
+
+ final boolean itemready = (requests >= item);
+ if (itemready) {
+ requests = 0;
+ }
+ return itemready;
+ }
+
+ public boolean isReadyToCheckpointTime() throws Exception {
+ boolean timeready = false;
+ currentTime = System.currentTimeMillis();
+ final long curdiff = currentTime - checkpointBeginTime;
+ final int diff = (int) curdiff / 1000;
+
+ if (diff >= time) {
+ timeready = true;
+
+ checkpointBeginTime = System.currentTimeMillis();
+
+ }
+
+ return timeready;
+ }
+
+ @Override
+ public boolean isReadyToCheckpoint() throws Exception {
+ boolean ready = false;
+
+ if (time == 0) { // no time limit, just check if item count has been reached
+ if (isReadyToCheckpointItem()) {
+ ready = true;
+ }
+ } else if (isReadyToCheckpointItem() || isReadyToCheckpointTime()) {
+ ready = true;
+ }
+
+ return ready;
+ }
+
+ public void setThresholds(int itemthreshold, int timethreshold) {
+ item = itemthreshold;
+ time = timethreshold;
+ }
+
+ @Override
+ public void beginCheckpoint() throws Exception {
+ checkpointBeginTime = currentTime;
+ }
+
+ @Override
+ public int checkpointTimeout() throws Exception {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/PersistentDataWrapper.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/PersistentDataWrapper.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/PersistentDataWrapper.java
new file mode 100755
index 0000000..33611f7
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/PersistentDataWrapper.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller.chunk;
+
+import java.io.Serializable;
+
+public class PersistentDataWrapper implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private byte[] persistentDataBytes;
+
+ public PersistentDataWrapper(byte[] persistentData) {
+ this.persistentDataBytes = persistentData;
+ }
+
+ public byte[] getPersistentDataBytes() {
+ return persistentDataBytes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/RetryHandler.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/RetryHandler.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/RetryHandler.java
new file mode 100755
index 0000000..c0deda8
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/RetryHandler.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.controller.chunk;
+
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.proxy.RetryProcessListenerProxy;
+import org.apache.batchee.container.proxy.RetryReadListenerProxy;
+import org.apache.batchee.container.proxy.RetryWriteListenerProxy;
+import org.apache.batchee.jaxb.Chunk;
+import org.apache.batchee.jaxb.ExceptionClassFilter;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class RetryHandler {
+ private List<RetryProcessListenerProxy> _retryProcessListeners = null;
+ private List<RetryReadListenerProxy> _retryReadListeners = null;
+ private List<RetryWriteListenerProxy> _retryWriteListeners = null;
+
+ private Set<String> _retryNoRBIncludeExceptions = null;
+ private Set<String> _retryNoRBExcludeExceptions = null;
+ private Set<String> _retryIncludeExceptions = null;
+ private Set<String> _retryExcludeExceptions = null;
+ private int _retryLimit = Integer.MIN_VALUE;
+ private long _retryCount = 0;
+ private Exception _retryException = null;
+
+ public RetryHandler(final Chunk chunk) {
+ try {
+ if (chunk.getRetryLimit() != null) {
+ _retryLimit = Integer.parseInt(chunk.getRetryLimit());
+ if (_retryLimit < 0) {
+ throw new IllegalArgumentException("The retry-limit attribute on a chunk cannot be a negative value");
+ }
+
+ }
+ } catch (final NumberFormatException nfe) {
+ throw new RuntimeException("NumberFormatException reading retry-limit", nfe);
+ }
+
+ // Read the include/exclude exceptions.
+ _retryIncludeExceptions = new HashSet<String>();
+ _retryExcludeExceptions = new HashSet<String>();
+ _retryNoRBIncludeExceptions = new HashSet<String>();
+ _retryNoRBExcludeExceptions = new HashSet<String>();
+
+ if (chunk.getRetryableExceptionClasses() != null) {
+ if (chunk.getRetryableExceptionClasses().getIncludeList() != null) {
+ final List<ExceptionClassFilter.Include> includes = chunk.getRetryableExceptionClasses().getIncludeList();
+ for (final ExceptionClassFilter.Include include : includes) {
+ _retryIncludeExceptions.add(include.getClazz().trim());
+ }
+ }
+ if (chunk.getRetryableExceptionClasses().getExcludeList() != null) {
+ final List<ExceptionClassFilter.Exclude> excludes = chunk.getRetryableExceptionClasses().getExcludeList();
+ for (final ExceptionClassFilter.Exclude exclude : excludes) {
+ _retryExcludeExceptions.add(exclude.getClazz().trim());
+ }
+ }
+ }
+
+ if (chunk.getNoRollbackExceptionClasses() != null) {
+ if (chunk.getNoRollbackExceptionClasses().getIncludeList() != null) {
+ final List<ExceptionClassFilter.Include> includes = chunk.getNoRollbackExceptionClasses().getIncludeList();
+ for (final ExceptionClassFilter.Include include : includes) {
+ _retryNoRBIncludeExceptions.add(include.getClazz().trim());
+ }
+ }
+ if (chunk.getNoRollbackExceptionClasses().getExcludeList() != null) {
+ final List<ExceptionClassFilter.Exclude> excludes = chunk.getNoRollbackExceptionClasses().getExcludeList();
+ for (final ExceptionClassFilter.Exclude exclude : excludes) {
+ _retryNoRBExcludeExceptions.add(exclude.getClazz().trim());
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Add the user-defined RetryProcessListener.
+ */
+ public void addRetryProcessListener(List<RetryProcessListenerProxy> retryProcessListeners) {
+ _retryProcessListeners = retryProcessListeners;
+ }
+
+ /**
+ * Add the user-defined RetryReadListener.
+ */
+ public void addRetryReadListener(List<RetryReadListenerProxy> retryReadListeners) {
+ _retryReadListeners = retryReadListeners;
+ }
+
+ /**
+ * Add the user-defined RetryWriteListener.
+ */
+ public void addRetryWriteListener(List<RetryWriteListenerProxy> retryWriteListeners) {
+ _retryWriteListeners = retryWriteListeners;
+ }
+
+ public boolean isRollbackException(Exception e) {
+ return !isNoRollbackException(e);
+ }
+
+ /**
+ * Handle exception from a read failure.
+ */
+ public void handleExceptionRead(final Exception e) {
+ if (isRetryLimitReached() || !isRetryable(e)) {
+ throw new BatchContainerRuntimeException(e);
+ }
+
+ _retryException = e;
+ // Retry it. Log it. Call the RetryListener.
+ ++_retryCount;
+
+ if (_retryReadListeners != null) {
+ for (final RetryReadListenerProxy retryReadListenerProxy : _retryReadListeners) {
+ retryReadListenerProxy.onRetryReadException(e);
+ }
+ }
+ }
+
+ /**
+ * Handle exception from a process failure.
+ */
+ public void handleExceptionProcess(final Exception e, final Object w) {
+ if (isRetryLimitReached() || !isRetryable(e)) {
+ throw new BatchContainerRuntimeException(e);
+ }
+
+ _retryException = e;
+ // Retry it. Log it. Call the RetryListener.
+ ++_retryCount;
+
+ if (_retryProcessListeners != null) {
+ for (final RetryProcessListenerProxy retryProcessListenerProxy : _retryProcessListeners) {
+ retryProcessListenerProxy.onRetryProcessException(w, e);
+ }
+ }
+ }
+
+ /**
+ * Handle exception from a write failure.
+ */
+ public void handleExceptionWrite(final Exception e, final List<Object> w) {
+ if (isRetryLimitReached() || !isRetryable(e)) {
+ throw new BatchContainerRuntimeException(e);
+ }
+
+ // Retry it. Log it. Call the RetryListener.
+ _retryException = e;
+ ++_retryCount;
+
+ if (_retryWriteListeners != null) {
+ for (final RetryWriteListenerProxy retryWriteListenerProxy : _retryWriteListeners) {
+ retryWriteListenerProxy.onRetryWriteException(w, e);
+ }
+ }
+ }
+
+
+ /**
+ * Check the retryable exception lists to determine whether
+ * the given Exception is retryable.
+ */
+ private boolean isRetryable(final Exception e) {
+ return containsException(_retryIncludeExceptions, e) && !containsException(_retryExcludeExceptions, e);
+ }
+
+ private boolean isNoRollbackException(final Exception e) {
+ return containsException(_retryNoRBIncludeExceptions, e) && !containsException(_retryNoRBExcludeExceptions, e);
+ }
+
+ /**
+ * Check whether given exception is in the specified exception list
+ */
+ private boolean containsException(final Set<String> retryList, final Exception e) {
+ boolean retVal = false;
+
+ final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ for (final String exClassName : retryList) {
+ try {
+ retVal = classLoader.loadClass(exClassName).isInstance(e);
+ if (retVal) {
+ break;
+ }
+ } catch (final ClassNotFoundException cnf) {
+ // no-op
+ }
+ }
+
+ return retVal;
+ }
+
+ /**
+ * Check if the retry limit has been reached.
+ * <p/>
+ * Note: if retry handling isn't enabled (i.e. not configured in xJCL), then this method
+ * will always return TRUE.
+ */
+ private boolean isRetryLimitReached() {
+ // Unlimited retries if it is never defined
+ return _retryLimit != Integer.MIN_VALUE && (_retryCount >= _retryLimit);
+
+ }
+
+ public Exception getException() {
+ return _retryException;
+ }
+
+ public String toString() {
+ return "RetryHandler{" + super.toString() + "}count:limit=" + _retryCount + ":" + _retryLimit;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/SkipHandler.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/SkipHandler.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/SkipHandler.java
new file mode 100755
index 0000000..02092da
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/SkipHandler.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.impl.controller.chunk;
+
+import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.proxy.SkipProcessListenerProxy;
+import org.apache.batchee.container.proxy.SkipReadListenerProxy;
+import org.apache.batchee.container.proxy.SkipWriteListenerProxy;
+import org.apache.batchee.jaxb.Chunk;
+import org.apache.batchee.jaxb.ExceptionClassFilter;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class SkipHandler {
+
+ /**
+ * Logic for handling skipped records.
+ */
+
+ private List<SkipProcessListenerProxy> _skipProcessListener = null;
+ private List<SkipReadListenerProxy> _skipReadListener = null;
+ private List<SkipWriteListenerProxy> _skipWriteListener = null;
+
+ private Set<String> _skipIncludeExceptions = null;
+ private Set<String> _skipExcludeExceptions = null;
+ private int _skipLimit = Integer.MIN_VALUE;
+ private long _skipCount = 0;
+
+ public SkipHandler(final Chunk chunk) {
+ try {
+ if (chunk.getSkipLimit() != null) {
+ _skipLimit = Integer.parseInt(chunk.getSkipLimit());
+ if (_skipLimit < 0) {
+ throw new IllegalArgumentException("The skip-limit attribute on a chunk cannot be a negative value");
+ }
+ }
+ } catch (final NumberFormatException nfe) {
+ throw new RuntimeException("NumberFormatException reading skip-limit", nfe);
+ }
+
+
+ // Read the include/exclude exceptions.
+
+ _skipIncludeExceptions = new HashSet<String>();
+ _skipExcludeExceptions = new HashSet<String>();
+
+ if (chunk.getSkippableExceptionClasses() != null && chunk.getSkippableExceptionClasses().getIncludeList() != null) {
+ final List<ExceptionClassFilter.Include> includes = chunk.getSkippableExceptionClasses().getIncludeList();
+ for (final ExceptionClassFilter.Include include : includes) {
+ _skipIncludeExceptions.add(include.getClazz().trim());
+ }
+ }
+
+ if (chunk.getSkippableExceptionClasses() != null && chunk.getSkippableExceptionClasses().getExcludeList() != null) {
+ final List<ExceptionClassFilter.Exclude> excludes = chunk.getSkippableExceptionClasses().getExcludeList();
+ for (final ExceptionClassFilter.Exclude exclude : excludes) {
+ _skipExcludeExceptions.add(exclude.getClazz().trim());
+ }
+ }
+ }
+
+ /**
+ * Add the user-defined SkipReadListeners.
+ */
+ public void addSkipReadListener(List<SkipReadListenerProxy> skipReadListener) {
+ _skipReadListener = skipReadListener;
+ }
+
+ /**
+ * Add the user-defined SkipWriteListeners.
+ */
+ public void addSkipWriteListener(List<SkipWriteListenerProxy> skipWriteListener) {
+ _skipWriteListener = skipWriteListener;
+ }
+
+ /**
+ * Add the user-defined SkipReadListeners.
+ */
+ public void addSkipProcessListener(List<SkipProcessListenerProxy> skipProcessListener) {
+ _skipProcessListener = skipProcessListener;
+ }
+
+
+ /**
+ * Handle exception from a read failure.
+ */
+ public void handleExceptionRead(Exception e) {
+ if (isSkipLimitReached() || !isSkippable(e)) {
+ throw new BatchContainerRuntimeException(e);
+ }
+
+ // Skip it. Log it. Call the SkipListener.
+ ++_skipCount;
+
+ if (_skipReadListener != null) {
+ for (final SkipReadListenerProxy skipReadListenerProxy : _skipReadListener) {
+ skipReadListenerProxy.onSkipReadItem(e);
+ }
+ }
+ }
+
+ /**
+ * Handle exception from a process failure.
+ */
+ public void handleExceptionWithRecordProcess(final Exception e, final Object w) {
+ if (isSkipLimitReached() || !isSkippable(e)) {
+ throw new BatchContainerRuntimeException(e);
+ }
+
+ // Skip it. Log it. Call the SkipProcessListener.
+ ++_skipCount;
+
+ if (_skipProcessListener != null) {
+ for (SkipProcessListenerProxy skipProcessListenerProxy : _skipProcessListener) {
+ skipProcessListenerProxy.onSkipProcessItem(w, e);
+ }
+ }
+ }
+
+ /**
+ * Handle exception from a write failure.
+ */
+ public void handleExceptionWithRecordListWrite(final Exception e, final List<Object> items) {
+ if (isSkipLimitReached() || !isSkippable(e)) {
+ throw new BatchContainerRuntimeException(e);
+ }
+
+ // Skip it. Log it. Call the SkipListener.
+ ++_skipCount;
+
+ if (_skipWriteListener != null) {
+ for (SkipWriteListenerProxy skipWriteListenerProxy : _skipWriteListener) {
+ skipWriteListenerProxy.onSkipWriteItem(items, e);
+ }
+ }
+ }
+
+
+ /**
+ * Check the skipCount and skippable exception lists to determine whether
+ * the given Exception is skippable.
+ */
+ private boolean isSkippable(final Exception e) {
+ return containsSkippable(_skipIncludeExceptions, e) && !containsSkippable(_skipExcludeExceptions, e);
+ }
+
+ /**
+ * Check whether given exception is in skippable exception list
+ */
+ private boolean containsSkippable(final Set<String> skipList, final Exception e) {
+ final ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+ for (final String exClassName : skipList) {
+ try {
+ if (tccl.loadClass(exClassName).isInstance(e)) {
+ return true;
+ }
+ } catch (final ClassNotFoundException cnf) {
+ // no-op
+ }
+ }
+
+ return false;
+ }
+
+
+ /**
+ * Check if the skip limit has been reached.
+ * <p/>
+ * Note: if skip handling isn't enabled (i.e. not configured in xJCL), then
+ * this method will always return TRUE.
+ */
+ private boolean isSkipLimitReached() {
+ // Unlimited skips if it is never defined
+ return _skipLimit != Integer.MIN_VALUE && (_skipCount >= _skipLimit);
+
+ }
+
+ public String toString() {
+ return "SkipHandler{" + super.toString() + "}count:limit=" + _skipCount + ":" + _skipLimit;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java
new file mode 100755
index 0000000..5b89a88
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/JobExecutionHelper.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.jobinstance;
+
+import org.apache.batchee.container.impl.JobContextImpl;
+import org.apache.batchee.container.impl.JobInstanceImpl;
+import org.apache.batchee.container.jsl.JobModelResolver;
+import org.apache.batchee.container.modelresolver.PropertyResolver;
+import org.apache.batchee.container.modelresolver.PropertyResolverFactory;
+import org.apache.batchee.container.navigator.ModelNavigator;
+import org.apache.batchee.container.navigator.NavigatorFactory;
+import org.apache.batchee.container.services.InternalJobExecution;
+import org.apache.batchee.container.services.JobStatusManagerService;
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.container.status.JobStatus;
+import org.apache.batchee.jaxb.JSLJob;
+import org.apache.batchee.jaxb.JSLProperties;
+import org.apache.batchee.spi.PersistenceManagerService;
+import org.apache.batchee.spi.SecurityService;
+
+import javax.batch.operations.JobExecutionAlreadyCompleteException;
+import javax.batch.operations.JobExecutionNotMostRecentException;
+import javax.batch.operations.JobRestartException;
+import javax.batch.operations.JobStartException;
+import javax.batch.operations.NoSuchJobExecutionException;
+import javax.batch.runtime.BatchStatus;
+import javax.batch.runtime.JobInstance;
+import java.util.Properties;
+
+public class JobExecutionHelper {
+ private static final JobStatusManagerService JOB_STATUS_MANAGER_SERVICE = ServicesManager.service(JobStatusManagerService.class);
+ private static final PersistenceManagerService PERSISTENCE_MANAGER_SERVICE = ServicesManager.service(PersistenceManagerService.class);
+ private static final SecurityService SECURITY_SERVICE = ServicesManager.service(SecurityService.class);
+
+ private static ModelNavigator<JSLJob> getResolvedJobNavigator(final String jobXml, final Properties jobParameters, final boolean parallelExecution) {
+ final JSLJob jobModel = new JobModelResolver().resolveModel(jobXml);
+ final PropertyResolver<JSLJob> propResolver = PropertyResolverFactory.createJobPropertyResolver(parallelExecution);
+ propResolver.substituteProperties(jobModel, jobParameters);
+ return NavigatorFactory.createJobNavigator(jobModel);
+ }
+
+ private static ModelNavigator<JSLJob> getResolvedJobNavigator(final JSLJob jobModel, final Properties jobParameters, final boolean parallelExecution) {
+ final PropertyResolver<JSLJob> propResolver = PropertyResolverFactory.createJobPropertyResolver(parallelExecution);
+ propResolver.substituteProperties(jobModel, jobParameters);
+ return NavigatorFactory.createJobNavigator(jobModel);
+ }
+
+ private static JobContextImpl getJobContext(ModelNavigator<JSLJob> jobNavigator) {
+ final JSLProperties jslProperties;
+ if (jobNavigator.getRootModelElement() != null) {
+ jslProperties = jobNavigator.getRootModelElement().getProperties();
+ } else {
+ jslProperties = new JSLProperties();
+ }
+ return new JobContextImpl(jobNavigator, jslProperties);
+ }
+
+ private static JobInstance getNewJobInstance(final String name, final String jobXml) {
+ return PERSISTENCE_MANAGER_SERVICE.createJobInstance(name, SECURITY_SERVICE.getLoggedUser(), jobXml);
+ }
+
+ private static JobInstance getNewSubJobInstance(final String name) {
+ return PERSISTENCE_MANAGER_SERVICE.createSubJobInstance(name, SECURITY_SERVICE.getLoggedUser());
+ }
+
+ private static JobStatus createNewJobStatus(final JobInstance jobInstance) {
+ final long instanceId = jobInstance.getInstanceId();
+ final JobStatus jobStatus = JOB_STATUS_MANAGER_SERVICE.createJobStatus(instanceId);
+ jobStatus.setJobInstance(jobInstance);
+ return jobStatus;
+ }
+
+ private static void validateRestartableFalseJobsDoNotRestart(final JSLJob jobModel)
+ throws JobRestartException {
+ if (jobModel.getRestartable() != null && jobModel.getRestartable().equalsIgnoreCase("false")) {
+ throw new JobRestartException("Job Restartable attribute is false, Job cannot be restarted.");
+ }
+ }
+
+ public static RuntimeJobExecution startJob(final String jobXML, final Properties jobParameters) throws JobStartException {
+ final JSLJob jobModel = new JobModelResolver().resolveModel(jobXML);
+ final ModelNavigator<JSLJob> jobNavigator = getResolvedJobNavigator(jobModel, jobParameters, false);
+ final JobContextImpl jobContext = getJobContext(jobNavigator);
+ final JobInstance jobInstance = getNewJobInstance(jobNavigator.getRootModelElement().getId(), jobXML);
+ final RuntimeJobExecution executionHelper = PERSISTENCE_MANAGER_SERVICE.createJobExecution(jobInstance, jobParameters, jobContext.getBatchStatus());
+
+ executionHelper.prepareForExecution(jobContext);
+
+ final JobStatus jobStatus = createNewJobStatus(jobInstance);
+ JOB_STATUS_MANAGER_SERVICE.updateJobStatus(jobStatus);
+
+ return executionHelper;
+ }
+
+ public static RuntimeFlowInSplitExecution startFlowInSplit(final JSLJob jobModel) throws JobStartException {
+ final ModelNavigator<JSLJob> jobNavigator = getResolvedJobNavigator(jobModel, null, true);
+ final JobContextImpl jobContext = getJobContext(jobNavigator);
+ final JobInstance jobInstance = getNewSubJobInstance(jobNavigator.getRootModelElement().getId());
+ final RuntimeFlowInSplitExecution executionHelper = PERSISTENCE_MANAGER_SERVICE.createFlowInSplitExecution(jobInstance, jobContext.getBatchStatus());
+
+ executionHelper.prepareForExecution(jobContext);
+
+ final JobStatus jobStatus = createNewJobStatus(jobInstance);
+ JOB_STATUS_MANAGER_SERVICE.updateJobStatus(jobStatus);
+
+ return executionHelper;
+ }
+
+ public static RuntimeJobExecution startPartition(JSLJob jobModel, Properties jobParameters) throws JobStartException {
+ final ModelNavigator<JSLJob> jobNavigator = getResolvedJobNavigator(jobModel, jobParameters, true);
+ final JobContextImpl jobContext = getJobContext(jobNavigator);
+
+ final JobInstance jobInstance = getNewSubJobInstance(jobNavigator.getRootModelElement().getId());
+
+ final RuntimeJobExecution executionHelper = PERSISTENCE_MANAGER_SERVICE.createJobExecution(jobInstance, jobParameters, jobContext.getBatchStatus());
+
+ executionHelper.prepareForExecution(jobContext);
+
+ final JobStatus jobStatus = createNewJobStatus(jobInstance);
+ JOB_STATUS_MANAGER_SERVICE.updateJobStatus(jobStatus);
+
+ return executionHelper;
+ }
+
+ private static void validateJobInstanceNotCompleteOrAbandonded(final JobStatus jobStatus) throws JobRestartException, JobExecutionAlreadyCompleteException {
+ if (jobStatus.getBatchStatus() == null) {
+ throw new IllegalStateException("On restart, we didn't find an earlier batch status.");
+ }
+
+ if (jobStatus.getBatchStatus().equals(BatchStatus.COMPLETED)) {
+ throw new JobExecutionAlreadyCompleteException("Already completed job instance = " + jobStatus.getJobInstanceId());
+ } else if (jobStatus.getBatchStatus().equals(BatchStatus.ABANDONED)) {
+ throw new JobRestartException("Abandoned job instance = " + jobStatus.getJobInstanceId());
+ }
+ }
+
+ private static void validateJobExecutionIsMostRecent(final long jobInstanceId, final long executionId) throws JobExecutionNotMostRecentException {
+ final long mostRecentExecutionId = PERSISTENCE_MANAGER_SERVICE.getMostRecentExecutionId(jobInstanceId);
+ if (mostRecentExecutionId != executionId) {
+ throw new JobExecutionNotMostRecentException("ExecutionId: " + executionId + " is not the most recent execution.");
+ }
+ }
+
+ public static RuntimeJobExecution restartPartition(final long execId, final JSLJob gennedJobModel, final Properties partitionProps) throws JobRestartException,
+ JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
+ return restartExecution(execId, gennedJobModel, partitionProps, true, false);
+ }
+
+ public static RuntimeFlowInSplitExecution restartFlowInSplit(final long execId, final JSLJob gennedJobModel) throws JobRestartException,
+ JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
+ return (RuntimeFlowInSplitExecution) restartExecution(execId, gennedJobModel, null, true, true);
+ }
+
+ public static RuntimeJobExecution restartJob(final long executionId, final Properties restartJobParameters) throws JobRestartException,
+ JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
+ return restartExecution(executionId, null, restartJobParameters, false, false);
+ }
+
+ private static RuntimeJobExecution restartExecution(final long executionId, final JSLJob gennedJobModel, final Properties restartJobParameters, final boolean parallelExecution, final boolean flowInSplit) throws JobRestartException,
+ JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
+
+ final long jobInstanceId = PERSISTENCE_MANAGER_SERVICE.getJobInstanceIdByExecutionId(executionId);
+ final JobStatus jobStatus = JOB_STATUS_MANAGER_SERVICE.getJobStatus(jobInstanceId);
+
+ validateJobExecutionIsMostRecent(jobInstanceId, executionId);
+
+ validateJobInstanceNotCompleteOrAbandonded(jobStatus);
+
+ final JobInstanceImpl jobInstance = jobStatus.getJobInstance();
+
+ final ModelNavigator<JSLJob> jobNavigator;
+ // If we are in a parallel job that is genned use the regenned JSL.
+ if (gennedJobModel == null) {
+ jobNavigator = getResolvedJobNavigator(jobInstance.getJobXML(), restartJobParameters, parallelExecution);
+ } else {
+ jobNavigator = getResolvedJobNavigator(gennedJobModel, restartJobParameters, parallelExecution);
+ }
+ // JSLJob jobModel = ModelResolverFactory.createJobResolver().resolveModel(jobInstance.getJobXML());
+ validateRestartableFalseJobsDoNotRestart(jobNavigator.getRootModelElement());
+
+ final JobContextImpl jobContext = getJobContext(jobNavigator);
+
+ final RuntimeJobExecution executionHelper;
+ if (flowInSplit) {
+ executionHelper = PERSISTENCE_MANAGER_SERVICE.createFlowInSplitExecution(jobInstance, jobContext.getBatchStatus());
+ } else {
+ executionHelper = PERSISTENCE_MANAGER_SERVICE.createJobExecution(jobInstance, restartJobParameters, jobContext.getBatchStatus());
+ }
+ executionHelper.prepareForExecution(jobContext, jobStatus.getRestartOn());
+ JOB_STATUS_MANAGER_SERVICE.updateJobStatusWithNewExecution(jobInstance.getInstanceId(), executionHelper.getExecutionId());
+
+ return executionHelper;
+ }
+
+ public static InternalJobExecution getPersistedJobOperatorJobExecution(final long jobExecutionId) throws NoSuchJobExecutionException {
+ return PERSISTENCE_MANAGER_SERVICE.jobOperatorGetJobExecution(jobExecutionId);
+ }
+
+
+ public static JobInstance getJobInstance(final long executionId) {
+ return JOB_STATUS_MANAGER_SERVICE.getJobStatusFromExecutionId(executionId).getJobInstance();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java
new file mode 100755
index 0000000..8570351
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeFlowInSplitExecution.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.jobinstance;
+
+import org.apache.batchee.container.status.ExecutionStatus;
+
+import javax.batch.runtime.JobInstance;
+
+public class RuntimeFlowInSplitExecution extends RuntimeJobExecution {
+ public RuntimeFlowInSplitExecution(final JobInstance jobInstance, final long executionId) {
+ super(jobInstance, executionId);
+ }
+
+ private ExecutionStatus flowStatus;
+
+ public ExecutionStatus getFlowStatus() {
+ return flowStatus;
+ }
+
+ public void setFlowStatus(ExecutionStatus flowStatus) {
+ this.flowStatus = flowStatus;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java
new file mode 100755
index 0000000..d9fdb84
--- /dev/null
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/jobinstance/RuntimeJobExecution.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.container.impl.jobinstance;
+
+import org.apache.batchee.container.impl.JobContextImpl;
+import org.apache.batchee.container.impl.JobExecutionImpl;
+import org.apache.batchee.container.navigator.ModelNavigator;
+import org.apache.batchee.container.proxy.ListenerFactory;
+import org.apache.batchee.container.services.InternalJobExecution;
+import org.apache.batchee.jaxb.JSLJob;
+
+import javax.batch.runtime.BatchStatus;
+import javax.batch.runtime.JobInstance;
+import java.io.Closeable;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Properties;
+
+public class RuntimeJobExecution {
+ private ModelNavigator<JSLJob> jobNavigator = null;
+ private JobInstance jobInstance;
+ private long executionId;
+ private String restartOn;
+ private JobContextImpl jobContext = null;
+ private ListenerFactory listenerFactory;
+ private InternalJobExecution operatorJobExecution = null;
+ private Integer partitionInstance = null;
+ private Collection<Closeable> releasables = new ArrayList<Closeable>();
+
+ public RuntimeJobExecution(final JobInstance jobInstance, final long executionId) {
+ this.jobInstance = jobInstance;
+ this.executionId = executionId;
+ this.operatorJobExecution = new JobExecutionImpl(executionId, jobInstance.getInstanceId());
+ }
+
+
+ /*
+ * Non-spec'd methods (not on the interface, but maybe we should
+ * put on a second interface).
+ */
+
+ public void prepareForExecution(final JobContextImpl jobContext, final String restartOn) {
+ this.jobContext = jobContext;
+ this.jobNavigator = jobContext.getNavigator();
+ jobContext.setExecutionId(executionId);
+ jobContext.setInstanceId(jobInstance.getInstanceId());
+ this.restartOn = restartOn;
+ operatorJobExecution.setJobContext(jobContext);
+ }
+
+ public void prepareForExecution(final JobContextImpl jobContext) {
+ prepareForExecution(jobContext, null);
+ }
+
+ public void setRestartOn(final String restartOn) {
+ this.restartOn = restartOn;
+ }
+
+ public long getExecutionId() {
+ return executionId;
+ }
+
+ public long getInstanceId() {
+ return jobInstance.getInstanceId();
+ }
+
+ public JobInstance getJobInstance() {
+ return jobInstance;
+ }
+
+ public ModelNavigator<JSLJob> getJobNavigator() {
+ return jobNavigator;
+ }
+
+ public JobContextImpl getJobContext() {
+ return jobContext;
+ }
+
+ public String getRestartOn() {
+ return restartOn;
+ }
+
+ public ListenerFactory getListenerFactory() {
+ return listenerFactory;
+ }
+
+ public void setListenerFactory(final ListenerFactory listenerFactory) {
+ this.listenerFactory = listenerFactory;
+ }
+
+ public InternalJobExecution getJobOperatorJobExecution() {
+ return operatorJobExecution;
+ }
+
+ public BatchStatus getBatchStatus() {
+ return this.jobContext.getBatchStatus();
+ }
+
+ public String getExitStatus() {
+ return this.jobContext.getExitStatus();
+ }
+
+ public void setBatchStatus(final String status) {
+ operatorJobExecution.setBatchStatus(status);
+ }
+
+ public void setCreateTime(final Timestamp ts) {
+ operatorJobExecution.setCreateTime(ts);
+ }
+
+ public void setEndTime(final Timestamp ts) {
+ operatorJobExecution.setEndTime(ts);
+ }
+
+ public void setExitStatus(final String status) {
+ //exitStatus = status;
+ operatorJobExecution.setExitStatus(status);
+
+ }
+
+ public void setLastUpdateTime(final Timestamp ts) {
+ operatorJobExecution.setLastUpdateTime(ts);
+ }
+
+ public void setStartTime(final Timestamp ts) {
+ operatorJobExecution.setStartTime(ts);
+ }
+
+ public void setJobParameters(final Properties jProps) {
+ operatorJobExecution.setJobParameters(jProps);
+ }
+
+ public Properties getJobParameters() {
+ return operatorJobExecution.getJobParameters();
+ }
+
+ public Date getStartTime() {
+ return operatorJobExecution.getStartTime();
+ }
+
+ public Date getEndTime() {
+ return operatorJobExecution.getEndTime();
+ }
+
+ public Date getLastUpdatedTime() {
+ return operatorJobExecution.getLastUpdatedTime();
+ }
+
+ public Date getCreateTime() {
+ return operatorJobExecution.getCreateTime();
+ }
+
+ @Override
+ public String toString() {
+ return " executionId: " + executionId + " restartOn: " + restartOn + "\n-----------------------\n" + "jobInstance: \n " + jobInstance;
+ }
+
+ public Integer getPartitionInstance() {
+ return partitionInstance;
+ }
+
+ public void setPartitionInstance(final Integer partitionInstance) {
+ this.partitionInstance = partitionInstance;
+ }
+
+ public Collection<Closeable> getReleasables() {
+ return releasables;
+ }
+
+ public synchronized void addReleasable(final Closeable releasable) {
+ releasables.add(releasable);
+ }
+}