You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/09/10 23:04:39 UTC

[geode] branch develop updated: GEODE-5697: Handle CacheClosedException when performing beforeCompletion and afterCompletion. (#2448)

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7f60549  GEODE-5697: Handle CacheClosedException when performing beforeCompletion and afterCompletion. (#2448)
7f60549 is described below

commit 7f605494e4cfe87c168215bb54ffaeb9c2fe9503
Author: pivotal-eshu <es...@pivotal.io>
AuthorDate: Mon Sep 10 16:04:32 2018 -0700

    GEODE-5697: Handle CacheClosedException when performing beforeCompletion and afterCompletion. (#2448)
---
 .../apache/geode/codeAnalysis/excludedClasses.txt  |  1 +
 .../geode/internal/cache/AfterCompletion.java      | 66 +++++++++++++++-------
 .../geode/internal/cache/BeforeCompletion.java     |  9 ++-
 .../internal/cache/SingleThreadJTAExecutor.java    | 21 ++++---
 .../org/apache/geode/internal/cache/TXState.java   | 65 ++++++++++++---------
 .../geode/internal/cache/AfterCompletionTest.java  | 63 ++++++++++-----------
 .../geode/internal/cache/BeforeCompletionTest.java | 24 ++++++++
 .../cache/SingleThreadJTAExecutorTest.java         | 17 ++++++
 .../apache/geode/internal/cache/TXStateTest.java   | 40 +++++++++++--
 9 files changed, 213 insertions(+), 93 deletions(-)

diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index 41203d4..d9df46b 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -94,3 +94,4 @@ org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$5
 org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl$ClosedPoolConnectionList
 org/apache/geode/cache/query/internal/parse/ASTArithmeticOp
 org/apache/geode/internal/cache/partitioned/ManageBackupBucketMessage$ReplyType
+org/apache/geode/internal/cache/AfterCompletion$Action
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java
index 028e067..d0495f1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java
@@ -26,25 +26,38 @@ public class AfterCompletion {
 
   private boolean started;
   private boolean finished;
-  private int status = -1;
-  private boolean cancelled;
   private RuntimeException exception;
 
+  private enum Action {
+    COMMIT, ROLLBACK, CANCEL
+  };
+
+  private Action action;
+
   public synchronized void doOp(TXState txState, CancelCriterion cancelCriterion) {
     // there should be a transaction timeout that keeps this thread
     // from sitting around forever if the client goes away
     // The above was done by setting afterCompletionCancelled in txState
     // during cleanup. When client departed, the transaction/JTA
     // will be timed out and cleanup code will be executed.
-    waitForExecuteOrCancel(cancelCriterion);
+    try {
+      waitForExecuteOrCancel(cancelCriterion);
+    } catch (RuntimeException | Error ignore) {
+      action = Action.CANCEL;
+    }
     started = true;
     logger.debug("executing afterCompletion notification");
-
     try {
-      if (cancelled) {
-        txState.doCleanup();
-      } else {
-        txState.doAfterCompletion(status);
+      switch (action) {
+        case CANCEL:
+          txState.doCleanup();
+          break;
+        case COMMIT:
+          txState.doAfterCompletionCommit();
+          break;
+        case ROLLBACK:
+          txState.doAfterCompletionRollback();
+          break;
       }
     } catch (RuntimeException exception) {
       this.exception = exception;
@@ -56,13 +69,15 @@ public class AfterCompletion {
   }
 
   private void waitForExecuteOrCancel(CancelCriterion cancelCriterion) {
-    waitForCondition(cancelCriterion, () -> status != -1 || cancelled);
+    waitForCondition(cancelCriterion, () -> action != null);
   }
 
   private synchronized void waitForCondition(CancelCriterion cancelCriterion,
       BooleanSupplier condition) {
     while (!condition.getAsBoolean()) {
-      cancelCriterion.checkCancelInProgress(null);
+      if (cancelCriterion != null) {
+        cancelCriterion.checkCancelInProgress(null);
+      }
       try {
         logger.debug("waiting for notification");
         wait(1000);
@@ -72,26 +87,35 @@ public class AfterCompletion {
     }
   }
 
-  public synchronized void execute(CancelCriterion cancelCriterion, int status) {
-    this.status = status;
-    signalAndWaitForDoOp(cancelCriterion);
+  public void executeCommit() {
+    executeAction(Action.COMMIT);
   }
 
-  private void signalAndWaitForDoOp(CancelCriterion cancelCriterion) {
-    notifyAll();
-    waitUntilFinished(cancelCriterion);
+  public void executeRollback() {
+    executeAction(Action.ROLLBACK);
+  }
+
+  private synchronized void executeAction(Action action) {
+    this.action = action;
+    signalAndWaitForDoOp();
     if (exception != null) {
       throw exception;
     }
+
+  }
+
+  private void signalAndWaitForDoOp() {
+    notifyAll();
+    waitUntilFinished();
   }
 
-  private void waitUntilFinished(CancelCriterion cancelCriterion) {
-    waitForCondition(cancelCriterion, () -> finished);
+  private void waitUntilFinished() {
+    waitForCondition(null, () -> finished);
   }
 
-  public synchronized void cancel(CancelCriterion cancelCriterion) {
-    cancelled = true;
-    signalAndWaitForDoOp(cancelCriterion);
+  public synchronized void cancel() {
+    action = Action.CANCEL;
+    signalAndWaitForDoOp();
   }
 
   public synchronized boolean isStarted() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java
index 247a0f2..166a0ca 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java
@@ -17,7 +17,10 @@ package org.apache.geode.internal.cache;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.cache.TransactionException;
 import org.apache.geode.internal.logging.LogService;
 
 public class BeforeCompletion {
@@ -25,13 +28,17 @@ public class BeforeCompletion {
 
   private boolean started;
   private boolean finished;
-  private SynchronizationCommitConflictException exception;
+  private RuntimeException exception;
 
   public synchronized void doOp(TXState txState) {
     try {
       txState.doBeforeCompletion();
     } catch (SynchronizationCommitConflictException exception) {
       this.exception = exception;
+    } catch (CacheClosedException exception) {
+      this.exception = new TransactionDataNodeHasDepartedException(exception);
+    } catch (RuntimeException exception) {
+      this.exception = new TransactionException(exception);
     } finally {
       logger.debug("beforeCompletion notification completed");
       finished = true;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
index 636def9..6413748 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
@@ -42,9 +42,12 @@ public class SingleThreadJTAExecutor {
     this.afterCompletion = afterCompletion;
   }
 
-  private void doOps(TXState txState, CancelCriterion cancelCriterion) {
-    beforeCompletion.doOp(txState);
-    afterCompletion.doOp(txState, cancelCriterion);
+  void doOps(TXState txState, CancelCriterion cancelCriterion) {
+    try {
+      beforeCompletion.doOp(txState);
+    } finally {
+      afterCompletion.doOp(txState, cancelCriterion);
+    }
   }
 
   public void executeBeforeCompletion(TXState txState, Executor executor,
@@ -54,15 +57,19 @@ public class SingleThreadJTAExecutor {
     beforeCompletion.execute(cancelCriterion);
   }
 
-  public void executeAfterCompletion(CancelCriterion cancelCriterion, int status) {
-    afterCompletion.execute(cancelCriterion, status);
+  public void executeAfterCompletionCommit() {
+    afterCompletion.executeCommit();
+  }
+
+  public void executeAfterCompletionRollback() {
+    afterCompletion.executeRollback();
   }
 
   /**
    * stop waiting for an afterCompletion to arrive and just exit
    */
-  public void cleanup(CancelCriterion cancelCriterion) {
-    afterCompletion.cancel(cancelCriterion);
+  public void cleanup() {
+    afterCompletion.cancel();
   }
 
   public boolean shouldDoCleanup() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 5263e2e..4e2857b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -872,7 +872,7 @@ public class TXState implements TXStateInterface {
 
   protected void cleanup() {
     if (singleThreadJTAExecutor.shouldDoCleanup()) {
-      singleThreadJTAExecutor.cleanup(getCancelCriterion());
+      singleThreadJTAExecutor.cleanup();
     } else {
       doCleanup();
     }
@@ -1111,45 +1111,54 @@ public class TXState implements TXStateInterface {
     // if there was a beforeCompletion call then there will be a thread
     // sitting in the waiting pool to execute afterCompletion. Otherwise
     // throw FailedSynchronizationException().
-    if (beforeCompletionCalled) {
-      singleThreadJTAExecutor.executeAfterCompletion(getCancelCriterion(), status);
+    if (wasBeforeCompletionCalled()) {
+      switch (status) {
+        case Status.STATUS_COMMITTED:
+          singleThreadJTAExecutor.executeAfterCompletionCommit();
+          break;
+        case Status.STATUS_ROLLEDBACK:
+          singleThreadJTAExecutor.executeAfterCompletionRollback();
+          break;
+        default:
+          throw new TransactionException("Unknown JTA Synchronization status " + status);
+      }
     } else {
       // rollback does not run beforeCompletion.
       if (status != Status.STATUS_ROLLEDBACK) {
         throw new FailedSynchronizationException(
             "Could not execute afterCompletion when beforeCompletion was not executed");
       }
-      doAfterCompletion(status);
+      doAfterCompletionRollback();
     }
   }
 
-  void doAfterCompletion(int status) {
+  void doAfterCompletionCommit() {
     final long opStart = CachePerfStats.getStatTime();
     try {
-      switch (status) {
-        case Status.STATUS_COMMITTED:
-          Assert.assertTrue(this.locks != null,
-              "Gemfire Transaction afterCompletion called with illegal state.");
-          try {
-            commit();
-            saveTXCommitMessageForClientFailover();
-          } catch (CommitConflictException error) {
-            Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
-                + " afterCompletion failed.due to CommitConflictException: " + error);
-          }
-
-          this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
-          this.locks = null;
-          break;
-        case Status.STATUS_ROLLEDBACK:
-          this.jtaLifeTime = opStart - getBeginTime();
-          rollback();
-          saveTXCommitMessageForClientFailover();
-          this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
-          break;
-        default:
-          Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
+      Assert.assertTrue(this.locks != null,
+          "Gemfire Transaction afterCompletion called with illegal state.");
+      try {
+        commit();
+        saveTXCommitMessageForClientFailover();
+      } catch (CommitConflictException error) {
+        Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
+            + " afterCompletion failed.due to CommitConflictException: " + error);
       }
+      this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
+      this.locks = null;
+
+    } catch (InternalGemFireError error) {
+      throw new TransactionException(error);
+    }
+  }
+
+  void doAfterCompletionRollback() {
+    final long opStart = CachePerfStats.getStatTime();
+    this.jtaLifeTime = opStart - getBeginTime();
+    try {
+      rollback();
+      saveTXCommitMessageForClientFailover();
+      this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
     } catch (InternalGemFireError error) {
       throw new TransactionException(error);
     }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java
index d94df0e..bb0d1ad 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java
@@ -16,7 +16,6 @@ package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -24,8 +23,6 @@ import static org.mockito.Mockito.verify;
 
 import java.util.concurrent.TimeUnit;
 
-import javax.transaction.Status;
-
 import org.awaitility.Awaitility;
 import org.junit.Before;
 import org.junit.Test;
@@ -46,22 +43,6 @@ public class AfterCompletionTest {
   }
 
   @Test
-  public void executeThrowsIfCancelCriterionThrows() {
-    doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null);
-
-    assertThatThrownBy(() -> afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED))
-        .isInstanceOf(RuntimeException.class);
-  }
-
-  @Test
-  public void cancelThrowsIfCancelCriterionThrows() {
-    doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null);
-
-    assertThatThrownBy(() -> afterCompletion.cancel(cancelCriterion))
-        .isInstanceOf(RuntimeException.class);
-  }
-
-  @Test
   public void isStartedReturnsFalseIfNotExecuted() {
     assertThat(afterCompletion.isStarted()).isFalse();
   }
@@ -70,52 +51,70 @@ public class AfterCompletionTest {
   public void isStartedReturnsTrueIfExecuted() {
     startDoOp();
 
-    afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED);
+    afterCompletion.executeCommit();
 
     verifyDoOpFinished();
     assertThat(afterCompletion.isStarted()).isTrue();
   }
 
   @Test
-  public void executeCallsDoAfterCompletion() {
+  public void executeCallsDoAfterCompletionCommit() {
     startDoOp();
 
-    afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED);
+    afterCompletion.executeCommit();
     verifyDoOpFinished();
-    verify(txState, times(1)).doAfterCompletion(eq(Status.STATUS_COMMITTED));
+    verify(txState, times(1)).doAfterCompletionCommit();
   }
 
   @Test
-  public void executeThrowsDoAfterCompletionThrows() {
+  public void executeThrowsDoAfterCompletionCommitThrows() {
     startDoOp();
-    doThrow(new RuntimeException()).when(txState).doAfterCompletion(Status.STATUS_COMMITTED);
+    doThrow(new RuntimeException()).when(txState).doAfterCompletionCommit();
 
-    assertThatThrownBy(() -> afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED))
+    assertThatThrownBy(() -> afterCompletion.executeCommit())
         .isInstanceOf(RuntimeException.class);
 
     verifyDoOpFinished();
   }
 
   @Test
-  public void cancelCallsDoCleanup() {
+  public void executeCallsDoAfterCompletionRollback() {
     startDoOp();
 
-    afterCompletion.cancel(cancelCriterion);
+    afterCompletion.executeRollback();
     verifyDoOpFinished();
-    verify(txState, times(1)).doCleanup();
+    verify(txState, times(1)).doAfterCompletionRollback();
   }
 
   @Test
-  public void cancelThrowsDoCleanupThrows() {
+  public void executeThrowsDoAfterCompletionRollbackThrows() {
     startDoOp();
-    doThrow(new RuntimeException()).when(txState).doCleanup();
+    doThrow(new RuntimeException()).when(txState).doAfterCompletionRollback();
 
-    assertThatThrownBy(() -> afterCompletion.cancel(cancelCriterion))
+    assertThatThrownBy(() -> afterCompletion.executeRollback())
         .isInstanceOf(RuntimeException.class);
 
     verifyDoOpFinished();
   }
 
+  @Test
+  public void doOpInvokesDoCleanupIfCancelCriteriaThrows() {
+    doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null);
+
+    afterCompletion.doOp(txState, cancelCriterion);
+
+    verify(txState).doCleanup();
+  }
+
+  @Test
+  public void cancelCallsDoCleanup() {
+    startDoOp();
+
+    afterCompletion.cancel();
+    verifyDoOpFinished();
+    verify(txState, times(1)).doCleanup();
+  }
+
   private void startDoOp() {
     doOpThread = new Thread(() -> afterCompletion.doOp(txState, cancelCriterion));
     doOpThread.start();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java
index 1f541b6..cbd5a18 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java
@@ -28,7 +28,10 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.cache.TransactionException;
 
 public class BeforeCompletionTest {
 
@@ -54,6 +57,27 @@ public class BeforeCompletionTest {
   }
 
   @Test
+  public void executeThrowsTransactionDataNodeHasDepartedExceptionIfDoOpFailedWithCacheClosedException() {
+    doThrow(new CacheClosedException("")).when(txState).doBeforeCompletion();
+
+    beforeCompletion.doOp(txState);
+
+    assertThatThrownBy(() -> beforeCompletion.execute(cancelCriterion))
+        .isInstanceOf(TransactionDataNodeHasDepartedException.class);
+  }
+
+  @Test
+  public void executeThrowsTransactionExceptionIfDoOpFailedWithRuntimeException() {
+    doThrow(new RuntimeException("")).when(txState).doBeforeCompletion();
+
+    beforeCompletion.doOp(txState);
+
+    assertThatThrownBy(() -> beforeCompletion.execute(cancelCriterion))
+        .isInstanceOf(TransactionException.class);
+  }
+
+
+  @Test
   public void doOpCallsDoBeforeCompletion() {
     beforeCompletion.doOp(txState);
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
index bc48eb4..a026730 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
@@ -16,6 +16,7 @@ package org.apache.geode.internal.cache;
 
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -63,4 +64,20 @@ public class SingleThreadJTAExecutorTest {
         () -> inOrder.verify(afterCompletion, times(1)).doOp(eq(txState), eq(cancelCriterion)));
   }
 
+  @Test
+  public void cleanupInvokesCancel() {
+    singleThreadJTAExecutor.cleanup();
+
+    verify(afterCompletion, times(1)).cancel();
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void doOpsInvokesAfterCompletionDoOpWhenBeforeCompletionThrows() {
+    doThrow(RuntimeException.class).when(beforeCompletion).doOp(txState);
+
+    singleThreadJTAExecutor.doOps(txState, cancelCriterion);
+
+    verify(afterCompletion, times(1)).doOp(eq(txState), eq(cancelCriterion));
+  }
+
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
index 21fadf0..d65c361 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.catchThrowable;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -36,17 +37,20 @@ import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.cache.FailedSynchronizationException;
 import org.apache.geode.cache.SynchronizationCommitConflictException;
 import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.cache.TransactionException;
 
 public class TXStateTest {
   private TXStateProxyImpl txStateProxy;
   private CommitConflictException exception;
   private TransactionDataNodeHasDepartedException transactionDataNodeHasDepartedException;
+  private SingleThreadJTAExecutor executor;
 
   @Before
   public void setup() {
     txStateProxy = mock(TXStateProxyImpl.class, RETURNS_DEEP_STUBS);
     exception = new CommitConflictException("");
     transactionDataNodeHasDepartedException = new TransactionDataNodeHasDepartedException("");
+    executor = mock(SingleThreadJTAExecutor.class);
 
     when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class));
   }
@@ -63,11 +67,10 @@ public class TXStateTest {
   @Test
   public void doAfterCompletionThrowsIfCommitFails() {
     TXState txState = spy(new TXState(txStateProxy, true));
-    doReturn(true).when(txState).wasBeforeCompletionCalled();
     txState.reserveAndCheck();
     doThrow(transactionDataNodeHasDepartedException).when(txState).commit();
 
-    assertThatThrownBy(() -> txState.doAfterCompletion(Status.STATUS_COMMITTED))
+    assertThatThrownBy(() -> txState.doAfterCompletionCommit())
         .isSameAs(transactionDataNodeHasDepartedException);
   }
 
@@ -76,8 +79,7 @@ public class TXStateTest {
     TXState txState = spy(new TXState(txStateProxy, false));
     txState.reserveAndCheck();
     txState.closed = true;
-    doReturn(true).when(txState).wasBeforeCompletionCalled();
-    txState.doAfterCompletion(Status.STATUS_COMMITTED);
+    txState.doAfterCompletionCommit();
 
     assertThat(txState.locks).isNull();
     verify(txState, times(1)).saveTXCommitMessageForClientFailover();
@@ -90,6 +92,36 @@ public class TXStateTest {
   }
 
   @Test
+  public void afterCompletionInvokesExecuteAfterCompletionCommitIfBeforeCompletionCalled() {
+    TXState txState = spy(new TXState(txStateProxy, true, executor));
+    doReturn(true).when(txState).wasBeforeCompletionCalled();
+
+    txState.afterCompletion(Status.STATUS_COMMITTED);
+
+    verify(executor, times(1)).executeAfterCompletionCommit();
+  }
+
+  @Test
+  public void afterCompletionThrowsWithUnexpectedStatusIfBeforeCompletionCalled() {
+    TXState txState = spy(new TXState(txStateProxy, true, executor));
+    doReturn(true).when(txState).wasBeforeCompletionCalled();
+
+    Throwable thrown = catchThrowable(() -> txState.afterCompletion(Status.STATUS_NO_TRANSACTION));
+
+    assertThat(thrown).isInstanceOf(TransactionException.class);
+  }
+
+  @Test
+  public void afterCompletionInvokesExecuteAfterCompletionRollbackIfBeforeCompletionCalled() {
+    TXState txState = spy(new TXState(txStateProxy, true, executor));
+    doReturn(true).when(txState).wasBeforeCompletionCalled();
+
+    txState.afterCompletion(Status.STATUS_ROLLEDBACK);
+
+    verify(executor, times(1)).executeAfterCompletionRollback();
+  }
+
+  @Test
   public void afterCompletionCanRollbackJTA() {
     TXState txState = spy(new TXState(txStateProxy, true));
     txState.afterCompletion(Status.STATUS_ROLLEDBACK);