You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/08/27 20:11:40 UTC

[geode] branch feature/GEODE-5624 updated: more refactoring.

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-5624
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-5624 by this push:
     new 13b6cfe  more refactoring.
13b6cfe is described below

commit 13b6cfe2cf2e1709a12d47e1dce7030af57e89f2
Author: eshu <es...@pivotal.io>
AuthorDate: Mon Aug 27 13:11:04 2018 -0700

    more refactoring.
---
 .../geode/internal/cache/AfterCompletion.java      | 105 +++++++++++++
 .../geode/internal/cache/BeforeCompletion.java     |  64 ++++++++
 .../internal/cache/SingleThreadJTAExecutor.java    | 172 +++------------------
 .../org/apache/geode/internal/cache/TXState.java   |  11 +-
 .../geode/internal/cache/AfterCompletionTest.java  | 131 ++++++++++++++++
 .../geode/internal/cache/BeforeCompletionTest.java |  98 ++++++++++++
 .../cache/SingleThreadJTAExecutorTest.java         |  84 +++-------
 7 files changed, 447 insertions(+), 218 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java
new file mode 100644
index 0000000..02ef92c
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.function.BooleanSupplier;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.internal.logging.LogService;
+
+public class AfterCompletion {
+  private static final Logger logger = LogService.getLogger();
+
+  private boolean started;
+  private boolean finished;
+  private int status = -1;
+  private boolean cancelled;
+  private RuntimeException exception;
+
+  public synchronized void doOp(TXState txState, CancelCriterion cancelCriterion) {
+    // there should be a transaction timeout that keeps this thread
+    // from sitting around forever if the client goes away
+    // The above was done by setting afterCompletionCancelled in txState
+    // during cleanup. When client departed, the transaction/JTA
+    // will be timed out and cleanup code will be executed.
+    waitForExecuteOrCancel(cancelCriterion);
+    started = true;
+    logger.debug("executing afterCompletion notification");
+
+    try {
+      if (cancelled) {
+        txState.doCleanup();
+      } else {
+        txState.doAfterCompletion(status);
+      }
+    } catch (RuntimeException exception) {
+      this.exception = exception;
+    } finally {
+      logger.debug("afterCompletion notification completed");
+      finished = true;
+      notifyAll();
+    }
+  }
+
+  private void waitForExecuteOrCancel(CancelCriterion cancelCriterion) {
+    waitForCondition(cancelCriterion, () -> {
+      return (status == -1 && !cancelled);
+    });
+  }
+
+  private synchronized void waitForCondition(CancelCriterion cancelCriterion,
+      BooleanSupplier condition) {
+    while (condition.getAsBoolean()) {
+      cancelCriterion.checkCancelInProgress(null);
+      try {
+        logger.debug("waiting for notification");
+        wait(1000);
+      } catch (InterruptedException ignore) {
+        // eat the interrupt and check for exit conditions
+      }
+    }
+  }
+
+  public synchronized void execute(CancelCriterion cancelCriterion, int status) {
+    this.status = status;
+    signalAndWaitForDoOp(cancelCriterion);
+  }
+
+  private void signalAndWaitForDoOp(CancelCriterion cancelCriterion) {
+    notifyAll();
+    waitUntilFinished(cancelCriterion);
+    if (exception != null) {
+      throw exception;
+    }
+  }
+
+  private void waitUntilFinished(CancelCriterion cancelCriterion) {
+    waitForCondition(cancelCriterion, () -> {
+      return !finished;
+    });
+
+  }
+
+  public synchronized void cancel(CancelCriterion cancelCriterion) {
+    cancelled = true;
+    signalAndWaitForDoOp(cancelCriterion);
+  }
+
+  public synchronized boolean isStarted() {
+    return started;
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java
new file mode 100644
index 0000000..99a062c
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.internal.logging.LogService;
+
+public class BeforeCompletion {
+  private static final Logger logger = LogService.getLogger();
+
+  private boolean started;
+  private boolean finished;
+  private SynchronizationCommitConflictException exception;
+
+  public synchronized void doOp(TXState txState) {
+    try {
+      txState.doBeforeCompletion();
+    } catch (SynchronizationCommitConflictException exception) {
+      this.exception = exception;
+    } finally {
+      logger.debug("beforeCompletion notification completed");
+      finished = true;
+      notifyAll();
+    }
+  }
+
+  public synchronized void execute(CancelCriterion cancelCriterion) {
+    started = true;
+    waitUntilFinished(cancelCriterion);
+    if (exception != null) {
+      throw exception;
+    }
+  }
+
+  private void waitUntilFinished(CancelCriterion cancelCriterion) {
+    while (!finished) {
+      cancelCriterion.checkCancelInProgress(null);
+      try {
+        wait(1000);
+      } catch (InterruptedException ignore) {
+        // eat the interrupt and check for exit conditions
+      }
+    }
+  }
+
+  public synchronized boolean isStarted() {
+    return started;
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
index 4df8ca4..7ecca6a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
@@ -18,180 +18,54 @@ import java.util.concurrent.Executor;
 
 import org.apache.logging.log4j.Logger;
 
-import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.CancelCriterion;
 import org.apache.geode.internal.logging.LogService;
 
 /**
- * TXStateSynchronizationThread manages beforeCompletion and afterCompletion calls.
- * The thread should be instantiated with a Runnable that invokes beforeCompletion behavior.
- * Then you must invoke executeAfterCompletion() with another Runnable that invokes afterCompletion
- * behavior.
+ * This class ensures that beforeCompletion and afterCompletion are executed in the same thread.
  *
- * @since Geode 1.6.0
+ * @since Geode 1.7.0
  */
 public class SingleThreadJTAExecutor {
   private static final Logger logger = LogService.getLogger();
 
-  private final Object beforeCompletionSync = new Object();
-  private boolean beforeCompletionStarted;
-  private boolean beforeCompletionFinished;
-  private SynchronizationCommitConflictException beforeCompletionException;
+  private final BeforeCompletion beforeCompletion;
+  private final AfterCompletion afterCompletion;
 
-  private final Object afterCompletionSync = new Object();
-  private boolean afterCompletionStarted;
-  private boolean afterCompletionFinished;
-  private int afterCompletionStatus = -1;
-  private boolean afterCompletionCancelled;
-  private RuntimeException afterCompletionException;
-
-  public SingleThreadJTAExecutor() {}
-
-  void doOps(TXState txState) {
-    doBeforeCompletionOp(txState);
-    doAfterCompletionOp(txState);
-  }
-
-  void doBeforeCompletionOp(TXState txState) {
-    synchronized (beforeCompletionSync) {
-      try {
-        txState.doBeforeCompletion();
-      } catch (SynchronizationCommitConflictException exception) {
-        beforeCompletionException = exception;
-      } finally {
-        if (logger.isDebugEnabled()) {
-          logger.debug("beforeCompletion notification completed");
-        }
-        beforeCompletionFinished = true;
-        beforeCompletionSync.notifyAll();
-      }
-    }
-  }
-
-  boolean isBeforeCompletionStarted() {
-    synchronized (beforeCompletionSync) {
-      return beforeCompletionStarted;
-    }
-  }
-
-  boolean isAfterCompletionStarted() {
-    synchronized (afterCompletionSync) {
-      return afterCompletionStarted;
-    }
+  public SingleThreadJTAExecutor() {
+    this(new BeforeCompletion(), new AfterCompletion());
   }
 
-  boolean isBeforeCompletionFinished() {
-    synchronized (beforeCompletionSync) {
-      return beforeCompletionFinished;
-    }
+  public SingleThreadJTAExecutor(BeforeCompletion beforeCompletion,
+      AfterCompletion afterCompletion) {
+    this.beforeCompletion = beforeCompletion;
+    this.afterCompletion = afterCompletion;
   }
 
-  boolean isAfterCompletionFinished() {
-    synchronized (afterCompletionSync) {
-      return afterCompletionFinished;
-    }
+  private void doOps(TXState txState, CancelCriterion cancelCriterion) {
+    beforeCompletion.doOp(txState);
+    afterCompletion.doOp(txState, cancelCriterion);
   }
 
-  public void executeBeforeCompletion(TXState txState, Executor executor) {
-    executor.execute(() -> doOps(txState));
-
-    synchronized (beforeCompletionSync) {
-      beforeCompletionStarted = true;
-      while (!beforeCompletionFinished) {
-        try {
-          beforeCompletionSync.wait(1000);
-        } catch (InterruptedException ignore) {
-          // eat the interrupt and check for exit conditions
-        }
-        txState.getCache().getCancelCriterion().checkCancelInProgress(null);
-      }
-      if (getBeforeCompletionException() != null) {
-        throw getBeforeCompletionException();
-      }
-    }
-  }
-
-  SynchronizationCommitConflictException getBeforeCompletionException() {
-    return beforeCompletionException;
-  }
-
-  private void doAfterCompletionOp(TXState txState) {
-    synchronized (afterCompletionSync) {
-      // there should be a transaction timeout that keeps this thread
-      // from sitting around forever if the client goes away
-      // The above was done by setting afterCompletionCancelled in txState
-      // during cleanup. When client departed, the transaction/JTA
-      // will be timed out and cleanup code will be executed.
-      final boolean isDebugEnabled = logger.isDebugEnabled();
-      while (afterCompletionStatus == -1 && !afterCompletionCancelled) {
-        try {
-          if (isDebugEnabled) {
-            logger.debug("waiting for afterCompletion notification");
-          }
-          afterCompletionSync.wait(1000);
-        } catch (InterruptedException ignore) {
-          // eat the interrupt and check for exit conditions
-        }
-      }
-      afterCompletionStarted = true;
-      if (isDebugEnabled) {
-        logger.debug("executing afterCompletion notification");
-      }
-      try {
-        if (!afterCompletionCancelled) {
-          txState.doAfterCompletion(afterCompletionStatus);
-        } else {
-          txState.doCleanup();
-        }
-      } catch (RuntimeException exception) {
-        afterCompletionException = exception;
-      } finally {
-        if (isDebugEnabled) {
-          logger.debug("afterCompletion notification completed");
-        }
-        afterCompletionFinished = true;
-        afterCompletionSync.notifyAll();
-      }
-    }
-  }
-
-  public void executeAfterCompletion(TXState txState, int status) {
-    synchronized (afterCompletionSync) {
-      afterCompletionStatus = status;
-      afterCompletionSync.notifyAll();
-      waitForAfterCompletionToFinish(txState);
-      if (getAfterCompletionException() != null) {
-        throw getAfterCompletionException();
-      }
-    }
-  }
+  public void executeBeforeCompletion(TXState txState, Executor executor,
+      CancelCriterion cancelCriterion) {
+    executor.execute(() -> doOps(txState, cancelCriterion));
 
-  private void waitForAfterCompletionToFinish(TXState txState) {
-    while (!afterCompletionFinished) {
-      try {
-        afterCompletionSync.wait(1000);
-      } catch (InterruptedException ignore) {
-        // eat the interrupt and check for exit conditions
-      }
-      txState.getCache().getCancelCriterion().checkCancelInProgress(null);
-    }
+    beforeCompletion.execute(cancelCriterion);
   }
 
-  RuntimeException getAfterCompletionException() {
-    return afterCompletionException;
+  public void executeAfterCompletion(CancelCriterion cancelCriterion, int status) {
+    afterCompletion.execute(cancelCriterion, status);
   }
 
   /**
    * stop waiting for an afterCompletion to arrive and just exit
    */
-  public void cleanup(TXState txState) {
-    synchronized (afterCompletionSync) {
-      afterCompletionCancelled = true;
-      afterCompletionSync.notifyAll();
-      waitForAfterCompletionToFinish(txState);
-    }
+  public void cleanup(CancelCriterion cancelCriterion) {
+    afterCompletion.cancel(cancelCriterion);
   }
 
   public boolean shouldDoCleanup() {
-    return isBeforeCompletionStarted() && !isAfterCompletionStarted();
+    return beforeCompletion.isStarted() && !afterCompletion.isStarted();
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index bce1af4..83d00d1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -31,6 +31,7 @@ import javax.transaction.Status;
 
 import org.apache.logging.log4j.Logger;
 
+import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CommitConflictException;
@@ -869,7 +870,7 @@ public class TXState implements TXStateInterface {
 
   protected void cleanup() {
     if (singleThreadJTAExecutor.shouldDoCleanup()) {
-      singleThreadJTAExecutor.cleanup(this);
+      singleThreadJTAExecutor.cleanup(getCancelCriterion());
     } else {
       doCleanup();
     }
@@ -1036,13 +1037,17 @@ public class TXState implements TXStateInterface {
     }
     beforeCompletionCalled = true;
     singleThreadJTAExecutor.executeBeforeCompletion(this,
-        getExecutor());
+        getExecutor(), getCancelCriterion());
   }
 
   Executor getExecutor() {
     return getCache().getDistributionManager().getWaitingThreadPool();
   }
 
+  CancelCriterion getCancelCriterion() {
+    return getCache().getCancelCriterion();
+  }
+
   void doBeforeCompletion() {
     proxy.getTxMgr().setTXState(null);
     final long opStart = CachePerfStats.getStatTime();
@@ -1105,7 +1110,7 @@ public class TXState implements TXStateInterface {
     // sitting in the waiting pool to execute afterCompletion. Otherwise
     // throw FailedSynchronizationException().
     if (beforeCompletionCalled) {
-      singleThreadJTAExecutor.executeAfterCompletion(this, status);
+      singleThreadJTAExecutor.executeAfterCompletion(getCancelCriterion(), status);
     } else {
       // rollback does not run beforeCompletion.
       if (status != Status.STATUS_ROLLEDBACK) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java
new file mode 100644
index 0000000..d94df0e
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.transaction.Status;
+
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+
+public class AfterCompletionTest {
+  private AfterCompletion afterCompletion;
+  private CancelCriterion cancelCriterion;
+  private TXState txState;
+  private Thread doOpThread;
+
+  @Before
+  public void setup() {
+    afterCompletion = new AfterCompletion();
+    cancelCriterion = mock(CancelCriterion.class);
+    txState = mock(TXState.class);
+  }
+
+  @Test
+  public void executeThrowsIfCancelCriterionThrows() {
+    doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null);
+
+    assertThatThrownBy(() -> afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED))
+        .isInstanceOf(RuntimeException.class);
+  }
+
+  @Test
+  public void cancelThrowsIfCancelCriterionThrows() {
+    doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null);
+
+    assertThatThrownBy(() -> afterCompletion.cancel(cancelCriterion))
+        .isInstanceOf(RuntimeException.class);
+  }
+
+  @Test
+  public void isStartedReturnsFalseIfNotExecuted() {
+    assertThat(afterCompletion.isStarted()).isFalse();
+  }
+
+  @Test
+  public void isStartedReturnsTrueIfExecuted() {
+    startDoOp();
+
+    afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED);
+
+    verifyDoOpFinished();
+    assertThat(afterCompletion.isStarted()).isTrue();
+  }
+
+  @Test
+  public void executeCallsDoAfterCompletion() {
+    startDoOp();
+
+    afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED);
+    verifyDoOpFinished();
+    verify(txState, times(1)).doAfterCompletion(eq(Status.STATUS_COMMITTED));
+  }
+
+  @Test
+  public void executeThrowsDoAfterCompletionThrows() {
+    startDoOp();
+    doThrow(new RuntimeException()).when(txState).doAfterCompletion(Status.STATUS_COMMITTED);
+
+    assertThatThrownBy(() -> afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED))
+        .isInstanceOf(RuntimeException.class);
+
+    verifyDoOpFinished();
+  }
+
+  @Test
+  public void cancelCallsDoCleanup() {
+    startDoOp();
+
+    afterCompletion.cancel(cancelCriterion);
+    verifyDoOpFinished();
+    verify(txState, times(1)).doCleanup();
+  }
+
+  @Test
+  public void cancelThrowsDoCleanupThrows() {
+    startDoOp();
+    doThrow(new RuntimeException()).when(txState).doCleanup();
+
+    assertThatThrownBy(() -> afterCompletion.cancel(cancelCriterion))
+        .isInstanceOf(RuntimeException.class);
+
+    verifyDoOpFinished();
+  }
+
+  private void startDoOp() {
+    doOpThread = new Thread(() -> afterCompletion.doOp(txState, cancelCriterion));
+    doOpThread.start();
+    Awaitility.await().atMost(60, TimeUnit.SECONDS)
+        .untilAsserted(() -> verify(cancelCriterion, times(1)).checkCancelInProgress(null));
+
+  }
+
+  private void verifyDoOpFinished() {
+    Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> !doOpThread.isAlive());
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java
new file mode 100644
index 0000000..1f541b6
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+
+public class BeforeCompletionTest {
+
+  private BeforeCompletion beforeCompletion;
+  private CancelCriterion cancelCriterion;
+  private TXState txState;
+
+  @Before
+  public void setup() {
+    beforeCompletion = new BeforeCompletion();
+    cancelCriterion = mock(CancelCriterion.class);
+    txState = mock(TXState.class);
+  }
+
+  @Test
+  public void executeThrowsExceptionIfDoOpFailedWithException() {
+    doThrow(new SynchronizationCommitConflictException("")).when(txState).doBeforeCompletion();
+
+    beforeCompletion.doOp(txState);
+
+    assertThatThrownBy(() -> beforeCompletion.execute(cancelCriterion))
+        .isInstanceOf(SynchronizationCommitConflictException.class);
+  }
+
+  @Test
+  public void doOpCallsDoBeforeCompletion() {
+    beforeCompletion.doOp(txState);
+
+    verify(txState, times(1)).doBeforeCompletion();
+  }
+
+  @Test
+  public void isStartedReturnsFalseIfNotExecuted() {
+    assertThat(beforeCompletion.isStarted()).isFalse();
+  }
+
+  @Test
+  public void isStartedReturnsTrueIfExecuted() {
+    beforeCompletion.doOp(txState);
+    beforeCompletion.execute(cancelCriterion);
+
+    assertThat(beforeCompletion.isStarted()).isTrue();
+  }
+
+  @Test
+  public void executeThrowsIfCancelCriterionThrows() {
+    doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null);
+
+    assertThatThrownBy(() -> beforeCompletion.execute(cancelCriterion))
+        .isInstanceOf(RuntimeException.class);
+  }
+
+  @Test
+  public void executeWaitsUntilDoOpFinish() throws Exception {
+    Thread thread = new Thread(() -> beforeCompletion.execute(cancelCriterion));
+    thread.start();
+    // give the thread a chance to get past the "finished" check by waiting until
+    // checkCancelInProgress is called
+    Awaitility.await().atMost(60, TimeUnit.SECONDS)
+        .untilAsserted(() -> verify(cancelCriterion, times(1)).checkCancelInProgress(null));
+
+    beforeCompletion.doOp(txState);
+
+    Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> !(thread.isAlive()));
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
index 79f4324..1cf70a3 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
@@ -14,10 +14,9 @@
  */
 package org.apache.geode.internal.cache;
 
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.Java6Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -25,85 +24,38 @@ import static org.mockito.Mockito.verify;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import javax.transaction.Status;
-
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.InOrder;
 
-import org.apache.geode.cache.SynchronizationCommitConflictException;
-import org.apache.geode.cache.TransactionException;
+import org.apache.geode.CancelCriterion;
 
 public class SingleThreadJTAExecutorTest {
-  private TXState txState;
   private SingleThreadJTAExecutor singleThreadJTAExecutor;
+  private TXState txState;
   private ExecutorService executor;
+  private BeforeCompletion beforeCompletion;
+  private AfterCompletion afterCompletion;
+  private CancelCriterion cancelCriterion;
 
   @Before
   public void setup() {
     txState = mock(TXState.class, RETURNS_DEEP_STUBS);
     executor = Executors.newSingleThreadExecutor();
+    beforeCompletion = mock(BeforeCompletion.class);
+    afterCompletion = mock(AfterCompletion.class);
+    cancelCriterion = mock(CancelCriterion.class);
+    singleThreadJTAExecutor = new SingleThreadJTAExecutor(beforeCompletion, afterCompletion);
   }
 
   @Test
-  public void executeBeforeCompletionCallsDoBeforeCompletion() {
-    singleThreadJTAExecutor = new SingleThreadJTAExecutor();
-
-    singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
-
-    verify(txState, times(1)).doBeforeCompletion();
-
-    assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
-  }
-
-  @Test(expected = SynchronizationCommitConflictException.class)
-  public void executeBeforeCompletionThrowsExceptionIfBeforeCompletionFailed() {
-    singleThreadJTAExecutor = new SingleThreadJTAExecutor();
-    doThrow(new SynchronizationCommitConflictException("")).when(txState).doBeforeCompletion();
-
-    singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
-
-    verify(txState, times(1)).doBeforeCompletion();
-    assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
-  }
-
-  @Test
-  public void executeAfterCompletionCallsDoAfterCompletion() {
-    singleThreadJTAExecutor = new SingleThreadJTAExecutor();
-    int status = Status.STATUS_COMMITTED;
-
-    singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
-    singleThreadJTAExecutor.executeAfterCompletion(txState, status);
-
-    verify(txState, times(1)).doBeforeCompletion();
-    verify(txState, times(1)).doAfterCompletion(status);
-    assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
-  }
-
-  @Test
-  public void executeAfterCompletionThrowsExceptionIfAfterCompletionFailed() {
-    singleThreadJTAExecutor = new SingleThreadJTAExecutor();
-    int status = Status.STATUS_COMMITTED;
-    TransactionException exception = new TransactionException("");
-    doThrow(exception).when(txState).doAfterCompletion(status);
-
-    singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
-
-    assertThatThrownBy(() -> singleThreadJTAExecutor.executeAfterCompletion(txState, status))
-        .isSameAs(exception);
-    verify(txState, times(1)).doBeforeCompletion();
-    verify(txState, times(1)).doAfterCompletion(status);
-  }
-
-  @Test
-  public void executorThreadNoLongerWaitForAfterCompletionIfTXStateIsCleanedUp() {
-    singleThreadJTAExecutor = new SingleThreadJTAExecutor();
-
-    singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
-    singleThreadJTAExecutor.cleanup(txState);
+  public void executeBeforeCompletionCallsDoOps() {
+    singleThreadJTAExecutor.executeBeforeCompletion(txState, executor, cancelCriterion);
 
-    verify(txState, times(1)).doBeforeCompletion();
-    assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
-    assertThat(singleThreadJTAExecutor.isAfterCompletionFinished()).isTrue();
+    InOrder inOrder = inOrder(beforeCompletion, afterCompletion);
+    inOrder.verify(beforeCompletion, times(1)).doOp(eq(txState));
+    inOrder.verify(afterCompletion, times(1)).doOp(eq(txState), eq(cancelCriterion));
+    verify(beforeCompletion, times(1)).execute(eq(cancelCriterion));
   }
 
 }