You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by an...@apache.org on 2018/03/23 21:01:49 UTC

incubator-tephra git commit: (TEPHRA-279) Make TransactionContext resilient to exceptions from getTransactionAwareName()

Repository: incubator-tephra
Updated Branches:
  refs/heads/master db6ef6d2b -> 6878295e3


(TEPHRA-279) Make TransactionContext resilient to exceptions from getTransactionAwareName()

This closes #69 from GitHub.

Signed-off-by: anew <an...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/6878295e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/6878295e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/6878295e

Branch: refs/heads/master
Commit: 6878295e31505eccbd04c5eeca97f5c2fea87650
Parents: db6ef6d
Author: anew <an...@apache.org>
Authored: Thu Jan 11 21:29:10 2018 -0800
Committer: anew <an...@apache.org>
Committed: Fri Mar 23 14:01:02 2018 -0700

----------------------------------------------------------------------
 .../org/apache/tephra/TransactionContext.java   | 112 ++++++++++++-------
 .../java/org/apache/tephra/DummyTxAware.java    |   4 +
 .../apache/tephra/TransactionContextTest.java   |  78 +++++++++++++
 3 files changed, 153 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6878295e/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
index 3c11e96..0c846d6 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
@@ -122,13 +122,15 @@ public class TransactionContext {
     for (TransactionAware txAware : txAwares) {
       try {
         txAware.startTx(currentTx);
-      } catch (Throwable e) {
-        String message = String.format("Unable to start transaction-aware '%s' for transaction %d. ",
-                                       txAware.getTransactionAwareName(), currentTx.getTransactionId());
-        LOG.warn(message, e);
-        txClient.abort(currentTx);
-        currentTx = null;
-        throw new TransactionFailureException(message, e);
+      } catch (Throwable t) {
+        try {
+          txClient.abort(currentTx); // abort in the tx client before the failure is reported
+          TransactionFailureException tfe = createTransactionFailure("start", txAware, t);
+          LOG.warn(tfe.getMessage());
+          throw tfe;
+        } finally { // runs whether tfe is thrown or abort()/createTransactionFailure() fails
+          currentTx = null; // the context is left without a current transaction
+        }
       }
     }
   }
