You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/08/24 20:37:43 UTC

[geode] branch feature/GEODE-5624 updated (966852d -> 4193fb7)

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a change to branch feature/GEODE-5624
in repository https://gitbox.apache.org/repos/asf/geode.git.


    from 966852d  GEODE-5619: Change scanning for XSDRootElement to consider all packag… (#2361)
     new 413b34f  GEODE-5624: Use a thread to do beforeCompletion and afterCompletion.
     new 4193fb7  wip -- refactor classes.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../ClientServerJTAFailoverDistributedTest.java    |  29 +++
 .../internal/cache/SingleThreadJTAExecutor.java    | 197 +++++++++++++++++++++
 .../org/apache/geode/internal/cache/TXState.java   | 103 +++++++----
 .../cache/SingleThreadJTAExecutorTest.java         | 109 ++++++++++++
 .../apache/geode/internal/cache/TXStateTest.java   |  51 ++----
 5 files changed, 419 insertions(+), 70 deletions(-)
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
 create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java


[geode] 02/02: wip -- refactor classes.

Posted by es...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-5624
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 4193fb719694615765fea70fd2169a1b6653c3ce
Author: eshu <es...@pivotal.io>
AuthorDate: Fri Aug 24 13:37:08 2018 -0700

    wip -- refactor classes.
---
 .../ClientServerJTAFailoverDistributedTest.java    |  14 +-
 .../internal/cache/SingleThreadJTAExecutor.java    | 197 ++++++++++++++++++
 .../org/apache/geode/internal/cache/TXState.java   | 225 +++------------------
 .../cache/TXStateSynchronizationRunnable.java      | 144 -------------
 .../cache/SingleThreadJTAExecutorTest.java         | 109 ++++++++++
 .../apache/geode/internal/cache/TXStateTest.java   |  51 ++---
 6 files changed, 363 insertions(+), 377 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
index e4589fa..2ae4e9b 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
@@ -130,7 +130,8 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
         .setPartitionAttributes(partitionAttributes).create(regionName);
     if (hasReplicateRegion) {
-      cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.REPLICATE).create(replicateRegionName);
+      cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.REPLICATE)
+          .create(replicateRegionName);
     }
 
     CacheServer server = cacheRule.getCache().addCacheServer();
@@ -154,7 +155,8 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
         clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
     crf.setPoolName(pool.getName());
     crf.create(regionName);
