You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by an...@apache.org on 2018/03/23 21:01:49 UTC
incubator-tephra git commit: (TEPHRA-279) Make TransactionContext
resilient to exceptions from getTransactionAwareName()
Repository: incubator-tephra
Updated Branches:
refs/heads/master db6ef6d2b -> 6878295e3
(TEPHRA-279) Make TransactionContext resilient to exceptions from getTransactionAwareName()
This closes #69 from GitHub.
Signed-off-by: anew <an...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/6878295e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/6878295e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/6878295e
Branch: refs/heads/master
Commit: 6878295e31505eccbd04c5eeca97f5c2fea87650
Parents: db6ef6d
Author: anew <an...@apache.org>
Authored: Thu Jan 11 21:29:10 2018 -0800
Committer: anew <an...@apache.org>
Committed: Fri Mar 23 14:01:02 2018 -0700
----------------------------------------------------------------------
.../org/apache/tephra/TransactionContext.java | 112 ++++++++++++-------
.../java/org/apache/tephra/DummyTxAware.java | 4 +
.../apache/tephra/TransactionContextTest.java | 78 +++++++++++++
3 files changed, 153 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6878295e/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
index 3c11e96..0c846d6 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
@@ -122,13 +122,15 @@ public class TransactionContext {
for (TransactionAware txAware : txAwares) {
try {
txAware.startTx(currentTx);
- } catch (Throwable e) {
- String message = String.format("Unable to start transaction-aware '%s' for transaction %d. ",
- txAware.getTransactionAwareName(), currentTx.getTransactionId());
- LOG.warn(message, e);
- txClient.abort(currentTx);
- currentTx = null;
- throw new TransactionFailureException(message, e);
+ } catch (Throwable t) {
+ try {
+ txClient.abort(currentTx);
+ TransactionFailureException tfe = createTransactionFailure("start", txAware, t);
+ LOG.warn(tfe.getMessage());
+ throw tfe;
+ } finally {
+ currentTx = null;
+ }
}
}
}
@@ -147,8 +149,11 @@ public class TransactionContext {
checkForConflicts();
persist();
commit();
- postCommit();
- currentTx = null;
+ try {
+ postCommit();
+ } finally {
+ currentTx = null;
+ }
}
/**
@@ -228,23 +233,32 @@ public class TransactionContext {
boolean success = true;
for (TransactionAware txAware : txAwares) {
try {
- if (!txAware.rollbackTx()) {
- success = false;
- }
- } catch (Throwable e) {
- String message = String.format("Unable to roll back changes in transaction-aware '%s' for transaction %d. ",
- txAware.getTransactionAwareName(), currentTx.getTransactionId());
- LOG.warn(message, e);
+ success = txAware.rollbackTx() && success;
+ } catch (Throwable t) {
+ TransactionFailureException tfe = createTransactionFailure("roll back changes in", txAware, t);
+ LOG.warn(tfe.getMessage());
if (cause == null) {
- cause = new TransactionFailureException(message, e);
+ cause = tfe;
+ } else {
+ cause.addSuppressed(tfe);
}
success = false;
}
}
- if (success) {
- txClient.abort(currentTx);
- } else {
- txClient.invalidate(currentTx.getTransactionId());
+ try {
+ if (success) {
+ txClient.abort(currentTx);
+ } else {
+ txClient.invalidate(currentTx.getTransactionId());
+ }
+ } catch (Throwable t) {
+ if (cause == null) {
+ cause = new TransactionFailureException(
+ String.format("Error while calling transaction service to %s transaction %d.",
+ success ? "abort" : "invalidate", currentTx.getTransactionId()));
+ } else {
+ cause.addSuppressed(t);
+ }
}
if (cause != null) {
throw cause;
@@ -259,12 +273,11 @@ public class TransactionContext {
for (TransactionAware txAware : txAwares) {
try {
changes.addAll(txAware.getTxChanges());
- } catch (Throwable e) {
- String message = String.format("Unable to retrieve changes from transaction-aware '%s' for transaction %d. ",
- txAware.getTransactionAwareName(), currentTx.getTransactionId());
- LOG.warn(message, e);
- abort(new TransactionFailureException(message, e));
+ } catch (Throwable t) {
+ TransactionFailureException tfe = createTransactionFailure("retrieve changes from", txAware, t);
+ LOG.warn(tfe.getMessage());
// abort will throw that exception
+ abort(tfe);
}
}
try {
@@ -281,24 +294,18 @@ public class TransactionContext {
private void persist() throws TransactionFailureException {
for (TransactionAware txAware : txAwares) {
- boolean success;
+ boolean success = false;
Throwable cause = null;
try {
success = txAware.commitTx();
} catch (Throwable e) {
- success = false;
cause = e;
}
if (!success) {
- String message = String.format("Unable to persist changes of transaction-aware '%s' for transaction %d. ",
- txAware.getTransactionAwareName(), currentTx.getTransactionId());
- if (cause == null) {
- LOG.warn(message);
- } else {
- LOG.warn(message, cause);
- }
- abort(new TransactionFailureException(message, cause));
+ TransactionFailureException tfe = createTransactionFailure("persist changes of", txAware, cause);
+ LOG.warn(tfe.getMessage());
// abort will throw that exception
+ abort(tfe);
}
}
}
@@ -321,15 +328,38 @@ public class TransactionContext {
for (TransactionAware txAware : txAwares) {
try {
txAware.postTxCommit();
- } catch (Throwable e) {
- String message = String.format("Unable to perform post-commit in transaction-aware '%s' for transaction %d. ",
- txAware.getTransactionAwareName(), currentTx.getTransactionId());
- LOG.warn(message, e);
- cause = new TransactionFailureException(message, e);
+ } catch (Throwable t) {
+ TransactionFailureException tfe = createTransactionFailure("perform post-commit for", txAware, t);
+ LOG.warn(tfe.getMessage());
+ if (cause == null) {
+ cause = tfe;
+ } else {
+ cause.addSuppressed(tfe);
+ }
}
}
if (cause != null) {
throw cause;
}
}
+
+ private TransactionFailureException createTransactionFailure(String action,
+ TransactionAware txAware,
+ Throwable cause) {
+ String txAwareName;
+ Throwable thrownForName = null;
+ try {
+ txAwareName = txAware.getTransactionAwareName();
+ } catch (Throwable t) {
+ thrownForName = t;
+ txAwareName = "unknown";
+ }
+ TransactionFailureException tfe = new TransactionFailureException(
+ String.format("Unable to %s transaction-aware '%s' for transaction %d",
+ action, txAwareName, currentTx.getTransactionId()), cause);
+ if (thrownForName != null) {
+ tfe.addSuppressed(thrownForName);
+ }
+ return tfe;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6878295e/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java b/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java
index 54e8a8c..3ed2b88 100644
--- a/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java
+++ b/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java
@@ -40,6 +40,7 @@ class DummyTxAware implements TransactionAware {
InduceFailure failCommitTxOnce = InduceFailure.NoFailure;
InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure;
InduceFailure failRollbackTxOnce = InduceFailure.NoFailure;
+ InduceFailure failGetName = InduceFailure.NoFailure;
void addChange(byte[] key) {
changes.add(key);
@@ -118,6 +119,9 @@ class DummyTxAware implements TransactionAware {
@Override
public String getTransactionAwareName() {
+ if (failGetName == InduceFailure.ThrowException) {
+ throw new RuntimeException("get name failure");
+ }
return "dummy";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6878295e/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
index fcf793e..9502ccf 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
@@ -501,4 +501,82 @@ public class TransactionContextTest {
Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
}
+
+ @Test
+ public void testGetTxAwareNameFails() throws TransactionFailureException {
+
+ // tests that under any scneario that can make a transaction fail, exceptions from
+ // getTransactionAwareName() do not affect proper abort, and the transaction context's
+ // state is clear (no current transaction) afterwards.
+ TransactionContext context = newTransactionContext(ds1);
+
+ ds1.failGetName = DummyTxAware.InduceFailure.ThrowException;
+ // the txAware will throw exceptions whenever getTransactionAwareName() is called.
+ // This is called in various failure scenarios. Test these scenarios one by one and check that
+ // the tx context is still functional after that.
+
+ // test failure during startTx()
+ ds1.failStartTxOnce = DummyTxAware.InduceFailure.ThrowException;
+ try {
+ context.start();
+ Assert.fail("Start should have failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertEquals("start failure", e.getCause().getMessage());
+ Assert.assertEquals("get name failure", e.getSuppressed()[0].getMessage());
+ }
+ Assert.assertNull(context.getCurrentTransaction());
+
+ // test failure during getTxChanges()
+ ds1.failChangesTxOnce = DummyTxAware.InduceFailure.ThrowException;
+ context.start();
+ try {
+ context.finish();
+ Assert.fail("Get changes should have failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertEquals("changes failure", e.getCause().getMessage());
+ Assert.assertEquals("get name failure", e.getSuppressed()[0].getMessage());
+ }
+ Assert.assertNull(context.getCurrentTransaction());
+
+ // test failure during commitTx()
+ ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
+ context.start();
+ try {
+ context.finish();
+ Assert.fail("Persist should have failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertEquals("persist failure", e.getCause().getMessage());
+ Assert.assertEquals("get name failure", e.getSuppressed()[0].getMessage());
+ }
+ Assert.assertNull(context.getCurrentTransaction());
+
+ // test failure during rollbackTx()
+ ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ThrowException;
+ context.start();
+ try {
+ context.abort();
+ Assert.fail("Rollback should have failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertEquals("rollback failure", e.getCause().getMessage());
+ Assert.assertEquals("get name failure", e.getSuppressed()[0].getMessage());
+ }
+ Assert.assertNull(context.getCurrentTransaction());
+
+ // test failure during postTxCommit()
+ ds1.failPostCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
+ context.start();
+ try {
+ context.finish();
+ Assert.fail("Post Commit should have failed - exception should be thrown");
+ } catch (TransactionFailureException e) {
+ Assert.assertEquals("post failure", e.getCause().getMessage());
+ Assert.assertEquals("get name failure", e.getSuppressed()[0].getMessage());
+ }
+ Assert.assertNull(context.getCurrentTransaction());
+
+ Assert.assertTrue(context.removeTransactionAware(ds1));
+ context.start();
+ context.finish();
+ Assert.assertNull(context.getCurrentTransaction());
+ }
}