@@ -147,8 +149,11 @@ public class TransactionContext {
     checkForConflicts();
     persist();
     commit();
-    postCommit();
-    currentTx = null;
+    try {
+      postCommit();
+    } finally { // executes even when postCommit() throws
+      currentTx = null; // the context is left without a current transaction
+    }
   }
 
   /**
@@ -228,23 +233,32 @@ public class TransactionContext {
       boolean success = true;
       for (TransactionAware txAware : txAwares) {
         try {
-          if (!txAware.rollbackTx()) {
-            success = false;
-          }
-        } catch (Throwable e) {
-          String message = String.format("Unable to roll back changes in transaction-aware '%s' for transaction %d. ",
-                                         txAware.getTransactionAwareName(), currentTx.getTransactionId());
-          LOG.warn(message, e);
+          success = txAware.rollbackTx() && success; // rollbackTx() is the left operand, so it always runs
+        } catch (Throwable t) {
+          TransactionFailureException tfe = createTransactionFailure("roll back changes in", txAware, t);
+          LOG.warn(tfe.getMessage());
           if (cause == null) {
-            cause = new TransactionFailureException(message, e);
+            cause = tfe; // first failure becomes the exception that is thrown
+          } else {
+            cause.addSuppressed(tfe); // later failures are attached as suppressed
           }
           success = false;
         }
       }
-      if (success) {
-        txClient.abort(currentTx);
-      } else {
-        txClient.invalidate(currentTx.getTransactionId());
+      try { // a failing tx client call must not hide the failure collected above
+        if (success) {
+          txClient.abort(currentTx);
+        } else {
+          txClient.invalidate(currentTx.getTransactionId());
+        }
+      } catch (Throwable t) {
+        if (cause == null) {
+          cause = new TransactionFailureException( // chain t so the service error is not lost
+            String.format("Error while calling transaction service to %s transaction %d.",
+                          success ? "abort" : "invalidate", currentTx.getTransactionId()), t);
+        } else {
+          cause.addSuppressed(t); // the existing cause stays primary
+        }
       }
       if (cause != null) {
         throw cause;
@@ -259,12 +273,11 @@ public class TransactionContext {
     for (TransactionAware txAware : txAwares) {
       try {
         changes.addAll(txAware.getTxChanges());
-      } catch (Throwable e) {
-        String message = String.format("Unable to retrieve changes from transaction-aware '%s' for transaction %d. ",
-                                       txAware.getTransactionAwareName(), currentTx.getTransactionId());
-        LOG.warn(message, e);
-        abort(new TransactionFailureException(message, e));
+      } catch (Throwable t) { // a failure while collecting changes leads to abort(tfe) below
+        TransactionFailureException tfe = createTransactionFailure("retrieve changes from", txAware, t);
+        LOG.warn(tfe.getMessage()); // NOTE(review): unlike the removed code, the throwable is not passed to the logger
         // abort will throw that exception
+        abort(tfe);
       }
     }
     try {
@@ -281,24 +294,18 @@ public class TransactionContext {
 
   private void persist() throws TransactionFailureException {
     for (TransactionAware txAware : txAwares) {
-      boolean success;
+      boolean success = false; // stays false when commitTx() throws
       Throwable cause = null;
       try {
         success = txAware.commitTx();
       } catch (Throwable e) {
-        success = false;
         cause = e;
       }
       if (!success) {
-        String message = String.format("Unable to persist changes of transaction-aware '%s' for transaction %d. ",
-                                       txAware.getTransactionAwareName(), currentTx.getTransactionId());
-        if (cause == null) {
-          LOG.warn(message);
-        } else {
-          LOG.warn(message, cause);
-        }
-        abort(new TransactionFailureException(message, cause));
+        TransactionFailureException tfe = createTransactionFailure("persist changes of", txAware, cause);
+        LOG.warn(tfe.getMessage()); // cause is null here when commitTx() returned false without throwing
         // abort will throw that exception
+        abort(tfe);
       }
     }
   }
@@ -321,15 +328,38 @@ public class TransactionContext {
     for (TransactionAware txAware : txAwares) {
       try {
         txAware.postTxCommit();
-      } catch (Throwable e) {
-        String message = String.format("Unable to perform post-commit in transaction-aware '%s' for transaction %d. ",
-                                       txAware.getTransactionAwareName(), currentTx.getTransactionId());
-        LOG.warn(message, e);
-        cause = new TransactionFailureException(message, e);
+      } catch (Throwable t) {
+        TransactionFailureException tfe = createTransactionFailure("perform post-commit for", txAware, t);
+        LOG.warn(tfe.getMessage());
+        if (cause == null) { // first failure becomes the exception that is thrown
+          cause = tfe;
+        } else {
+          cause.addSuppressed(tfe); // later failures are attached as suppressed
+        }
       }
     }
     if (cause != null) {
       throw cause;
     }
   }
+
+  private TransactionFailureException createTransactionFailure(String action, // text after "Unable to"
+                                                               TransactionAware txAware, // failing participant
+                                                               Throwable cause) { // may be null (see persist())
+    String txAwareName;
+    Throwable thrownForName = null;
+    try { // getTransactionAwareName() may itself throw; that must not mask the real failure
+      txAwareName = txAware.getTransactionAwareName();
+    } catch (Throwable t) {
+      thrownForName = t;
+      txAwareName = "unknown"; // placeholder so the message can still be built
+    }
+    TransactionFailureException tfe = new TransactionFailureException(
+      String.format("Unable to %s transaction-aware '%s' for transaction %d",
+                    action, txAwareName, currentTx.getTransactionId()), cause);
+    if (thrownForName != null) {
+      tfe.addSuppressed(thrownForName); // keep the name-lookup failure visible to callers
+    }
+    return tfe; // built but not thrown or logged here; callers do both
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6878295e/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java b/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java
index 54e8a8c..3ed2b88 100644
--- a/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java
+++ b/tephra-core/src/test/java/org/apache/tephra/DummyTxAware.java
@@ -40,6 +40,7 @@ class DummyTxAware implements TransactionAware {
   InduceFailure failCommitTxOnce = InduceFailure.NoFailure;
   InduceFailure failPostCommitTxOnce = InduceFailure.NoFailure;
   InduceFailure failRollbackTxOnce = InduceFailure.NoFailure;
+  InduceFailure failGetName = InduceFailure.NoFailure; // ThrowException makes getTransactionAwareName() throw
 
   void addChange(byte[] key) {
     changes.add(key);
@@ -118,6 +119,9 @@ class DummyTxAware implements TransactionAware {
 
   @Override
   public String getTransactionAwareName() {
+    if (failGetName == InduceFailure.ThrowException) { // test hook; the flag is not reset here
+      throw new RuntimeException("get name failure"); // message asserted by TransactionContextTest
+    }
     return "dummy";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6878295e/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
index fcf793e..9502ccf 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
@@ -501,4 +501,82 @@ public class TransactionContextTest {
 
     Assert.assertEquals(txClient.state, DummyTxClient.CommitState.Aborted);
   }
+
+  @Test
+  public void testGetTxAwareNameFails() throws TransactionFailureException {
+
+    // tests that under any scenario that can make a transaction fail, exceptions from
+    // getTransactionAwareName() do not affect proper abort, and the transaction context's
+    // state is clear (no current transaction) afterwards.
+    TransactionContext context = newTransactionContext(ds1);
+
+    ds1.failGetName = DummyTxAware.InduceFailure.ThrowException;
+    // the txAware will throw exceptions whenever getTransactionAwareName() is called.
+    // This is called in various failure scenarios. Test these scenarios one by one and check that
+    // the tx context is still functional after that.
+
+    // test failure during startTx()
+    ds1.failStartTxOnce = DummyTxAware.InduceFailure.ThrowException;
+    try {
+      context.start();
+      Assert.fail("Start should have failed - exception should be thrown");
+    } catch (TransactionFailureException e) { // cause = induced failure, suppressed[0] = name-lookup failure
+      Assert.assertEquals("start failure", e.getCause().getMessage());
+      Assert.assertEquals("get name failure", e.getSuppressed()[0].getMessage());
+    }
+    Assert.assertNull(context.getCurrentTransaction());
+
+    // test failure during getTxChanges()
+    ds1.failChangesTxOnce = DummyTxAware.InduceFailure.ThrowException;
+    context.start();
+    try {
+      context.finish();
+      Assert.fail("Get changes should have failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertEquals("changes failure", e.getCause().getMessage());
+      Assert.assertEquals("get name failure", e.getSuppressed()[0].getMessage());
+    }
+    Assert.assertNull(context.getCurrentTransaction());
+
+    // test failure during commitTx()
+    ds1.failCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
+    context.start();
+    try {
+      context.finish();
+      Assert.fail("Persist should have failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertEquals("persist failure", e.getCause().getMessage());
+      Assert.assertEquals("get name failure", e.getSuppressed()[0].getMessage());
+    }
+    Assert.assertNull(context.getCurrentTransaction());
+
+    // test failure during rollbackTx()
+    ds1.failRollbackTxOnce = DummyTxAware.InduceFailure.ThrowException;
+    context.start();
+    try {
+      context.abort();
+      Assert.fail("Rollback should have failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertEquals("rollback failure", e.getCause().getMessage());
+      Assert.assertEquals("get name failure", e.getSuppressed()[0].getMessage());
+    }
+    Assert.assertNull(context.getCurrentTransaction());
+
+    // test failure during postTxCommit()
+    ds1.failPostCommitTxOnce = DummyTxAware.InduceFailure.ThrowException;
+    context.start();
+    try {
+      context.finish();
+      Assert.fail("Post Commit should have failed - exception should be thrown");
+    } catch (TransactionFailureException e) {
+      Assert.assertEquals("post failure", e.getCause().getMessage());
+      Assert.assertEquals("get name failure", e.getSuppressed()[0].getMessage());
+    }
+    Assert.assertNull(context.getCurrentTransaction());
+
+    Assert.assertTrue(context.removeTransactionAware(ds1)); // drop the participant whose name lookup throws
+    context.start(); // a start/finish cycle with ds1 removed is expected to complete without throwing
+    context.finish();
+    Assert.assertNull(context.getCurrentTransaction());
+  }
 }