-    if (hasReplicateRegion) crf.create(replicateRegionName);
+    if (hasReplicateRegion)
+      crf.create(replicateRegionName);
 
     if (ports.length > 1) {
       pool.acquireConnection(new ServerLocation(hostName, port1));
@@ -178,11 +180,12 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     Object[] results = new Object[2];
     InternalClientCache cache = clientCacheRule.getClientCache();
     Region region = cache.getRegion(regionName);
-    Region replicateRegion = hasReplicateRegion? cache.getRegion(replicateRegionName) : null;
+    Region replicateRegion = hasReplicateRegion ? cache.getRegion(replicateRegionName) : null;
     TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
     txManager.begin();
     region.put(key, newValue);
-    if (hasReplicateRegion) replicateRegion.put(key, newValue);
+    if (hasReplicateRegion)
+      replicateRegion.put(key, newValue);
 
     TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
     ClientTXStateStub clientTXStateStub = (ClientTXStateStub) txStateProxy.getRealDeal(null, null);
@@ -212,7 +215,8 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     }
     if (isCommit) {
       assertEquals(newValue, region.get(key));
-      if (hasReplicateRegion) assertEquals(newValue, replicateRegion.get(key));
+      if (hasReplicateRegion)
+        assertEquals(newValue, replicateRegion.get(key));
     } else {
       assertEquals(value, region.get(key));
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
new file mode 100644
index 0000000..4df8ca4
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.concurrent.Executor;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * SingleThreadJTAExecutor runs the beforeCompletion and afterCompletion steps of a TXState on
+ * one executor thread. Call executeBeforeCompletion() first; it submits the work to the given
+ * Executor and waits for beforeCompletion to finish. Then call executeAfterCompletion() with
+ * the JTA status, or cleanup() to cancel, so that the same thread can finish.
+ *
+ * @since Geode 1.6.0
+ */
+public class SingleThreadJTAExecutor {
+  private static final Logger logger = LogService.getLogger();
+
+  private final Object beforeCompletionSync = new Object();
+  private boolean beforeCompletionStarted;
+  private boolean beforeCompletionFinished;
+  private SynchronizationCommitConflictException beforeCompletionException;
+
+  private final Object afterCompletionSync = new Object();
+  private boolean afterCompletionStarted;
+  private boolean afterCompletionFinished;
+  private int afterCompletionStatus = -1;
+  private boolean afterCompletionCancelled;
+  private RuntimeException afterCompletionException;
+
+  public SingleThreadJTAExecutor() {}
+
+  void doOps(TXState txState) {
+    doBeforeCompletionOp(txState);
+    doAfterCompletionOp(txState);
+  }
+
+  void doBeforeCompletionOp(TXState txState) {
+    synchronized (beforeCompletionSync) {
+      try {
+        txState.doBeforeCompletion();
+      } catch (SynchronizationCommitConflictException exception) {
+        beforeCompletionException = exception;
+      } finally {
+        if (logger.isDebugEnabled()) {
+          logger.debug("beforeCompletion notification completed");
+        }
+        beforeCompletionFinished = true;
+        beforeCompletionSync.notifyAll();
+      }
+    }
+  }
+
+  boolean isBeforeCompletionStarted() {
+    synchronized (beforeCompletionSync) {
+      return beforeCompletionStarted;
+    }
+  }
+
+  boolean isAfterCompletionStarted() {
+    synchronized (afterCompletionSync) {
+      return afterCompletionStarted;
+    }
+  }
+
+  boolean isBeforeCompletionFinished() {
+    synchronized (beforeCompletionSync) {
+      return beforeCompletionFinished;
+    }
+  }
+
+  boolean isAfterCompletionFinished() {
+    synchronized (afterCompletionSync) {
+      return afterCompletionFinished;
+    }
+  }
+
+  public void executeBeforeCompletion(TXState txState, Executor executor) {
+    executor.execute(() -> doOps(txState));
+
+    synchronized (beforeCompletionSync) {
+      beforeCompletionStarted = true;
+      while (!beforeCompletionFinished) {
+        try {
+          beforeCompletionSync.wait(1000);
+        } catch (InterruptedException ignore) {
+          // eat the interrupt and check for exit conditions
+        }
+        txState.getCache().getCancelCriterion().checkCancelInProgress(null);
+      }
+      if (getBeforeCompletionException() != null) {
+        throw getBeforeCompletionException();
+      }
+    }
+  }
+
+  SynchronizationCommitConflictException getBeforeCompletionException() {
+    return beforeCompletionException;
+  }
+
+  private void doAfterCompletionOp(TXState txState) {
+    synchronized (afterCompletionSync) {
+      // A transaction timeout keeps this thread from sitting around forever
+      // if the client goes away: when the client departs, the transaction/JTA
+      // times out and TXState.cleanup() calls cleanup() on this executor,
+      // which sets afterCompletionCancelled and wakes this thread so that it
+      // runs txState.doCleanup() instead of txState.doAfterCompletion().
+      final boolean isDebugEnabled = logger.isDebugEnabled();
+      while (afterCompletionStatus == -1 && !afterCompletionCancelled) {
+        try {
+          if (isDebugEnabled) {
+            logger.debug("waiting for afterCompletion notification");
+          }
+          afterCompletionSync.wait(1000);
+        } catch (InterruptedException ignore) {
+          // eat the interrupt and check for exit conditions
+        }
+      }
+      afterCompletionStarted = true;
+      if (isDebugEnabled) {
+        logger.debug("executing afterCompletion notification");
+      }
+      try {
+        if (!afterCompletionCancelled) {
+          txState.doAfterCompletion(afterCompletionStatus);
+        } else {
+          txState.doCleanup();
+        }
+      } catch (RuntimeException exception) {
+        afterCompletionException = exception;
+      } finally {
+        if (isDebugEnabled) {
+          logger.debug("afterCompletion notification completed");
+        }
+        afterCompletionFinished = true;
+        afterCompletionSync.notifyAll();
+      }
+    }
+  }
+
+  public void executeAfterCompletion(TXState txState, int status) {
+    synchronized (afterCompletionSync) {
+      afterCompletionStatus = status;
+      afterCompletionSync.notifyAll();
+      waitForAfterCompletionToFinish(txState);
+      if (getAfterCompletionException() != null) {
+        throw getAfterCompletionException();
+      }
+    }
+  }
+
+  private void waitForAfterCompletionToFinish(TXState txState) {
+    while (!afterCompletionFinished) {
+      try {
+        afterCompletionSync.wait(1000);
+      } catch (InterruptedException ignore) {
+        // eat the interrupt and check for exit conditions
+      }
+      txState.getCache().getCancelCriterion().checkCancelInProgress(null);
+    }
+  }
+
+  RuntimeException getAfterCompletionException() {
+    return afterCompletionException;
+  }
+
+  /**
+   * stop waiting for an afterCompletion to arrive and just exit
+   */
+  public void cleanup(TXState txState) {
+    synchronized (afterCompletionSync) {
+      afterCompletionCancelled = true;
+      afterCompletionSync.notifyAll();
+      waitForAfterCompletionToFinish(txState);
+    }
+  }
+
+  public boolean shouldDoCleanup() {
+    return isBeforeCompletionStarted() && !isAfterCompletionStarted();
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index f33267e..bce1af4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -32,7 +32,6 @@ import javax.transaction.Status;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
-import org.apache.geode.InternalGemFireError;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.cache.DiskAccessException;
@@ -44,7 +43,6 @@ import org.apache.geode.cache.Region.Entry;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.SynchronizationCommitConflictException;
 import org.apache.geode.cache.TransactionDataRebalancedException;
-import org.apache.geode.cache.TransactionException;
 import org.apache.geode.cache.TransactionId;
 import org.apache.geode.cache.TransactionWriter;
 import org.apache.geode.cache.TransactionWriterException;
@@ -52,7 +50,6 @@ import org.apache.geode.cache.UnsupportedOperationInTransactionException;
 import org.apache.geode.cache.client.internal.ServerRegionDataAccess;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.TXManagerCancelledException;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.cache.control.MemoryThresholds;
@@ -108,9 +105,7 @@ public class TXState implements TXStateInterface {
    * and afterCompletion so that beforeCompletion can obtain locks for the afterCompletion step.
    * This is that thread
    */
-  protected volatile TXStateSynchronizationRunnable syncRunnable;
-  private volatile SynchronizationCommitConflictException beforeCompletionException;
-  private volatile RuntimeException afterCompletionException;
+  private final SingleThreadJTAExecutor singleThreadJTAExecutor;
 
   // Internal testing hooks
   private Runnable internalAfterReservation;
@@ -153,6 +148,11 @@ public class TXState implements TXStateInterface {
   private volatile DistributedMember proxyServer;
 
   public TXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub) {
+    this(proxy, onBehalfOfRemoteStub, new SingleThreadJTAExecutor());
+  }
+
+  public TXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub,
+      SingleThreadJTAExecutor singleThreadJTAExecutor) {
     this.beginTime = CachePerfStats.getStatTime();
     this.regions = new IdentityHashMap<>();
 
@@ -165,7 +165,7 @@ public class TXState implements TXStateInterface {
     this.internalAfterSend = null;
     this.proxy = proxy;
     this.onBehalfOfRemoteStub = onBehalfOfRemoteStub;
-
+    this.singleThreadJTAExecutor = singleThreadJTAExecutor;
   }
 
   private boolean hasSeenEvent(EntryEventImpl event) {
@@ -428,7 +428,7 @@ public class TXState implements TXStateInterface {
       }
 
       /*
-       * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort
+       * If there is a TransactionWriter plugged in, we need to give it an opportunity to abort
        * the transaction.
        */
       TransactionWriter writer = this.proxy.getTxMgr().getWriter();
@@ -868,6 +868,14 @@ public class TXState implements TXStateInterface {
   }
 
   protected void cleanup() {
+    if (singleThreadJTAExecutor.shouldDoCleanup()) {
+      singleThreadJTAExecutor.cleanup(this);
+    } else {
+      doCleanup();
+    }
+  }
+
+  protected void doCleanup() {
     IllegalArgumentException iae = null;
     try {
       this.closed = true;
@@ -921,9 +929,7 @@ public class TXState implements TXStateInterface {
       synchronized (this.completionGuard) {
         this.completionGuard.notifyAll();
       }
-      if (this.syncRunnable != null) {
-        this.syncRunnable.abort();
-      }
+
       if (iae != null && !this.proxy.getCache().isClosed()) {
         throw iae;
       }
@@ -1010,75 +1016,6 @@ public class TXState implements TXStateInterface {
     }
   }
 
-//  //////////////////////////////////////////////////////////////////
-//  // JTA Synchronization implementation //
-//  //////////////////////////////////////////////////////////////////
-//  /*
-//   * (non-Javadoc)
-//   *
-//   * @see org.apache.geode.internal.cache.TXStateInterface#beforeCompletion()
-//   */
-//  @Override
-//  public synchronized void beforeCompletion() throws SynchronizationCommitConflictException {
-//    if (this.closed) {
-//      throw new TXManagerCancelledException();
-//    }
-//    if (beforeCompletionCalled) {
-//      // do not re-execute beforeCompletion again
-//      return;
-//    }
-//    beforeCompletionCalled = true;
-//    doBeforeCompletion();
-//  }
-//
-//  private void doBeforeCompletion() {
-//    final long opStart = CachePerfStats.getStatTime();
-//    this.jtaLifeTime = opStart - getBeginTime();
-//    try {
-//      reserveAndCheck();
-//      /*
-//       * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort
-//       * the transaction.
-//       */
-//      TransactionWriter writer = this.proxy.getTxMgr().getWriter();
-//      if (writer != null) {
-//        try {
-//          // need to mark this so we don't fire again in commit
-//          firedWriter = true;
-//          TXEvent event = getEvent();
-//          if (!event.hasOnlyInternalEvents()) {
-//            writer.beforeCommit(event);
-//          }
-//        } catch (TransactionWriterException twe) {
-//          throw new CommitConflictException(twe);
-//        } catch (VirtualMachineError err) {
-//          // cleanup(); this allocates objects so I don't think we can do it - that leaves the TX
-//          // open, but we are poison pilling so we should be ok??
-//
-//          SystemFailure.initiateFailure(err);
-//          // If this ever returns, rethrow the error. We're poisoned
-//          // now, so don't let this thread continue.
-//          throw err;
-//        } catch (Throwable t) {
-//          // Whenever you catch Error or Throwable, you must also
-//          // catch VirtualMachineError (see above). However, there is
-//          // _still_ a possibility that you are dealing with a cascading
-//          // error condition, so you also need to check to see if the JVM
-//          // is still usable:
-//          SystemFailure.checkFailure();
-//          throw new CommitConflictException(t);
-//        }
-//      }
-//    } catch (CommitConflictException commitConflict) {
-//      cleanup();
-//      proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this);
-//      throw new SynchronizationCommitConflictException(
-//          LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0
-//              .toLocalizedString(getTransactionId()),
-//          commitConflict);
-//    }
-//  }
-
   //////////////////////////////////////////////////////////////////
   // JTA Synchronization implementation //
   //////////////////////////////////////////////////////////////////
@@ -1098,55 +1035,23 @@ public class TXState implements TXStateInterface {
       return;
     }
     beforeCompletionCalled = true;
-
-    TXStateSynchronizationRunnable sync = createTxStateSynchronizationRunnable();
-    setSynchronizationRunnable(sync);
-
-    Executor exec = getExecutor();
-    exec.execute(sync);
-    sync.waitForFirstExecution();
-    if (getBeforeCompletionException() != null) {
-      throw getBeforeCompletionException();
-    }
-    //doBeforeCompletion();
-  }
-
-  TXStateSynchronizationRunnable createTxStateSynchronizationRunnable() {
-    Runnable beforeCompletion = new Runnable() {
-      @SuppressWarnings("synthetic-access")
-      public void run() {
-        doBeforeCompletion();
-      }
-    };
-
-    return new TXStateSynchronizationRunnable(getCache().getCancelCriterion(),
-        beforeCompletion);
+    singleThreadJTAExecutor.executeBeforeCompletion(this,
+        getExecutor());
   }
 
   Executor getExecutor() {
-    return InternalDistributedSystem.getConnectedInstance().getDistributionManager()
-        .getWaitingThreadPool();
-  }
-
-  SynchronizationCommitConflictException getBeforeCompletionException() {
-    return beforeCompletionException;
-  }
-
-  private void setSynchronizationRunnable(TXStateSynchronizationRunnable synchronizationRunnable) {
-    syncRunnable = synchronizationRunnable;
+    return getCache().getDistributionManager().getWaitingThreadPool();
   }
 
-
-  private void doBeforeCompletion() {
+  void doBeforeCompletion() {
     proxy.getTxMgr().setTXState(null);
     final long opStart = CachePerfStats.getStatTime();
     this.jtaLifeTime = opStart - getBeginTime();
 
-
     try {
       reserveAndCheck();
       /*
-       * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort
+       * If there is a TransactionWriter plugged in, we need to give it an opportunity to abort
        * the transaction.
        */
       TransactionWriter writer = this.proxy.getTxMgr().getWriter();
@@ -1189,28 +1094,18 @@ public class TXState implements TXStateInterface {
   }
 
   /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
- */
+   * (non-Javadoc)
+   *
+   * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
+   */
   @Override
   public synchronized void afterCompletion(int status) {
     proxy.getTxMgr().setTXState(null);
-    Runnable afterCompletion = new Runnable() {
-      @SuppressWarnings("synthetic-access")
-      public void run() {
-        doAfterCompletion(status);
-      }
-    };
     // if there was a beforeCompletion call then there will be a thread
     // sitting in the waiting pool to execute afterCompletion. Otherwise
     // throw FailedSynchronizationException().
-    TXStateSynchronizationRunnable sync = getSynchronizationRunnable();
-    if (sync != null) {
-      sync.runSecondRunnable(afterCompletion);
-      if (getAfterCompletionException() != null) {
-        throw getAfterCompletionException();
-      }
+    if (beforeCompletionCalled) {
+      singleThreadJTAExecutor.executeAfterCompletion(this, status);
     } else {
       // rollback does not run beforeCompletion.
       if (status != Status.STATUS_ROLLEDBACK) {
@@ -1221,15 +1116,7 @@ public class TXState implements TXStateInterface {
     }
   }
 
-  TXStateSynchronizationRunnable getSynchronizationRunnable() {
-    return this.syncRunnable;
-  }
-
-  RuntimeException getAfterCompletionException() {
-    return afterCompletionException;
-  }
-
-  private void doAfterCompletion(int status) {
+  void doAfterCompletion(int status) {
     final long opStart = CachePerfStats.getStatTime();
     switch (status) {
       case Status.STATUS_COMMITTED:
@@ -1259,60 +1146,6 @@ public class TXState implements TXStateInterface {
     }
   }
 
-//  /*
-//   * (non-Javadoc)
-//   *
-//   * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
-//   */
-//  @Override
-//  public synchronized void afterCompletion(int status) {
-//    this.proxy.getTxMgr().setTXState(null);
-//    // For commit, beforeCompletion should be called. Otherwise
-//    // throw FailedSynchronizationException().
-//    if (wasBeforeCompletionCalled()) {
-//      doAfterCompletion(status);
-//    } else {
-//      // rollback does not run beforeCompletion.
-//      if (status != Status.STATUS_ROLLEDBACK) {
-//        throw new FailedSynchronizationException(
-//            "Could not execute afterCompletion when beforeCompletion was not executed");
-//      }
-//      doAfterCompletion(status);
-//    }
-//  }
-//
-//  private void doAfterCompletion(int status) {
-//    final long opStart = CachePerfStats.getStatTime();
-//    try {
-//      switch (status) {
-//        case Status.STATUS_COMMITTED:
-//          Assert.assertTrue(this.locks != null,
-//              "Gemfire Transaction afterCompletion called with illegal state.");
-//          try {
-//            commit();
-//            saveTXCommitMessageForClientFailover();
-//          } catch (CommitConflictException error) {
-//            Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
-//                + " afterCompletion failed.due to CommitConflictException: " + error);
-//          }
-//
-//          this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
-//          this.locks = null;
-//          break;
-//        case Status.STATUS_ROLLEDBACK:
-//          this.jtaLifeTime = opStart - getBeginTime();
-//          rollback();
-//          saveTXCommitMessageForClientFailover();
-//          this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
-//          break;
-//        default:
-//          Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
-//      }
-//    } catch (InternalGemFireError error) {
-//      throw new TransactionException(error);
-//    }
-//  }
-
   boolean wasBeforeCompletionCalled() {
     return beforeCompletionCalled;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
deleted file mode 100644
index 28f367b..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache;
-
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
-import org.apache.geode.internal.logging.LogService;
-
-/**
- * TXStateSynchronizationThread manages beforeCompletion and afterCompletion calls.
- * The thread should be instantiated with a Runnable that invokes beforeCompletion behavior.
- * Then you must invoke runSecondRunnable() with another Runnable that invokes afterCompletion
- * behavior.
- *
- * @since Geode 1.6.0
- */
-public class TXStateSynchronizationRunnable implements Runnable {
-  private static final Logger logger = LogService.getLogger();
-
-  private final CancelCriterion cancelCriterion;
-
-  private Runnable firstRunnable;
-  private final Object firstRunnableSync = new Object();
-  private boolean firstRunnableCompleted;
-
-  private Runnable secondRunnable;
-  private final Object secondRunnableSync = new Object();
-  private boolean secondRunnableCompleted;
-
-  private boolean abort;
-
-  public TXStateSynchronizationRunnable(final CancelCriterion cancelCriterion, final Runnable beforeCompletion) {
-    this.cancelCriterion = cancelCriterion;
-    this.firstRunnable = beforeCompletion;
-  }
-
-  @Override
-  public void run() {
-    doSynchronizationOps();
-  }
-
-  private void doSynchronizationOps() {
-    synchronized (this.firstRunnableSync) {
-      try {
-        this.firstRunnable.run();
-      } finally {
-        if (logger.isTraceEnabled()) {
-          logger.trace("beforeCompletion notification completed");
-        }
-        this.firstRunnableCompleted = true;
-        this.firstRunnable = null;
-        this.firstRunnableSync.notifyAll();
-      }
-    }
-    synchronized (this.secondRunnableSync) {
-      // TODO there should be a transaction timeout that keeps this thread
-      // from sitting around forever if the client goes away
-      final boolean isTraceEnabled = logger.isTraceEnabled();
-      while (this.secondRunnable == null && !this.abort) {
-        try {
-          if (isTraceEnabled) {
-            logger.trace("waiting for afterCompletion notification");
-          }
-          this.secondRunnableSync.wait(1000);
-        } catch (InterruptedException ignore) {
-          // eat the interrupt and check for exit conditions
-        }
-      }
-      if (isTraceEnabled) {
-        logger.trace("executing afterCompletion notification");
-      }
-      try {
-        if (!this.abort) {
-          this.secondRunnable.run();
-        }
-      } finally {
-        if (isTraceEnabled) {
-          logger.trace("afterCompletion notification completed");
-        }
-        this.secondRunnableCompleted = true;
-        this.secondRunnable = null;
-        this.secondRunnableSync.notifyAll();
-      }
-    }
-  }
-
-  /**
-   * wait for the initial beforeCompletion step to finish
-   */
-  public void waitForFirstExecution() {
-    synchronized (this.firstRunnableSync) {
-      while (!this.firstRunnableCompleted) {
-        try {
-          this.firstRunnableSync.wait(1000);
-        } catch (InterruptedException ignore) {
-          // eat the interrupt and check for exit conditions
-        }
-        cancelCriterion.checkCancelInProgress(null);
-      }
-    }
-  }
-
-  /**
-   * run the afterCompletion portion of synchronization. This method schedules execution of the
-   * given runnable and then waits for it to finish running
-   */
-  public void runSecondRunnable(Runnable r) {
-    synchronized (this.secondRunnableSync) {
-      this.secondRunnable = r;
-      this.secondRunnableSync.notifyAll();
-      while (!this.secondRunnableCompleted && !this.abort) {
-        try {
-          this.secondRunnableSync.wait(1000);
-        } catch (InterruptedException ignore) {
-          // eat the interrupt and check for exit conditions
-        }
-        cancelCriterion.checkCancelInProgress(null);
-      }
-    }
-  }
-
-  /**
-   * stop waiting for an afterCompletion to arrive and just exit
-   */
-  public void abort() {
-    synchronized (this.secondRunnableSync) {
-      this.abort = true;
-    }
-  }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
new file mode 100644
index 0000000..79f4324
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Java6Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.transaction.Status;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.cache.TransactionException;
+
+public class SingleThreadJTAExecutorTest {
+  private TXState txState;
+  private SingleThreadJTAExecutor singleThreadJTAExecutor;
+  private ExecutorService executor;
+
+  @Before
+  public void setup() {
+    txState = mock(TXState.class, RETURNS_DEEP_STUBS);
+    executor = Executors.newSingleThreadExecutor();
+  }
+
+  @Test
+  public void executeBeforeCompletionCallsDoBeforeCompletion() {
+    singleThreadJTAExecutor = new SingleThreadJTAExecutor();
+
+    singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
+
+    verify(txState, times(1)).doBeforeCompletion();
+
+    assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
+  }
+
+  @Test
+  public void executeBeforeCompletionThrowsExceptionIfBeforeCompletionFailed() {
+    singleThreadJTAExecutor = new SingleThreadJTAExecutor();
+    doThrow(new SynchronizationCommitConflictException("")).when(txState).doBeforeCompletion();
+    // assert the exception inline (not via @Test(expected)) so the checks below actually run
+    assertThatThrownBy(() -> singleThreadJTAExecutor.executeBeforeCompletion(txState, executor))
+        .isInstanceOf(SynchronizationCommitConflictException.class);
+    verify(txState, times(1)).doBeforeCompletion();
+    assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
+  }
+
+  @Test
+  public void executeAfterCompletionCallsDoAfterCompletion() {
+    singleThreadJTAExecutor = new SingleThreadJTAExecutor();
+    int status = Status.STATUS_COMMITTED;
+
+    singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
+    singleThreadJTAExecutor.executeAfterCompletion(txState, status);
+
+    verify(txState, times(1)).doBeforeCompletion();
+    verify(txState, times(1)).doAfterCompletion(status);
+    assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
+  }
+
+  @Test
+  public void executeAfterCompletionThrowsExceptionIfAfterCompletionFailed() {
+    singleThreadJTAExecutor = new SingleThreadJTAExecutor();
+    int status = Status.STATUS_COMMITTED;
+    TransactionException exception = new TransactionException("");
+    doThrow(exception).when(txState).doAfterCompletion(status);
+
+    singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
+
+    assertThatThrownBy(() -> singleThreadJTAExecutor.executeAfterCompletion(txState, status))
+        .isSameAs(exception);
+    verify(txState, times(1)).doBeforeCompletion();
+    verify(txState, times(1)).doAfterCompletion(status);
+  }
+
+  @Test
+  public void executorThreadNoLongerWaitForAfterCompletionIfTXStateIsCleanedUp() {
+    singleThreadJTAExecutor = new SingleThreadJTAExecutor();
+
+    singleThreadJTAExecutor.executeBeforeCompletion(txState, executor);
+    singleThreadJTAExecutor.cleanup(txState);
+
+    verify(txState, times(1)).doBeforeCompletion();
+    assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue();
+    assertThat(singleThreadJTAExecutor.isAfterCompletionFinished()).isTrue();
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
index 5ec4cbd..c1e9acf 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -26,16 +27,17 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.concurrent.Executor;
+
 import javax.transaction.Status;
 
 import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.FailedSynchronizationException;
 import org.apache.geode.cache.SynchronizationCommitConflictException;
 import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
-import org.apache.geode.cache.TransactionException;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 
 public class TXStateTest {
   private TXStateProxyImpl txStateProxy;
@@ -44,60 +46,54 @@ public class TXStateTest {
 
   @Before
   public void setup() {
-    txStateProxy = mock(TXStateProxyImpl.class);
+    txStateProxy = mock(TXStateProxyImpl.class, RETURNS_DEEP_STUBS);
     exception = new CommitConflictException("");
     transactionDataNodeHasDepartedException = new TransactionDataNodeHasDepartedException("");
 
     when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class));
   }
 
-
   @Test
-  public void beforeCompletionThrowsIfReserveAndCheckFails() {
+  public void doBeforeCompletionThrowsIfReserveAndCheckFails() {
     TXState txState = spy(new TXState(txStateProxy, true));
+    doReturn(mock(Executor.class)).when(txState).getExecutor();
     doThrow(exception).when(txState).reserveAndCheck();
 
-    assertThatThrownBy(() -> txState.beforeCompletion())
+    assertThatThrownBy(() -> txState.doBeforeCompletion())
         .isInstanceOf(SynchronizationCommitConflictException.class);
   }
 
-
   @Test
-  public void afterCompletionThrowsIfCommitFails() {
+  public void doAfterCompletionThrowsIfCommitFails() {
     TXState txState = spy(new TXState(txStateProxy, true));
     doReturn(mock(InternalCache.class)).when(txState).getCache();
     doReturn(true).when(txState).wasBeforeCompletionCalled();
     txState.reserveAndCheck();
     doThrow(transactionDataNodeHasDepartedException).when(txState).commit();
 
-    assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
+    assertThatThrownBy(() -> txState.doAfterCompletion(Status.STATUS_COMMITTED))
         .isSameAs(transactionDataNodeHasDepartedException);
   }
 
   @Test
-  public void afterCompletionThrowsTransactionExceptionIfCommitFailedCommitConflictException() {
-    TXState txState = spy(new TXState(txStateProxy, true));
-    doReturn(mock(InternalCache.class)).when(txState).getCache();
-    doReturn(true).when(txState).wasBeforeCompletionCalled();
-    doThrow(exception).when(txState).commit();
-
-    assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
-        .isInstanceOf(TransactionException.class);
-  }
-
-  @Test
-  public void afterCompletionCanCommitJTA() {
+  public void doAfterCompletionCanCommitJTA() {
     TXState txState = spy(new TXState(txStateProxy, false));
     doReturn(mock(InternalCache.class)).when(txState).getCache();
     txState.reserveAndCheck();
     txState.closed = true;
     doReturn(true).when(txState).wasBeforeCompletionCalled();
-    txState.afterCompletion(Status.STATUS_COMMITTED);
+    txState.doAfterCompletion(Status.STATUS_COMMITTED);
 
     assertThat(txState.locks).isNull();
     verify(txState, times(1)).saveTXCommitMessageForClientFailover();
   }
 
+  @Test(expected = FailedSynchronizationException.class)
+  public void afterCompletionThrowsExceptionIfBeforeCompletionNotCalled() {
+    TXState txState = new TXState(txStateProxy, true);
+    txState.afterCompletion(Status.STATUS_COMMITTED);
+  }
+
   @Test
   public void afterCompletionCanRollbackJTA() {
     TXState txState = spy(new TXState(txStateProxy, true));
@@ -153,16 +149,7 @@ public class TXStateTest {
   public void getOriginatingMemberReturnsNullIfNotOriginatedFromClient() {
     TXState txState = spy(new TXState(txStateProxy, false));
 
-    assertThat(txState.getOriginatingMember()).isNull();
+    assertThat(txState.getOriginatingMember()).isSameAs(txStateProxy.getOnBehalfOfClientMember());
   }
 
-  @Test
-  public void getOriginatingMemberReturnsClientMemberIfOriginatedFromClient() {
-    InternalDistributedMember client = mock(InternalDistributedMember.class);
-    TXStateProxyImpl proxy = new TXStateProxyImpl(mock(InternalCache.class),
-        mock(TXManagerImpl.class), mock(TXId.class), client);
-    TXState txState = spy(new TXState(proxy, false));
-
-    assertThat(txState.getOriginatingMember()).isEqualTo(client);
-  }
 }


[geode] 01/02: GEODE-5624: Use a thread to do beforeCompletion and afterCompletion.

Posted by es...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-5624
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 413b34f3a63af62094925b16f280ae4a641d4dc6
Author: eshu <es...@pivotal.io>
AuthorDate: Thu Aug 23 14:24:48 2018 -0700

    GEODE-5624: Use a thread to do beforeCompletion and afterCompletion.
---
 .../ClientServerJTAFailoverDistributedTest.java    |  25 ++
 .../org/apache/geode/internal/cache/TXState.java   | 268 ++++++++++++++++++---
 .../cache/TXStateSynchronizationRunnable.java      | 144 +++++++++++
 3 files changed, 400 insertions(+), 37 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
index c623766..e4589fa 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
@@ -59,12 +59,14 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
   private String hostName;
   private String uniqueName;
   private String regionName;
+  private String replicateRegionName;
   private VM server1;
   private VM server2;
   private VM server3;
   private VM client1;
   private int port1;
   private int port2;
+  private boolean hasReplicateRegion = false;
 
   private final int key = 1;
   private final String value = "value1";
@@ -92,6 +94,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     hostName = getHostName();
     uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
     regionName = uniqueName + "_region";
+    replicateRegionName = uniqueName + "_replicate_region";
   }
 
   @Test
@@ -126,6 +129,9 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     PartitionAttributes partitionAttributes = factory.create();
     cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
         .setPartitionAttributes(partitionAttributes).create(regionName);
+    if (hasReplicateRegion) {
+      cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.REPLICATE).create(replicateRegionName);
+    }
 
     CacheServer server = cacheRule.getCache().addCacheServer();
     server.setPort(0);
@@ -148,6 +154,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
         clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
     crf.setPoolName(pool.getName());
     crf.create(regionName);
+    if (hasReplicateRegion) crf.create(replicateRegionName);
 
     if (ports.length > 1) {
       pool.acquireConnection(new ServerLocation(hostName, port1));
@@ -171,9 +178,11 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     Object[] results = new Object[2];
     InternalClientCache cache = clientCacheRule.getClientCache();
     Region region = cache.getRegion(regionName);
+    Region replicateRegion = hasReplicateRegion? cache.getRegion(replicateRegionName) : null;
     TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
     txManager.begin();
     region.put(key, newValue);
+    if (hasReplicateRegion) replicateRegion.put(key, newValue);
 
     TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
     ClientTXStateStub clientTXStateStub = (ClientTXStateStub) txStateProxy.getRealDeal(null, null);
@@ -188,6 +197,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
   private void doAfterCompletion(TransactionId transactionId, boolean isCommit) {
     InternalClientCache cache = clientCacheRule.getClientCache();
     Region region = cache.getRegion(regionName);
+    Region replicateRegion = cache.getRegion(replicateRegionName);
     TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
     txManager.resume(transactionId);
 
@@ -202,6 +212,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     }
     if (isCommit) {
       assertEquals(newValue, region.get(key));
+      if (hasReplicateRegion) assertEquals(newValue, replicateRegion.get(key));
     } else {
       assertEquals(value, region.get(key));
     }
@@ -292,4 +303,18 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     txStateStub.beforeCompletion();
   }
 
+  @Test
+  public void jtaCanFailoverToJTAHostForMixedRegionsAfterDoneBeforeCompletion() {
+    hasReplicateRegion = true;
+    port2 = server2.invoke(() -> createServerRegion(1, false));
+    server2.invoke(() -> doPut(key, value));
+    port1 = server1.invoke(() -> createServerRegion(1, true));
+
+    client1.invoke(() -> createClientRegion(port1, port2));
+    Object[] beforeCompletionResults = client1.invoke(() -> doBeforeCompletion());
+
+    server1.invoke(() -> cacheRule.getCache().close());
+
+    client1.invoke(() -> doAfterCompletion((TransactionId) beforeCompletionResults[0], true));
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 9494fd3..f33267e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.transaction.Status;
@@ -51,6 +52,7 @@ import org.apache.geode.cache.UnsupportedOperationInTransactionException;
 import org.apache.geode.cache.client.internal.ServerRegionDataAccess;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.TXManagerCancelledException;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.cache.control.MemoryThresholds;
@@ -101,6 +103,15 @@ public class TXState implements TXStateInterface {
   // Access this variable should be in synchronized block.
   private boolean beforeCompletionCalled;
 
+  /**
+   * for client/server JTA transactions we need to have a single thread handle both beforeCompletion
+   * and afterCompletion so that beforeCompletion can obtain locks for the afterCompletion step.
+   * This field holds the runnable that is executed on that thread.
+   */
+  protected volatile TXStateSynchronizationRunnable syncRunnable;
+  private volatile SynchronizationCommitConflictException beforeCompletionException;
+  private volatile RuntimeException afterCompletionException;
+
   // Internal testing hooks
   private Runnable internalAfterReservation;
   protected Runnable internalAfterConflictCheck;
@@ -910,6 +921,9 @@ public class TXState implements TXStateInterface {
       synchronized (this.completionGuard) {
         this.completionGuard.notifyAll();
       }
+      if (this.syncRunnable != null) {
+        this.syncRunnable.abort();
+      }
       if (iae != null && !this.proxy.getCache().isClosed()) {
         throw iae;
       }
@@ -996,6 +1010,75 @@ public class TXState implements TXStateInterface {
     }
   }
 
+//  //////////////////////////////////////////////////////////////////
+//  // JTA Synchronization implementation //
+//  //////////////////////////////////////////////////////////////////
+//  /*
+//   * (non-Javadoc)
+//   *
+//   * @see org.apache.geode.internal.cache.TXStateInterface#beforeCompletion()
+//   */
+//  @Override
+//  public synchronized void beforeCompletion() throws SynchronizationCommitConflictException {
+//    if (this.closed) {
+//      throw new TXManagerCancelledException();
+//    }
+//    if (beforeCompletionCalled) {
+//      // do not re-execute beforeCompletion again
+//      return;
+//    }
+//    beforeCompletionCalled = true;
+//    doBeforeCompletion();
+//  }
+//
+//  private void doBeforeCompletion() {
+//    final long opStart = CachePerfStats.getStatTime();
+//    this.jtaLifeTime = opStart - getBeginTime();
+//    try {
+//      reserveAndCheck();
+//      /*
+//       * If there is a TransactionWriter plugged in, we need to give it an opportunity to abort
+//       * the transaction.
+//       */
+//      TransactionWriter writer = this.proxy.getTxMgr().getWriter();
+//      if (writer != null) {
+//        try {
+//          // need to mark this so we don't fire again in commit
+//          firedWriter = true;
+//          TXEvent event = getEvent();
+//          if (!event.hasOnlyInternalEvents()) {
+//            writer.beforeCommit(event);
+//          }
+//        } catch (TransactionWriterException twe) {
+//          throw new CommitConflictException(twe);
+//        } catch (VirtualMachineError err) {
+//          // cleanup(); this allocates objects so I don't think we can do it - that leaves the TX
+//          // open, but we are poison pilling so we should be ok??
+//
+//          SystemFailure.initiateFailure(err);
+//          // If this ever returns, rethrow the error. We're poisoned
+//          // now, so don't let this thread continue.
+//          throw err;
+//        } catch (Throwable t) {
+//          // Whenever you catch Error or Throwable, you must also
+//          // catch VirtualMachineError (see above). However, there is
+//          // _still_ a possibility that you are dealing with a cascading
+//          // error condition, so you also need to check to see if the JVM
+//          // is still usable:
+//          SystemFailure.checkFailure();
+//          throw new CommitConflictException(t);
+//        }
+//      }
+//    } catch (CommitConflictException commitConflict) {
+//      cleanup();
+//      proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this);
+//      throw new SynchronizationCommitConflictException(
+//          LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0
+//              .toLocalizedString(getTransactionId()),
+//          commitConflict);
+//    }
+//  }
+
   //////////////////////////////////////////////////////////////////
   // JTA Synchronization implementation //
   //////////////////////////////////////////////////////////////////
@@ -1009,17 +1092,57 @@ public class TXState implements TXStateInterface {
     if (this.closed) {
       throw new TXManagerCancelledException();
     }
+
     if (beforeCompletionCalled) {
       // do not re-execute beforeCompletion again
       return;
     }
     beforeCompletionCalled = true;
-    doBeforeCompletion();
+
+    TXStateSynchronizationRunnable sync = createTxStateSynchronizationRunnable();
+    setSynchronizationRunnable(sync);
+
+    Executor exec = getExecutor();
+    exec.execute(sync);
+    sync.waitForFirstExecution();
+    if (getBeforeCompletionException() != null) {
+      throw getBeforeCompletionException();
+    }
+    //doBeforeCompletion();
   }
 
+  TXStateSynchronizationRunnable createTxStateSynchronizationRunnable() {
+    Runnable beforeCompletion = new Runnable() {
+      @SuppressWarnings("synthetic-access")
+      public void run() {
+        doBeforeCompletion();
+      }
+    };
+
+    return new TXStateSynchronizationRunnable(getCache().getCancelCriterion(),
+        beforeCompletion);
+  }
+
+  Executor getExecutor() {
+    return InternalDistributedSystem.getConnectedInstance().getDistributionManager()
+        .getWaitingThreadPool();
+  }
+
+  SynchronizationCommitConflictException getBeforeCompletionException() {
+    return beforeCompletionException;
+  }
+
+  private void setSynchronizationRunnable(TXStateSynchronizationRunnable synchronizationRunnable) {
+    syncRunnable = synchronizationRunnable;
+  }
+
+
   private void doBeforeCompletion() {
+    proxy.getTxMgr().setTXState(null);
     final long opStart = CachePerfStats.getStatTime();
     this.jtaLifeTime = opStart - getBeginTime();
+
+
     try {
       reserveAndCheck();
       /*
@@ -1066,17 +1189,28 @@ public class TXState implements TXStateInterface {
   }
 
   /*
-   * (non-Javadoc)
-   *
-   * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
-   */
+ * (non-Javadoc)
+ *
+ * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
+ */
   @Override
   public synchronized void afterCompletion(int status) {
-    this.proxy.getTxMgr().setTXState(null);
-    // For commit, beforeCompletion should be called. Otherwise
+    proxy.getTxMgr().setTXState(null);
+    Runnable afterCompletion = new Runnable() {
+      @SuppressWarnings("synthetic-access")
+      public void run() {
+        doAfterCompletion(status);
+      }
+    };
+    // if there was a beforeCompletion call then there will be a thread
+    // sitting in the waiting pool to execute afterCompletion. Otherwise
     // throw FailedSynchronizationException().
-    if (wasBeforeCompletionCalled()) {
-      doAfterCompletion(status);
+    TXStateSynchronizationRunnable sync = getSynchronizationRunnable();
+    if (sync != null) {
+      sync.runSecondRunnable(afterCompletion);
+      if (getAfterCompletionException() != null) {
+        throw getAfterCompletionException();
+      }
     } else {
       // rollback does not run beforeCompletion.
       if (status != Status.STATUS_ROLLEDBACK) {
@@ -1087,37 +1221,97 @@ public class TXState implements TXStateInterface {
     }
   }
 
+  TXStateSynchronizationRunnable getSynchronizationRunnable() {
+    return this.syncRunnable;
+  }
+
+  RuntimeException getAfterCompletionException() {
+    return afterCompletionException;
+  }
+
   private void doAfterCompletion(int status) {
     final long opStart = CachePerfStats.getStatTime();
-    try {
-      switch (status) {
-        case Status.STATUS_COMMITTED:
-          Assert.assertTrue(this.locks != null,
-              "Gemfire Transaction afterCompletion called with illegal state.");
-          try {
-            commit();
-            saveTXCommitMessageForClientFailover();
-          } catch (CommitConflictException error) {
-            Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
-                + " afterCompletion failed.due to CommitConflictException: " + error);
-          }
-
-          this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
-          this.locks = null;
-          break;
-        case Status.STATUS_ROLLEDBACK:
-          this.jtaLifeTime = opStart - getBeginTime();
-          rollback();
+    switch (status) {
+      case Status.STATUS_COMMITTED:
+        Assert.assertTrue(this.locks != null,
+            "Gemfire Transaction afterCompletion called with illegal state.");
+        try {
+          proxy.getTxMgr().setTXState(null);
+          commit();
           saveTXCommitMessageForClientFailover();
-          this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
-          break;
-        default:
-          Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
-      }
-    } catch (InternalGemFireError error) {
-      throw new TransactionException(error);
-    }
-  }
+        } catch (CommitConflictException error) {
+          Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
+              + " afterCompletion failed.due to CommitConflictException: " + error);
+        }
+
+        this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
+        this.locks = null;
+        break;
+      case Status.STATUS_ROLLEDBACK:
+        this.jtaLifeTime = opStart - getBeginTime();
+        this.proxy.getTxMgr().setTXState(null);
+        rollback();
+        saveTXCommitMessageForClientFailover();
+        this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
+        break;
+      default:
+        Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
+    }
+  }
+
+//  /*
+//   * (non-Javadoc)
+//   *
+//   * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
+//   */
+//  @Override
+//  public synchronized void afterCompletion(int status) {
+//    this.proxy.getTxMgr().setTXState(null);
+//    // For commit, beforeCompletion should be called. Otherwise
+//    // throw FailedSynchronizationException().
+//    if (wasBeforeCompletionCalled()) {
+//      doAfterCompletion(status);
+//    } else {
+//      // rollback does not run beforeCompletion.
+//      if (status != Status.STATUS_ROLLEDBACK) {
+//        throw new FailedSynchronizationException(
+//            "Could not execute afterCompletion when beforeCompletion was not executed");
+//      }
+//      doAfterCompletion(status);
+//    }
+//  }
+//
+//  private void doAfterCompletion(int status) {
+//    final long opStart = CachePerfStats.getStatTime();
+//    try {
+//      switch (status) {
+//        case Status.STATUS_COMMITTED:
+//          Assert.assertTrue(this.locks != null,
+//              "Gemfire Transaction afterCompletion called with illegal state.");
+//          try {
+//            commit();
+//            saveTXCommitMessageForClientFailover();
+//          } catch (CommitConflictException error) {
+//            Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
+//                + " afterCompletion failed.due to CommitConflictException: " + error);
+//          }
+//
+//          this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
+//          this.locks = null;
+//          break;
+//        case Status.STATUS_ROLLEDBACK:
+//          this.jtaLifeTime = opStart - getBeginTime();
+//          rollback();
+//          saveTXCommitMessageForClientFailover();
+//          this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
+//          break;
+//        default:
+//          Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
+//      }
+//    } catch (InternalGemFireError error) {
+//      throw new TransactionException(error);
+//    }
+//  }
 
   boolean wasBeforeCompletionCalled() {
     return beforeCompletionCalled;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
new file mode 100644
index 0000000..28f367b
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * TXStateSynchronizationRunnable manages beforeCompletion and afterCompletion calls.
+ * It should be instantiated with a Runnable that invokes beforeCompletion behavior.
+ * Then you must invoke runSecondRunnable() with another Runnable that invokes afterCompletion
+ * behavior, or abort() if none will arrive; both Runnables execute on the thread calling run().
+ *
+ * @since Geode 1.6.0
+ */
+public class TXStateSynchronizationRunnable implements Runnable {
+  private static final Logger logger = LogService.getLogger();
+
+  private final CancelCriterion cancelCriterion;
+
+  // after construction, each group of fields below is accessed under its own *Sync monitor
+  private Runnable firstRunnable;
+  private final Object firstRunnableSync = new Object();
+  private boolean firstRunnableCompleted;
+
+  private Runnable secondRunnable;
+  private final Object secondRunnableSync = new Object();
+  private boolean secondRunnableCompleted;
+  private boolean abort;
+
+  public TXStateSynchronizationRunnable(final CancelCriterion cancelCriterion, final Runnable beforeCompletion) {
+    this.cancelCriterion = cancelCriterion;
+    this.firstRunnable = beforeCompletion;
+  }
+
+  /**
+   * runs beforeCompletion, then waits for runSecondRunnable() or abort() and acts accordingly
+   */
+  @Override
+  public void run() {
+    synchronized (this.firstRunnableSync) {
+      try {
+        this.firstRunnable.run();
+      } finally {
+        if (logger.isTraceEnabled()) {
+          logger.trace("beforeCompletion notification completed");
+        }
+        this.firstRunnableCompleted = true;
+        this.firstRunnable = null;
+        this.firstRunnableSync.notifyAll();
+      }
+    }
+    synchronized (this.secondRunnableSync) {
+      // TODO there should be a transaction timeout that keeps this thread
+      // from sitting around forever if the client goes away
+      final boolean isTraceEnabled = logger.isTraceEnabled();
+      while (this.secondRunnable == null && !this.abort) {
+        try {
+          if (isTraceEnabled) {
+            logger.trace("waiting for afterCompletion notification");
+          }
+          this.secondRunnableSync.wait(1000);
+        } catch (InterruptedException ignore) {
+          // eat the interrupt and check for exit conditions
+        }
+      }
+      if (isTraceEnabled) {
+        logger.trace("executing afterCompletion notification");
+      }
+      try {
+        if (!this.abort) {
+          this.secondRunnable.run();
+        }
+      } finally {
+        if (isTraceEnabled) {
+          logger.trace("afterCompletion notification completed");
+        }
+        this.secondRunnableCompleted = true;
+        this.secondRunnable = null;
+        this.secondRunnableSync.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * wait for beforeCompletion to finish, consulting the CancelCriterion after each timed wait
+   */
+  public void waitForFirstExecution() {
+    synchronized (this.firstRunnableSync) {
+      while (!this.firstRunnableCompleted) {
+        try {
+          this.firstRunnableSync.wait(1000);
+        } catch (InterruptedException ignore) {
+          // eat the interrupt and check for exit conditions
+        }
+        cancelCriterion.checkCancelInProgress(null);
+      }
+    }
+  }
+
+  /**
+   * run the afterCompletion portion of synchronization. This method hands the given runnable to
+   * the thread executing run() and then waits until it has finished or abort() is invoked
+   */
+  public void runSecondRunnable(Runnable r) {
+    synchronized (this.secondRunnableSync) {
+      this.secondRunnable = r;
+      this.secondRunnableSync.notifyAll();
+      while (!this.secondRunnableCompleted && !this.abort) {
+        try {
+          this.secondRunnableSync.wait(1000);
+        } catch (InterruptedException ignore) {
+          // eat the interrupt and check for exit conditions
+        }
+        cancelCriterion.checkCancelInProgress(null);
+      }
+    }
+  }
+
+  /**
+   * stop waiting for an afterCompletion to arrive and just exit; waiting threads are woken at once
+   */
+  public void abort() {
+    synchronized (this.secondRunnableSync) {
+      this.abort = true;
+      this.secondRunnableSync.notifyAll();
+    }
+  }
+}