You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by st...@apache.org on 2016/05/15 19:58:02 UTC
[3/3] incubator-batchee git commit: BATCHEE-88 improve handling in case of error during step commit
BATCHEE-88 improve handling in case of error during step commit
Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/7a74df4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/7a74df4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/7a74df4b
Branch: refs/heads/master
Commit: 7a74df4bc9295377ab35ea53eacb8204698cd425
Parents: 4313c77
Author: Mark Struberg <st...@apache.org>
Authored: Sun May 15 21:57:00 2016 +0200
Committer: Mark Struberg <st...@apache.org>
Committed: Sun May 15 21:57:00 2016 +0200
----------------------------------------------------------------------
.../batchee/its/transaction/TxErrorTest.java | 29 ++++++++++++++--
.../resources/META-INF/batch-jobs/txtest1.xml | 3 --
.../impl/controller/BaseStepController.java | 17 ++++++++--
.../controller/chunk/CheckpointManager.java | 28 +++++++++++++---
.../controller/chunk/ChunkStepController.java | 35 +++++++++++++++-----
5 files changed, 92 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/7a74df4b/integration-tests/transaction/src/test/java/org/apache/batchee/its/transaction/TxErrorTest.java
----------------------------------------------------------------------
diff --git a/integration-tests/transaction/src/test/java/org/apache/batchee/its/transaction/TxErrorTest.java b/integration-tests/transaction/src/test/java/org/apache/batchee/its/transaction/TxErrorTest.java
index acdbd51..564c7a7 100644
--- a/integration-tests/transaction/src/test/java/org/apache/batchee/its/transaction/TxErrorTest.java
+++ b/integration-tests/transaction/src/test/java/org/apache/batchee/its/transaction/TxErrorTest.java
@@ -19,6 +19,10 @@ package org.apache.batchee.its.transaction;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;
import javax.batch.runtime.BatchStatus;
+import javax.batch.runtime.Metric;
+import javax.batch.runtime.StepExecution;
+
+import java.util.List;
import org.apache.batchee.util.Batches;
import org.testng.Assert;
@@ -30,8 +34,27 @@ public class TxErrorTest {
@Test
public void testRolledBackDuringWork() {
final JobOperator jobOperator = BatchRuntime.getJobOperator();
- BatchStatus batchStatus = Batches.waitFor(jobOperator, jobOperator.start("txtest1", null));
- Assert.assertEquals(batchStatus, BatchStatus.COMPLETED);
- Assert.assertEquals(TxErrorWriter1.written.intValue(), 5);
+ long executionId = jobOperator.start("txtest1", null);
+ BatchStatus batchStatus = Batches.waitFor(jobOperator, executionId);
+ Assert.assertEquals(batchStatus, BatchStatus.FAILED);
+ Assert.assertEquals(TxErrorWriter1.written.intValue(), 3);
+
+ List<StepExecution> stepExecutions = jobOperator.getStepExecutions(executionId);
+ Assert.assertEquals(stepExecutions.size(), 1);
+ StepExecution stepExecution = stepExecutions.get(0);
+ Metric[] metrics = stepExecution.getMetrics();
+ assertMetric(Metric.MetricType.READ_COUNT, 2, metrics);
+ assertMetric(Metric.MetricType.WRITE_COUNT, 2, metrics);
+ assertMetric(Metric.MetricType.ROLLBACK_COUNT, 1, metrics);
+ }
+
+ private void assertMetric(Metric.MetricType metricType, long expected, Metric[] metrics) {
+ for (Metric metric : metrics) {
+ if (metricType.equals(metric.getType())) {
+ Assert.assertEquals(metric.getValue(), expected);
+ return;
+ }
+ }
+ Assert.fail("MetricType " + metricType + " not in collected metrics");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/7a74df4b/integration-tests/transaction/src/test/resources/META-INF/batch-jobs/txtest1.xml
----------------------------------------------------------------------
diff --git a/integration-tests/transaction/src/test/resources/META-INF/batch-jobs/txtest1.xml b/integration-tests/transaction/src/test/resources/META-INF/batch-jobs/txtest1.xml
index 4b7a67c..ba6809f 100644
--- a/integration-tests/transaction/src/test/resources/META-INF/batch-jobs/txtest1.xml
+++ b/integration-tests/transaction/src/test/resources/META-INF/batch-jobs/txtest1.xml
@@ -18,9 +18,6 @@
<chunk item-count="1">
<reader ref="org.apache.batchee.its.transaction.TxErrorReader"></reader>
<writer ref="org.apache.batchee.its.transaction.TxErrorWriter1"></writer>
- <skippable-exception-classes>
- <include class="java.lang.Exception"/> <!-- all exceptions... -->
- </skippable-exception-classes>
</chunk>
</step>
</job>
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/7a74df4b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
index d07d858..63381aa 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java
@@ -359,6 +359,11 @@ public abstract class BaseStepController implements ExecutionElementController {
}
protected void persistUserData() {
+ PersistentDataWrapper userData = resolveUserData();
+ storeUserData(userData);
+ }
+
+ protected PersistentDataWrapper resolveUserData() {
final ByteArrayOutputStream persistentBAOS = new ByteArrayOutputStream();
final ObjectOutputStream persistentDataOOS;
@@ -370,8 +375,16 @@ public abstract class BaseStepController implements ExecutionElementController {
throw new BatchContainerServiceException("Cannot persist the persistent user data for the step.", e);
}
- stepStatus.setPersistentUserData(new PersistentDataWrapper(persistentBAOS.toByteArray()));
- statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+ return new PersistentDataWrapper(persistentBAOS.toByteArray());
+ }
+
+ protected void storeUserData(PersistentDataWrapper userData) {
+ try {
+ stepStatus.setPersistentUserData(userData);
+ statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
+ } catch (final Exception e) {
+ throw new BatchContainerServiceException("Cannot persist the persistent user data for the step.", e);
+ }
}
protected void persistExitStatusAndEndTimestamp() {
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/7a74df4b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
index 7e07450..584d737 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
@@ -16,6 +16,9 @@
*/
package org.apache.batchee.container.impl.controller.chunk;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.exception.BatchContainerServiceException;
import org.apache.batchee.spi.DataRepresentationService;
@@ -75,28 +78,45 @@ public class CheckpointManager {
}
}
- public void checkpoint() {
+ /**
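+ * Takes the current checkpoint data from the ItemReader and ItemWriter
+ * and returns them keyed by their CheckpointDataKey, without persisting them.
+ * Use storeCheckPoints(Map) to write the returned entries to the persistence service.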
+ */
+ public Map<CheckpointDataKey, CheckpointData> prepareCheckpoints() {
final CheckpointDataKey readerChkptDK;
final CheckpointDataKey writerChkptDK;
+ Map<CheckpointDataKey, CheckpointData> checkpoints = new HashMap<CheckpointDataKey, CheckpointData>(2);
try {
byte[] checkpointBytes = dataRepresentationService.toInternalRepresentation(readerProxy.checkpointInfo());
CheckpointData readerChkptData = new CheckpointData(jobInstanceID, stepId, CheckpointType.READER);
readerChkptData.setRestartToken(checkpointBytes);
readerChkptDK = new CheckpointDataKey(jobInstanceID, stepId, CheckpointType.READER);
- persistenceManagerService.setCheckpointData(readerChkptDK, readerChkptData);
+ checkpoints.put(readerChkptDK, readerChkptData);
checkpointBytes = dataRepresentationService.toInternalRepresentation(writerProxy.checkpointInfo());
CheckpointData writerChkptData = new CheckpointData(jobInstanceID, stepId, CheckpointType.WRITER);
writerChkptData.setRestartToken(checkpointBytes);
writerChkptDK = new CheckpointDataKey(jobInstanceID, stepId, CheckpointType.WRITER);
- persistenceManagerService.setCheckpointData(writerChkptDK, writerChkptData);
-
+ checkpoints.put(writerChkptDK, writerChkptData);
} catch (final Exception ex) {
// is this what I should be throwing here?
throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + stepId + "]", ex);
}
+
+ return checkpoints;
+ }
+
+ public void storeCheckPoints(Map<CheckpointDataKey, CheckpointData> checkpoints) {
+ try {
+ for (Map.Entry<CheckpointDataKey, CheckpointData> checkpointEntry : checkpoints.entrySet()) {
+ persistenceManagerService.setCheckpointData(checkpointEntry.getKey(), checkpointEntry.getValue());
+ }
+ } catch (final Exception ex) {
+ throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + stepId + "]", ex);
+ }
+
}
public int checkpointTimeout() {
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/7a74df4b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
index ba256fa..6d72ccd 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
@@ -48,10 +48,14 @@ import javax.batch.api.chunk.listener.RetryWriteListener;
import javax.batch.api.chunk.listener.SkipProcessListener;
import javax.batch.api.chunk.listener.SkipReadListener;
import javax.batch.api.chunk.listener.SkipWriteListener;
+import javax.batch.operations.BatchRuntimeException;
import javax.batch.runtime.BatchStatus;
+import javax.transaction.Status;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
@@ -585,11 +589,22 @@ public class ChunkStepController extends SingleThreadedStepController {
chunkProxy.afterChunk();
}
- checkpointManager.checkpoint();
-
- this.persistUserData();
-
- transactionManager.commit();
+ Map<CheckpointDataKey, CheckpointData> checkpoints = checkpointManager.prepareCheckpoints();
+ PersistentDataWrapper userData = resolveUserData();
+ try {
+ transactionManager.commit();
+ storeUserData(userData);
+ checkpointManager.storeCheckPoints(checkpoints);
+ } catch (Exception e) {
+ // only set the Exception if we didn't blow up before anyway
+ if (this.stepContext.getException() != null) {
+ this.stepContext.setException(e);
+ }
+ if (e instanceof BatchRuntimeException) {
+ throw e;
+ }
+ throw new BatchContainerServiceException("Cannot commit the transaction for the step.", e);
+ }
checkpointManager.endCheckpoint();
@@ -659,14 +674,15 @@ public class ChunkStepController extends SingleThreadedStepController {
*/
private void rollback(final Throwable t) {
try {
- // ignore, we blow up anyway
- transactionManager.setRollbackOnly();
try {
doClose();
} catch (Exception e) {
// ignore, we blow up anyway
}
+ // ignore, we blow up anyway
+ transactionManager.setRollbackOnly();
+
if (t instanceof Exception) {
Exception e = (Exception) t;
for (ChunkListener chunkProxy : chunkListeners) {
@@ -681,7 +697,10 @@ public class ChunkStepController extends SingleThreadedStepController {
// ever come up in the spec, but seems marginally more useful.
stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
} finally {
- transactionManager.rollback();
+ int txStatus = transactionManager.getStatus();
+ if (txStatus == Status.STATUS_ACTIVE || txStatus == Status.STATUS_MARKED_ROLLBACK) {
+ transactionManager.rollback();
+ }
throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", t);
}
}