You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/08/24 20:37:45 UTC

[geode] 02/02: wip -- refactor classes.

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-5624
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 4193fb719694615765fea70fd2169a1b6653c3ce
Author: eshu <es...@pivotal.io>
AuthorDate: Fri Aug 24 13:37:08 2018 -0700

    wip -- refactor classes.
---
 .../ClientServerJTAFailoverDistributedTest.java    |  14 +-
 .../internal/cache/SingleThreadJTAExecutor.java    | 197 ++++++++++++++++++
 .../org/apache/geode/internal/cache/TXState.java   | 225 +++------------------
 .../cache/TXStateSynchronizationRunnable.java      | 144 -------------
 .../cache/SingleThreadJTAExecutorTest.java         | 109 ++++++++++
 .../apache/geode/internal/cache/TXStateTest.java   |  51 ++---
 6 files changed, 363 insertions(+), 377 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
index e4589fa..2ae4e9b 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
@@ -130,7 +130,8 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
         .setPartitionAttributes(partitionAttributes).create(regionName);
     if (hasReplicateRegion) {
-      cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.REPLICATE).create(replicateRegionName);
+      cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.REPLICATE)
+          .create(replicateRegionName);
     }
 
     CacheServer server = cacheRule.getCache().addCacheServer();
@@ -154,7 +155,8 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
         clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
     crf.setPoolName(pool.getName());
     crf.create(regionName);
-    if (hasReplicateRegion) crf.create(replicateRegionName);
+    if (hasReplicateRegion)
+      crf.create(replicateRegionName);
 
     if (ports.length > 1) {
       pool.acquireConnection(new ServerLocation(hostName, port1));
@@ -178,11 +180,12 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     Object[] results = new Object[2];
     InternalClientCache cache = clientCacheRule.getClientCache();
     Region region = cache.getRegion(regionName);
-    Region replicateRegion = hasReplicateRegion? cache.getRegion(replicateRegionName) : null;
+    Region replicateRegion = hasReplicateRegion ? cache.getRegion(replicateRegionName) : null;
     TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
     txManager.begin();
     region.put(key, newValue);
-    if (hasReplicateRegion) replicateRegion.put(key, newValue);
+    if (hasReplicateRegion)
+      replicateRegion.put(key, newValue);
 
     TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
     ClientTXStateStub clientTXStateStub = (ClientTXStateStub) txStateProxy.getRealDeal(null, null);
@@ -212,7 +215,8 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     }
     if (isCommit) {
       assertEquals(newValue, region.get(key));
-      if (hasReplicateRegion) assertEquals(newValue, replicateRegion.get(key));
+      if (hasReplicateRegion)
+        assertEquals(newValue, replicateRegion.get(key));
     } else {
       assertEquals(value, region.get(key));
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
new file mode 100644
index 0000000..4df8ca4
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.concurrent.Executor;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * SingleThreadJTAExecutor runs the beforeCompletion and afterCompletion steps of a JTA
+ * transaction on a single thread taken from the supplied Executor.
+ * Call executeBeforeCompletion() first; then call executeAfterCompletion() with the final
+ * transaction status, or cleanup() to stop waiting for the afterCompletion notification.
+ *
+ * @since Geode 1.6.0
+ */
+public class SingleThreadJTAExecutor {
+  private static final Logger logger = LogService.getLogger();
+
+  private final Object beforeCompletionSync = new Object();
+  private boolean beforeCompletionStarted;
+  private boolean beforeCompletionFinished;
+  private SynchronizationCommitConflictException beforeCompletionException;
+
+  private final Object afterCompletionSync = new Object();
+  private boolean afterCompletionStarted;
+  private boolean afterCompletionFinished;
+  private int afterCompletionStatus = -1;
+  private boolean afterCompletionCancelled;
+  private RuntimeException afterCompletionException;
+
+  public SingleThreadJTAExecutor() {}
+
+  void doOps(TXState txState) {
+    doBeforeCompletionOp(txState);
+    doAfterCompletionOp(txState);
+  }
+
+  void doBeforeCompletionOp(TXState txState) {
+    synchronized (beforeCompletionSync) {
+      try {
+        txState.doBeforeCompletion();
+      } catch (SynchronizationCommitConflictException exception) {
+        beforeCompletionException = exception;
+      } finally {
+        if (logger.isDebugEnabled()) {
+          logger.debug("beforeCompletion notification completed");
+        }
+        beforeCompletionFinished = true;
+        beforeCompletionSync.notifyAll();
+      }
+    }
+  }
+
+  boolean isBeforeCompletionStarted() {
+    synchronized (beforeCompletionSync) {
+      return beforeCompletionStarted;
+    }
+  }
+
+  boolean isAfterCompletionStarted() {
+    synchronized (afterCompletionSync) {
+      return afterCompletionStarted;
+    }
+  }
+
+  boolean isBeforeCompletionFinished() {
+    synchronized (beforeCompletionSync) {
+      return beforeCompletionFinished;
+    }
+  }
+
+  boolean isAfterCompletionFinished() {
+    synchronized (afterCompletionSync) {
+      return afterCompletionFinished;
+    }
+  }
+
+  public void executeBeforeCompletion(TXState txState, Executor executor) {
+    executor.execute(() -> doOps(txState));
+
+    synchronized (beforeCompletionSync) {
+      beforeCompletionStarted = true;
+      while (!beforeCompletionFinished) {
+        try {
+          beforeCompletionSync.wait(1000);
+        } catch (InterruptedException ignore) {
+          // eat the interrupt and check for exit conditions
+        }
+        txState.getCache().getCancelCriterion().checkCancelInProgress(null);
+      }
+      if (getBeforeCompletionException() != null) {
+        throw getBeforeCompletionException();
+      }
+    }
+  }
+
+  SynchronizationCommitConflictException getBeforeCompletionException() {
+    return beforeCompletionException;
+  }
+
+  private void doAfterCompletionOp(TXState txState) {
+    synchronized (afterCompletionSync) {
+      // There should be a transaction timeout that keeps this thread
+      // from sitting around forever if the client goes away.
+      // This is handled by cleanup(), which sets afterCompletionCancelled
+      // on this executor. When the client departs, the transaction/JTA
+      // is expected to time out and the cleanup code will be executed.
+      final boolean isDebugEnabled = logger.isDebugEnabled();
+      while (afterCompletionStatus == -1 && !afterCompletionCancelled) {
+        try {
+          if (isDebugEnabled) {
+            logger.debug("waiting for afterCompletion notification");
+          }
+          afterCompletionSync.wait(1000);
+        } catch (InterruptedException ignore) {
+          // eat the interrupt and check for exit conditions
+        }
+      }
+      afterCompletionStarted = true;
+      if (isDebugEnabled) {
+        logger.debug("executing afterCompletion notification");
+      }
+      try {
+        if (!afterCompletionCancelled) {
+          txState.doAfterCompletion(afterCompletionStatus);
+        } else {
+          txState.doCleanup();
+        }
+      } catch (RuntimeException exception) {
+        afterCompletionException = exception;
+      } finally {
+        if (isDebugEnabled) {
+          logger.debug("afterCompletion notification completed");
+        }
+        afterCompletionFinished = true;
+        afterCompletionSync.notifyAll();
+      }
+    }
+  }
+
+  public void executeAfterCompletion(TXState txState, int status) {
+    synchronized (afterCompletionSync) {
+      afterCompletionStatus = status;
+      afterCompletionSync.notifyAll();
+      waitForAfterCompletionToFinish(txState);
+      if (getAfterCompletionException() != null) {
+        throw getAfterCompletionException();
+      }
+    }
+  }
+
+  private void waitForAfterCompletionToFinish(TXState txState) {
+    while (!afterCompletionFinished) {
+      try {
+        afterCompletionSync.wait(1000);
+      } catch (InterruptedException ignore) {
+        // eat the interrupt and check for exit conditions
+      }
+      txState.getCache().getCancelCriterion().checkCancelInProgress(null);
+    }
+  }
+
+  RuntimeException getAfterCompletionException() {
+    return afterCompletionException;
+  }
+
+  /**
+   * stop waiting for an afterCompletion to arrive and just exit
+   */
+  public void cleanup(TXState txState) {
+    synchronized (afterCompletionSync) {
+      afterCompletionCancelled = true;
+      afterCompletionSync.notifyAll();
+      waitForAfterCompletionToFinish(txState);
+    }
+  }
+
+  public boolean shouldDoCleanup() {
+    return isBeforeCompletionStarted() && !isAfterCompletionStarted();
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index f33267e..bce1af4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -32,7 +32,6 @@ import javax.transaction.Status;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
-import org.apache.geode.InternalGemFireError;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.cache.DiskAccessException;
@@ -44,7 +43,6 @@ import org.apache.geode.cache.Region.Entry;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.SynchronizationCommitConflictException;
 import org.apache.geode.cache.TransactionDataRebalancedException;
-import org.apache.geode.cache.TransactionException;
 import org.apache.geode.cache.TransactionId;
 import org.apache.geode.cache.TransactionWriter;
 import org.apache.geode.cache.TransactionWriterException;
@@ -52,7 +50,6 @@ import org.apache.geode.cache.UnsupportedOperationInTransactionException;
 import org.apache.geode.cache.client.internal.ServerRegionDataAccess;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.TXManagerCancelledException;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.cache.control.MemoryThresholds;
@@ -108,9 +105,7 @@ public class TXState implements TXStateInterface {
    * and afterCompletion so that beforeCompletion can obtain locks for the afterCompletion step.
    * This is that thread
    */
-  protected volatile TXStateSynchronizationRunnable syncRunnable;
-  private volatile SynchronizationCommitConflictException beforeCompletionException;
-  private volatile RuntimeException afterCompletionException;
+  private final SingleThreadJTAExecutor singleThreadJTAExecutor;
 
   // Internal testing hooks
   private Runnable internalAfterReservation;
@@ -153,6 +148,11 @@ public class TXState implements TXStateInterface {
   private volatile DistributedMember proxyServer;
 
   public TXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub) {
+    this(proxy, onBehalfOfRemoteStub, new SingleThreadJTAExecutor());
+  }
+
+  public TXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub,
+      SingleThreadJTAExecutor singleThreadJTAExecutor) {
     this.beginTime = CachePerfStats.getStatTime();
     this.regions = new IdentityHashMap<>();
 
@@ -165,7 +165,7 @@ public class TXState implements TXStateInterface {
     this.internalAfterSend = null;
     this.proxy = proxy;
     this.onBehalfOfRemoteStub = onBehalfOfRemoteStub;
-
+    this.singleThreadJTAExecutor = singleThreadJTAExecutor;
   }
 
   private boolean hasSeenEvent(EntryEventImpl event) {
@@ -428,7 +428,7 @@ public class TXState implements TXStateInterface {
       }
 
       /*
-       * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort
+       * If there is a TransactionWriter plugged in, we need to give it an opportunity to abort
        * the transaction.
        */
       TransactionWriter writer = this.proxy.getTxMgr().getWriter();
@@ -868,6 +868,14 @@ public class TXState implements TXStateInterface {
   }
 
   protected void cleanup() {
+    if (singleThreadJTAExecutor.shouldDoCleanup()) {
+      singleThreadJTAExecutor.cleanup(this);
+    } else {
+      doCleanup();
+    }
+  }
+
+  protected void doCleanup() {
     IllegalArgumentException iae = null;
     try {
       this.closed = true;
@@ -921,9 +929,7 @@ public class TXState implements TXStateInterface {
       synchronized (this.completionGuard) {
         this.completionGuard.notifyAll();
       }
-      if (this.syncRunnable != null) {
-        this.syncRunnable.abort();
-      }
+
       if (iae != null && !this.proxy.getCache().isClosed()) {
         throw iae;
       }
@@ -1010,75 +1016,6 @@ public class TXState implements TXStateInterface {
     }
   }
 
-//  //////////////////////////////////////////////////////////////////
-//  // JTA Synchronization implementation //
-//  //////////////////////////////////////////////////////////////////
-//  /*
-//   * (non-Javadoc)
-//   *
-//   * @see org.apache.geode.internal.cache.TXStateInterface#beforeCompletion()
-//   */
-//  @Override
-//  public synchronized void beforeCompletion() throws SynchronizationCommitConflictException {
-//    if (this.closed) {
-//      throw new TXManagerCancelledException();
-//    }
-//    if (beforeCompletionCalled) {
-//      // do not re-execute beforeCompletion again
-//      return;
-//    }
-//    beforeCompletionCalled = true;
-//    doBeforeCompletion();
-//  }
-//
-//  private void doBeforeCompletion() {
-//    final long opStart = CachePerfStats.getStatTime();
-//    this.jtaLifeTime = opStart - getBeginTime();
-//    try {
-//      reserveAndCheck();
-//      /*
-//       * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort
-//       * the transaction.
-//       */
-//      TransactionWriter writer = this.proxy.getTxMgr().getWriter();
-//      if (writer != null) {
-//        try {
-//          // need to mark this so we don't fire again in commit
-//          firedWriter = true;
-//          TXEvent event = getEvent();
-//          if (!event.hasOnlyInternalEvents()) {
-//            writer.beforeCommit(event);
-//          }
-//        } catch (TransactionWriterException twe) {
-//          throw new CommitConflictException(twe);
-//        } catch (VirtualMachineError err) {
-//          // cleanup(); this allocates objects so I don't think we can do it - that leaves the TX
-//          // open, but we are poison pilling so we should be ok??
-//
-//          SystemFailure.initiateFailure(err);
-//          // If this ever returns, rethrow the error. We're poisoned
-//          // now, so don't let this thread continue.
-//          throw err;
-//        } catch (Throwable t) {
-//          // Whenever you catch Error or Throwable, you must also
-//          // catch VirtualMachineError (see above). However, there is
-//          // _still_ a possibility that you are dealing with a cascading
-//          // error condition, so you also need to check to see if the JVM
-//          // is still usable:
-//          SystemFailure.checkFailure();
-//          throw new CommitConflictException(t);
-//        }
-//      }
-//    } catch (CommitConflictException commitConflict) {
-//      cleanup();
-//      proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this);
-//      throw new SynchronizationCommitConflictException(
-//          LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0
-//              .toLocalizedString(getTransactionId()),
-//          commitConflict);
-//    }
-//  }
-
   //////////////////////////////////////////////////////////////////
   // JTA Synchronization implementation //
   //////////////////////////////////////////////////////////////////
@@ -1098,55 +1035,23 @@ public class TXState implements TXStateInterface {
       return;
     }
     beforeCompletionCalled = true;
-
-    TXStateSynchronizationRunnable sync = createTxStateSynchronizationRunnable();
-    setSynchronizationRunnable(sync);
-
-    Executor exec = getExecutor();
-    exec.execute(sync);
-    sync.waitForFirstExecution();
-    if (getBeforeCompletionException() != null) {
-      throw getBeforeCompletionException();
-    }
-    //doBeforeCompletion();
-  }
-
-  TXStateSynchronizationRunnable createTxStateSynchronizationRunnable() {
-    Runnable beforeCompletion = new Runnable() {
-      @SuppressWarnings("synthetic-access")
-      public void run() {
-        doBeforeCompletion();
-      }
-    };
-
-    return new TXStateSynchronizationRunnable(getCache().getCancelCriterion(),
-        beforeCompletion);
+    singleThreadJTAExecutor.executeBeforeCompletion(this,
+        getExecutor());
   }
 
   Executor getExecutor() {
-    return InternalDistributedSystem.getConnectedInstance().getDistributionManager()
-        .getWaitingThreadPool();
-  }
-
-  SynchronizationCommitConflictException getBeforeCompletionException() {
-    return beforeCompletionException;
-  }
-
-  private void setSynchronizationRunnable(TXStateSynchronizationRunnable synchronizationRunnable) {
-    syncRunnable = synchronizationRunnable;
+    return getCache().getDistributionManager().getWaitingThreadPool();
   }
 
-
-  private void doBeforeCompletion() {
+  void doBeforeCompletion() {
     proxy.getTxMgr().setTXState(null);
     final long opStart = CachePerfStats.getStatTime();
     this.jtaLifeTime = opStart - getBeginTime();
 
-
     try {
       reserveAndCheck();
       /*
-       * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort
+       * If there is a TransactionWriter plugged in, we need to give it an opportunity to abort
        * the transaction.
        */
       TransactionWriter writer = this.proxy.getTxMgr().getWriter();
@@ -1189,28 +1094,18 @@ public class TXState implements TXStateInterface {
   }
 
   /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
- */
+   * (non-Javadoc)
+   *
+   * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
+   */
   @Override
   public synchronized void afterCompletion(int status) {
     proxy.getTxMgr().setTXState(null);
-    Runnable afterCompletion = new Runnable() {
-      @SuppressWarnings("synthetic-access")
-      public void run() {
-        doAfterCompletion(status);
-      }
-    };
     // if there was a beforeCompletion call then there will be a thread
     // sitting in the waiting pool to execute afterCompletion. Otherwise
     // throw FailedSynchronizationException().
-    TXStateSynchronizationRunnable sync = getSynchronizationRunnable();
-    if (sync != null) {
-      sync.runSecondRunnable(afterCompletion);
-      if (getAfterCompletionException() != null) {
-        throw getAfterCompletionException();
-      }
+    if (beforeCompletionCalled) {
+      singleThreadJTAExecutor.executeAfterCompletion(this, status);
     } else {
       // rollback does not run beforeCompletion.
       if (status != Status.STATUS_ROLLEDBACK) {
@@ -1221,15 +1116,7 @@ public class TXState implements TXStateInterface {
     }
   }
 
-  TXStateSynchronizationRunnable getSynchronizationRunnable() {
-    return this.syncRunnable;
-  }
-
-  RuntimeException getAfterCompletionException() {
-    return afterCompletionException;
-  }
-
-  private void doAfterCompletion(int status) {
+  void doAfterCompletion(int status) {
     final long opStart = CachePerfStats.getStatTime();
     switch (status) {
       case Status.STATUS_COMMITTED:
@@ -1259,60 +1146,6 @@ public class TXState implements TXStateInterface {
     }
   }
 
-//  /*
-//   * (non-Javadoc)
-//   *
-//   * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
-//   */
-//  @Override
-//  public synchronized void afterCompletion(int status) {
-//    this.proxy.getTxMgr().setTXState(null);
-//    // For commit, beforeCompletion should be called. Otherwise
-//    // throw FailedSynchronizationException().
-//    if (wasBeforeCompletionCalled()) {
-//      doAfterCompletion(status);
-//    } else {
-//      // rollback does not run beforeCompletion.
-//      if (status != Status.STATUS_ROLLEDBACK) {
-//        throw new FailedSynchronizationException(
-//            "Could not execute afterCompletion when beforeCompletion was not executed");
-//      }
-//      doAfterCompletion(status);
-//    }
-//  }
-//
-//  private void doAfterCompletion(int status) {
-//    final long opStart = CachePerfStats.getStatTime();
-//    try {
-//      switch (status) {
-//        case Status.STATUS_COMMITTED:
-//          Assert.assertTrue(this.locks != null,
-//              "Gemfire Transaction afterCompletion called with illegal state.");
-//          try {
-//            commit();
-//            saveTXCommitMessageForClientFailover();
-//          } catch (CommitConflictException error) {
-//            Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
-//                + " afterCompletion failed.due to CommitConflictException: " + error);
-//          }
-//
-//          this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
-//          this.locks = null;
-//          break;
-//        case Status.STATUS_ROLLEDBACK:
-//          this.jtaLifeTime = opStart - getBeginTime();
-//          rollback();
-//          saveTXCommitMessageForClientFailover();
-//          this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
-//          break;
-//        default:
-//          Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
-//      }
-//    } catch (InternalGemFireError error) {
-//      throw new TransactionException(error);
-//    }
-//  }
-
   boolean wasBeforeCompletionCalled() {
     return beforeCompletionCalled;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
deleted file mode 100644
index 28f367b..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache;
-
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
-import org.apache.geode.internal.logging.LogService;
-
-/**
- * TXStateSynchronizationThread manages beforeCompletion and afterCompletion calls.
- * The thread should be instantiated with a Runnable that invokes beforeCompletion behavior.
- * Then you must invoke runSecondRunnable() with another Runnable that invokes afterCompletion
- * behavior.
- *
- * @since Geode 1.6.0
- */
-public class TXStateSynchronizationRunnable implements Runnable {
-  private static final Logger logger = LogService.getLogger();
-
-  private final CancelCriterion cancelCriterion;
-
-  private Runnable firstRunnable;
-  private final Object firstRunnableSync = new Object();
-  private boolean firstRunnableCompleted;
-
-  private Runnable secondRunnable;
-  private final Object secondRunnableSync = new Object();
-  private boolean secondRunnableCompleted;
-
-  private boolean abort;
-
-  public TXStateSynchronizationRunnable(final CancelCriterion cancelCriterion, final Runnable beforeCompletion) {
-    this.cancelCriterion = cancelCriterion;
-    this.firstRunnable = beforeCompletion;
-  }
-
-  @Override
-  public void run() {
-    doSynchronizationOps();
-  }
-
-  private void doSynchronizationOps() {
-    synchronized (this.firstRunnableSync) {
-      try {
-        this.firstRunnable.run();
-      } finally {
-        if (logger.isTraceEnabled()) {
-          logger.trace("beforeCompletion notification completed");
-        }
-        this.firstRunnableCompleted = true;
-        this.firstRunnable = null;
-        this.firstRunnableSync.notifyAll();
-      }
-    }
-    synchronized (this.secondRunnableSync) {
-      // TODO there should be a transaction timeout that keeps this thread
-      // from sitting around forever if the client goes away
-      final boolean isTraceEnabled = logger.isTraceEnabled();
-      while (this.secondRunnable == null && !this.abort) {
-        try {
-          if (isTraceEnabled) {
-            logger.trace("waiting for afterCompletion notification");
-          }
-          this.secondRunnableSync.wait(1000);
-        } catch (InterruptedException ignore) {
-          // eat the interrupt and check for exit conditions
-        }
-      }
-      if (isTraceEnabled) {
-        logger.trace("executing afterCompletion notification");
-      }
-      try {
-        if (!this.abort) {
-          this.secondRunnable.run();
-        }
-      } finally {
-        if (isTraceEnabled) {
-          logger.trace("afterCompletion notification completed");
-        }
-        this.secondRunnableCompleted = true;
-        this.secondRunnable = null;
-        this.secondRunnableSync.notifyAll();
-      }
-    }
-  }
-
-  /**
-   * wait for the initial beforeCompletion step to finish
-   */
-  public void waitForFirstExecution() {
-    synchronized (this.firstRunnableSync) {
-      while (!this.firstRunnableCompleted) {
-        try {
-          this.firstRunnableSync.wait(1000);
-        } catch (InterruptedException ignore) {
-          // eat the interrupt and check for exit conditions
-        }
-        cancelCriterion.checkCancelInProgress(null);
-      }
-    }
-  }
-
-  /**
-   * run the afterCompletion portion of synchronization. This method schedules execution of the
-   * given runnable and then waits for it to finish running
-   */
-  public void runSecondRunnable(Runnable r) {
-    synchronized (this.secondRunnableSync) {
-      this.secondRunnable = r;
-      this.secondRunnableSync.notifyAll();
-      while (!this.secondRunnableCompleted && !this.abort) {
-        try {
-          this.secondRunnableSync.wait(1000);
-        } catch (InterruptedException ignore) {
-          // eat the interrupt and check for exit conditions
-        }
-        cancelCriterion.checkCancelInProgress(null);
-      }
-    }
-  }
-
-  /**
-   * stop waiting for an afterCompletion to arrive and just exit
-   */
-  public void abort() {
-    synchronized (this.secondRunnableSync) {
-      this.abort = true;
-    }
-  }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
new file mode 100644
index 0000000..79f4324
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Java6Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.transaction.Status;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.cache.TransactionException;
+
+/**
+ * Unit tests for {@link SingleThreadJTAExecutor}, driven by a mocked {@link TXState}.
+ */
+public class SingleThreadJTAExecutorTest {
+  private TXState txState;
+  private SingleThreadJTAExecutor singleThreadJTAExecutor;
+  private ExecutorService executor;
+
+  @Before
+  public void setup() {
+    txState = mock(TXState.class, RETURNS_DEEP_STUBS);
+    executor = Executors.newSingleThreadExecutor();
+    singleThreadJTAExecutor = new SingleThreadJTAExecutor();
+  }
+
+  @Test
+  public void executeBeforeCompletionCallsDoBeforeCompletion() {
+    singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
+
+    verify(txState, times(1)).doBeforeCompletion();
+
+    assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
+  }
+
+  @Test
+  public void executeBeforeCompletionThrowsExceptionIfBeforeCompletionFailed() {
+    doThrow(new SynchronizationCommitConflictException("")).when(txState).doBeforeCompletion();
+
+    // Not @Test(expected = ...): that would end the test at the throwing call and skip the
+    // verifications below.
+    assertThatThrownBy(() -> singleThreadJTAExecutor.executeBeforeCompletion(txState, executor))
+        .isInstanceOf(SynchronizationCommitConflictException.class);
+    verify(txState, times(1)).doBeforeCompletion();
+    assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
+  }
+
+  @Test
+  public void executeAfterCompletionCallsDoAfterCompletion() {
+    int status = Status.STATUS_COMMITTED;
+
+    singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
+    singleThreadJTAExecutor.executeAfterCompletion(txState, status);
+
+    verify(txState, times(1)).doBeforeCompletion();
+    verify(txState, times(1)).doAfterCompletion(status);
+    assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
+  }
+
+  @Test
+  public void executeAfterCompletionThrowsExceptionIfAfterCompletionFailed() {
+    int status = Status.STATUS_COMMITTED;
+    TransactionException exception = new TransactionException("");
+    doThrow(exception).when(txState).doAfterCompletion(status);
+
+    singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
+
+    assertThatThrownBy(() -> singleThreadJTAExecutor.executeAfterCompletion(txState, status))
+        .isSameAs(exception);
+    verify(txState, times(1)).doBeforeCompletion();
+    verify(txState, times(1)).doAfterCompletion(status);
+  }
+
+  @Test
+  public void executorThreadNoLongerWaitForAfterCompletionIfTXStateIsCleanedUp() {
+    singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
+    singleThreadJTAExecutor.cleanup(txState);
+
+    // No executeAfterCompletion call is made; cleanup alone must mark afterCompletion finished.
+    verify(txState, times(1)).doBeforeCompletion();
+    assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
+    assertThat(singleThreadJTAExecutor.isAfterCompletionFinished()).isTrue();
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
index 5ec4cbd..c1e9acf 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -26,16 +27,17 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.concurrent.Executor;
+
 import javax.transaction.Status;
 
 import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.FailedSynchronizationException;
 import org.apache.geode.cache.SynchronizationCommitConflictException;
 import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
-import org.apache.geode.cache.TransactionException;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 
 public class TXStateTest {
   private TXStateProxyImpl txStateProxy;
@@ -44,60 +46,54 @@ public class TXStateTest {
 
   @Before
   public void setup() {
-    txStateProxy = mock(TXStateProxyImpl.class);
+    txStateProxy = mock(TXStateProxyImpl.class, RETURNS_DEEP_STUBS);
     exception = new CommitConflictException("");
     transactionDataNodeHasDepartedException = new TransactionDataNodeHasDepartedException("");
 
     when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class));
   }
 
-
   @Test
-  public void beforeCompletionThrowsIfReserveAndCheckFails() {
+  public void doBeforeCompletionThrowsIfReserveAndCheckFails() {
     TXState txState = spy(new TXState(txStateProxy, true));
+    doReturn(mock(Executor.class)).when(txState).getExecutor();
     doThrow(exception).when(txState).reserveAndCheck();
 
-    assertThatThrownBy(() -> txState.beforeCompletion())
+    assertThatThrownBy(() -> txState.doBeforeCompletion())
         .isInstanceOf(SynchronizationCommitConflictException.class);
   }
 
-
   @Test
-  public void afterCompletionThrowsIfCommitFails() {
+  public void doAfterCompletionThrowsIfCommitFails() {
     TXState txState = spy(new TXState(txStateProxy, true));
     doReturn(mock(InternalCache.class)).when(txState).getCache();
     doReturn(true).when(txState).wasBeforeCompletionCalled();
     txState.reserveAndCheck();
     doThrow(transactionDataNodeHasDepartedException).when(txState).commit();
 
-    assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
+    assertThatThrownBy(() -> txState.doAfterCompletion(Status.STATUS_COMMITTED))
         .isSameAs(transactionDataNodeHasDepartedException);
   }
 
   @Test
-  public void afterCompletionThrowsTransactionExceptionIfCommitFailedCommitConflictException() {
-    TXState txState = spy(new TXState(txStateProxy, true));
-    doReturn(mock(InternalCache.class)).when(txState).getCache();
-    doReturn(true).when(txState).wasBeforeCompletionCalled();
-    doThrow(exception).when(txState).commit();
-
-    assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
-        .isInstanceOf(TransactionException.class);
-  }
-
-  @Test
-  public void afterCompletionCanCommitJTA() {
+  public void doAfterCompletionCanCommitJTA() {
     TXState txState = spy(new TXState(txStateProxy, false));
     doReturn(mock(InternalCache.class)).when(txState).getCache();
     txState.reserveAndCheck();
     txState.closed = true;
     doReturn(true).when(txState).wasBeforeCompletionCalled();
-    txState.afterCompletion(Status.STATUS_COMMITTED);
+    txState.doAfterCompletion(Status.STATUS_COMMITTED);
 
     assertThat(txState.locks).isNull();
     verify(txState, times(1)).saveTXCommitMessageForClientFailover();
   }
 
+  @Test(expected = FailedSynchronizationException.class)
+  public void afterCompletionThrowsExceptionIfBeforeCompletionNotCalled() {
+    TXState txState = new TXState(txStateProxy, true);
+    txState.afterCompletion(Status.STATUS_COMMITTED);
+  }
+
   @Test
   public void afterCompletionCanRollbackJTA() {
     TXState txState = spy(new TXState(txStateProxy, true));
@@ -153,16 +149,7 @@ public class TXStateTest {
   public void getOriginatingMemberReturnsNullIfNotOriginatedFromClient() {
     TXState txState = spy(new TXState(txStateProxy, false));
 
-    assertThat(txState.getOriginatingMember()).isNull();
+    assertThat(txState.getOriginatingMember()).isSameAs(txStateProxy.getOnBehalfOfClientMember());
   }
 
-  @Test
-  public void getOriginatingMemberReturnsClientMemberIfOriginatedFromClient() {
-    InternalDistributedMember client = mock(InternalDistributedMember.class);
-    TXStateProxyImpl proxy = new TXStateProxyImpl(mock(InternalCache.class),
-        mock(TXManagerImpl.class), mock(TXId.class), client);
-    TXState txState = spy(new TXState(proxy, false));
-
-    assertThat(txState.getOriginatingMember()).isEqualTo(client);
-  }
 }