You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/08/24 20:37:44 UTC

[geode] 01/02: GEODE-5624: Use a thread to do beforeCompletion and afterCompletion.

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-5624
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 413b34f3a63af62094925b16f280ae4a641d4dc6
Author: eshu <es...@pivotal.io>
AuthorDate: Thu Aug 23 14:24:48 2018 -0700

    GEODE-5624: Use a thread to do beforeCompletion and afterCompletion.
---
 .../ClientServerJTAFailoverDistributedTest.java    |  25 ++
 .../org/apache/geode/internal/cache/TXState.java   | 268 ++++++++++++++++++---
 .../cache/TXStateSynchronizationRunnable.java      | 144 +++++++++++
 3 files changed, 400 insertions(+), 37 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
index c623766..e4589fa 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
@@ -59,12 +59,14 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
   private String hostName;
   private String uniqueName;
   private String regionName;
+  private String replicateRegionName;
   private VM server1;
   private VM server2;
   private VM server3;
   private VM client1;
   private int port1;
   private int port2;
+  private boolean hasReplicateRegion = false;
 
   private final int key = 1;
   private final String value = "value1";
@@ -92,6 +94,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     hostName = getHostName();
     uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
     regionName = uniqueName + "_region";
+    replicateRegionName = uniqueName + "_replicate_region";
   }
 
   @Test
@@ -126,6 +129,9 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     PartitionAttributes partitionAttributes = factory.create();
     cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
         .setPartitionAttributes(partitionAttributes).create(regionName);
+    if (hasReplicateRegion) {
+      cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.REPLICATE).create(replicateRegionName);
+    }
 
     CacheServer server = cacheRule.getCache().addCacheServer();
     server.setPort(0);
@@ -148,6 +154,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
         clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
     crf.setPoolName(pool.getName());
     crf.create(regionName);
+    if (hasReplicateRegion) crf.create(replicateRegionName);
 
     if (ports.length > 1) {
       pool.acquireConnection(new ServerLocation(hostName, port1));
@@ -171,9 +178,11 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     Object[] results = new Object[2];
     InternalClientCache cache = clientCacheRule.getClientCache();
     Region region = cache.getRegion(regionName);
+    Region replicateRegion = hasReplicateRegion? cache.getRegion(replicateRegionName) : null;
     TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
     txManager.begin();
     region.put(key, newValue);
+    if (hasReplicateRegion) replicateRegion.put(key, newValue);
 
     TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
     ClientTXStateStub clientTXStateStub = (ClientTXStateStub) txStateProxy.getRealDeal(null, null);
@@ -188,6 +197,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
   private void doAfterCompletion(TransactionId transactionId, boolean isCommit) {
     InternalClientCache cache = clientCacheRule.getClientCache();
     Region region = cache.getRegion(regionName);
+    Region replicateRegion = cache.getRegion(replicateRegionName);
     TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
     txManager.resume(transactionId);
 
@@ -202,6 +212,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     }
     if (isCommit) {
       assertEquals(newValue, region.get(key));
+      if (hasReplicateRegion) assertEquals(newValue, replicateRegion.get(key));
     } else {
       assertEquals(value, region.get(key));
     }
@@ -292,4 +303,18 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     txStateStub.beforeCompletion();
   }
 
+  @Test
+  public void jtaCanFailoverToJTAHostForMixedRegionsAfterDoneBeforeCompletion() {
+    hasReplicateRegion = true;
+    port2 = server2.invoke(() -> createServerRegion(1, false));
+    server2.invoke(() -> doPut(key, value));
+    port1 = server1.invoke(() -> createServerRegion(1, true));
+
+    client1.invoke(() -> createClientRegion(port1, port2));
+    Object[] beforeCompletionResults = client1.invoke(() -> doBeforeCompletion());
+
+    server1.invoke(() -> cacheRule.getCache().close());
+
+    client1.invoke(() -> doAfterCompletion((TransactionId) beforeCompletionResults[0], true));
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 9494fd3..f33267e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.transaction.Status;
@@ -51,6 +52,7 @@ import org.apache.geode.cache.UnsupportedOperationInTransactionException;
 import org.apache.geode.cache.client.internal.ServerRegionDataAccess;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.TXManagerCancelledException;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.cache.control.MemoryThresholds;
@@ -101,6 +103,15 @@ public class TXState implements TXStateInterface {
   // Access this variable should be in synchronized block.
   private boolean beforeCompletionCalled;
 
+  /**
+   * for client/server JTA transactions we need to have a single thread handle both beforeCompletion
+   * and afterCompletion so that beforeCompletion can obtain locks for the afterCompletion step.
+   * This is that thread
+   */
+  protected volatile TXStateSynchronizationRunnable syncRunnable;
+  private volatile SynchronizationCommitConflictException beforeCompletionException;
+  private volatile RuntimeException afterCompletionException;
+
   // Internal testing hooks
   private Runnable internalAfterReservation;
   protected Runnable internalAfterConflictCheck;
@@ -910,6 +921,9 @@ public class TXState implements TXStateInterface {
       synchronized (this.completionGuard) {
         this.completionGuard.notifyAll();
       }
+      if (this.syncRunnable != null) {
+        this.syncRunnable.abort();
+      }
       if (iae != null && !this.proxy.getCache().isClosed()) {
         throw iae;
       }
@@ -996,6 +1010,75 @@ public class TXState implements TXStateInterface {
     }
   }
 
+//  //////////////////////////////////////////////////////////////////
+//  // JTA Synchronization implementation //
+//  //////////////////////////////////////////////////////////////////
+//  /*
+//   * (non-Javadoc)
+//   *
+//   * @see org.apache.geode.internal.cache.TXStateInterface#beforeCompletion()
+//   */
+//  @Override
+//  public synchronized void beforeCompletion() throws SynchronizationCommitConflictException {
+//    if (this.closed) {
+//      throw new TXManagerCancelledException();
+//    }
+//    if (beforeCompletionCalled) {
+//      // do not re-execute beforeCompletion again
+//      return;
+//    }
+//    beforeCompletionCalled = true;
+//    doBeforeCompletion();
+//  }
+//
+//  private void doBeforeCompletion() {
+//    final long opStart = CachePerfStats.getStatTime();
+//    this.jtaLifeTime = opStart - getBeginTime();
+//    try {
+//      reserveAndCheck();
+//      /*
+//       * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort
+//       * the transaction.
+//       */
+//      TransactionWriter writer = this.proxy.getTxMgr().getWriter();
+//      if (writer != null) {
+//        try {
+//          // need to mark this so we don't fire again in commit
+//          firedWriter = true;
+//          TXEvent event = getEvent();
+//          if (!event.hasOnlyInternalEvents()) {
+//            writer.beforeCommit(event);
+//          }
+//        } catch (TransactionWriterException twe) {
+//          throw new CommitConflictException(twe);
+//        } catch (VirtualMachineError err) {
+//          // cleanup(); this allocates objects so I don't think we can do it - that leaves the TX
+//          // open, but we are poison pilling so we should be ok??
+//
+//          SystemFailure.initiateFailure(err);
+//          // If this ever returns, rethrow the error. We're poisoned
+//          // now, so don't let this thread continue.
+//          throw err;
+//        } catch (Throwable t) {
+//          // Whenever you catch Error or Throwable, you must also
+//          // catch VirtualMachineError (see above). However, there is
+//          // _still_ a possibility that you are dealing with a cascading
+//          // error condition, so you also need to check to see if the JVM
+//          // is still usable:
+//          SystemFailure.checkFailure();
+//          throw new CommitConflictException(t);
+//        }
+//      }
+//    } catch (CommitConflictException commitConflict) {
+//      cleanup();
+//      proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this);
+//      throw new SynchronizationCommitConflictException(
+//          LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0
+//              .toLocalizedString(getTransactionId()),
+//          commitConflict);
+//    }
+//  }
+
   //////////////////////////////////////////////////////////////////
   // JTA Synchronization implementation //
   //////////////////////////////////////////////////////////////////
@@ -1009,17 +1092,57 @@ public class TXState implements TXStateInterface {
     if (this.closed) {
       throw new TXManagerCancelledException();
     }
+
     if (beforeCompletionCalled) {
       // do not re-execute beforeCompletion again
       return;
     }
     beforeCompletionCalled = true;
-    doBeforeCompletion();
+
+    TXStateSynchronizationRunnable sync = createTxStateSynchronizationRunnable();
+    setSynchronizationRunnable(sync);
+
+    Executor exec = getExecutor();
+    exec.execute(sync);
+    sync.waitForFirstExecution();
+    if (getBeforeCompletionException() != null) {
+      throw getBeforeCompletionException();
+    }
+    //doBeforeCompletion();
   }
 
+  TXStateSynchronizationRunnable createTxStateSynchronizationRunnable() {
+    Runnable beforeCompletion = new Runnable() {
+      @SuppressWarnings("synthetic-access")
+      public void run() {
+        doBeforeCompletion();
+      }
+    };
+
+    return new TXStateSynchronizationRunnable(getCache().getCancelCriterion(),
+        beforeCompletion);
+  }
+
+  Executor getExecutor() {
+    return InternalDistributedSystem.getConnectedInstance().getDistributionManager()
+        .getWaitingThreadPool();
+  }
+
+  SynchronizationCommitConflictException getBeforeCompletionException() {
+    return beforeCompletionException;
+  }
+
+  private void setSynchronizationRunnable(TXStateSynchronizationRunnable synchronizationRunnable) {
+    syncRunnable = synchronizationRunnable;
+  }
+
+
   private void doBeforeCompletion() {
+    proxy.getTxMgr().setTXState(null);
     final long opStart = CachePerfStats.getStatTime();
     this.jtaLifeTime = opStart - getBeginTime();
+
+
     try {
       reserveAndCheck();
       /*
@@ -1066,17 +1189,28 @@ public class TXState implements TXStateInterface {
   }
 
   /*
-   * (non-Javadoc)
-   *
-   * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
-   */
+ * (non-Javadoc)
+ *
+ * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
+ */
   @Override
   public synchronized void afterCompletion(int status) {
-    this.proxy.getTxMgr().setTXState(null);
-    // For commit, beforeCompletion should be called. Otherwise
+    proxy.getTxMgr().setTXState(null);
+    Runnable afterCompletion = new Runnable() {
+      @SuppressWarnings("synthetic-access")
+      public void run() {
+        doAfterCompletion(status);
+      }
+    };
+    // if there was a beforeCompletion call then there will be a thread
+    // sitting in the waiting pool to execute afterCompletion. Otherwise
     // throw FailedSynchronizationException().
-    if (wasBeforeCompletionCalled()) {
-      doAfterCompletion(status);
+    TXStateSynchronizationRunnable sync = getSynchronizationRunnable();
+    if (sync != null) {
+      sync.runSecondRunnable(afterCompletion);
+      if (getAfterCompletionException() != null) {
+        throw getAfterCompletionException();
+      }
     } else {
       // rollback does not run beforeCompletion.
       if (status != Status.STATUS_ROLLEDBACK) {
@@ -1087,37 +1221,97 @@ public class TXState implements TXStateInterface {
     }
   }
 
+  TXStateSynchronizationRunnable getSynchronizationRunnable() {
+    return this.syncRunnable;
+  }
+
+  RuntimeException getAfterCompletionException() {
+    return afterCompletionException;
+  }
+
   private void doAfterCompletion(int status) {
     final long opStart = CachePerfStats.getStatTime();
-    try {
-      switch (status) {
-        case Status.STATUS_COMMITTED:
-          Assert.assertTrue(this.locks != null,
-              "Gemfire Transaction afterCompletion called with illegal state.");
-          try {
-            commit();
-            saveTXCommitMessageForClientFailover();
-          } catch (CommitConflictException error) {
-            Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
-                + " afterCompletion failed.due to CommitConflictException: " + error);
-          }
-
-          this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
-          this.locks = null;
-          break;
-        case Status.STATUS_ROLLEDBACK:
-          this.jtaLifeTime = opStart - getBeginTime();
-          rollback();
+    switch (status) {
+      case Status.STATUS_COMMITTED:
+        Assert.assertTrue(this.locks != null,
+            "Gemfire Transaction afterCompletion called with illegal state.");
+        try {
+          proxy.getTxMgr().setTXState(null);
+          commit();
           saveTXCommitMessageForClientFailover();
-          this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
-          break;
-        default:
-          Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
-      }
-    } catch (InternalGemFireError error) {
-      throw new TransactionException(error);
-    }
-  }
+        } catch (CommitConflictException error) {
+          Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
+              + " afterCompletion failed.due to CommitConflictException: " + error);
+        }
+
+        this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
+        this.locks = null;
+        break;
+      case Status.STATUS_ROLLEDBACK:
+        this.jtaLifeTime = opStart - getBeginTime();
+        this.proxy.getTxMgr().setTXState(null);
+        rollback();
+        saveTXCommitMessageForClientFailover();
+        this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
+        break;
+      default:
+        Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
+    }
+  }
+
+//  /*
+//   * (non-Javadoc)
+//   *
+//   * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
+//   */
+//  @Override
+//  public synchronized void afterCompletion(int status) {
+//    this.proxy.getTxMgr().setTXState(null);
+//    // For commit, beforeCompletion should be called. Otherwise
+//    // throw FailedSynchronizationException().
+//    if (wasBeforeCompletionCalled()) {
+//      doAfterCompletion(status);
+//    } else {
+//      // rollback does not run beforeCompletion.
+//      if (status != Status.STATUS_ROLLEDBACK) {
+//        throw new FailedSynchronizationException(
+//            "Could not execute afterCompletion when beforeCompletion was not executed");
+//      }
+//      doAfterCompletion(status);
+//    }
+//  }
+//
+//  private void doAfterCompletion(int status) {
+//    final long opStart = CachePerfStats.getStatTime();
+//    try {
+//      switch (status) {
+//        case Status.STATUS_COMMITTED:
+//          Assert.assertTrue(this.locks != null,
+//              "Gemfire Transaction afterCompletion called with illegal state.");
+//          try {
+//            commit();
+//            saveTXCommitMessageForClientFailover();
+//          } catch (CommitConflictException error) {
+//            Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
+//                + " afterCompletion failed.due to CommitConflictException: " + error);
+//          }
+//
+//          this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
+//          this.locks = null;
+//          break;
+//        case Status.STATUS_ROLLEDBACK:
+//          this.jtaLifeTime = opStart - getBeginTime();
+//          rollback();
+//          saveTXCommitMessageForClientFailover();
+//          this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
+//          break;
+//        default:
+//          Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
+//      }
+//    } catch (InternalGemFireError error) {
+//      throw new TransactionException(error);
+//    }
+//  }
 
   boolean wasBeforeCompletionCalled() {
     return beforeCompletionCalled;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
new file mode 100644
index 0000000..28f367b
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * TXStateSynchronizationThread manages beforeCompletion and afterCompletion calls.
+ * The thread should be instantiated with a Runnable that invokes beforeCompletion behavior.
+ * Then you must invoke runSecondRunnable() with another Runnable that invokes afterCompletion
+ * behavior.
+ *
+ * @since Geode 1.6.0
+ */
+public class TXStateSynchronizationRunnable implements Runnable {
+  private static final Logger logger = LogService.getLogger();
+
+  private final CancelCriterion cancelCriterion;
+
+  private Runnable firstRunnable;
+  private final Object firstRunnableSync = new Object();
+  private boolean firstRunnableCompleted;
+
+  private Runnable secondRunnable;
+  private final Object secondRunnableSync = new Object();
+  private boolean secondRunnableCompleted;
+
+  private boolean abort;
+
+  public TXStateSynchronizationRunnable(final CancelCriterion cancelCriterion, final Runnable beforeCompletion) {
+    this.cancelCriterion = cancelCriterion;
+    this.firstRunnable = beforeCompletion;
+  }
+
+  @Override
+  public void run() {
+    doSynchronizationOps();
+  }
+
+  private void doSynchronizationOps() {
+    synchronized (this.firstRunnableSync) {
+      try {
+        this.firstRunnable.run();
+      } finally {
+        if (logger.isTraceEnabled()) {
+          logger.trace("beforeCompletion notification completed");
+        }
+        this.firstRunnableCompleted = true;
+        this.firstRunnable = null;
+        this.firstRunnableSync.notifyAll();
+      }
+    }
+    synchronized (this.secondRunnableSync) {
+      // TODO there should be a transaction timeout that keeps this thread
+      // from sitting around forever if the client goes away
+      final boolean isTraceEnabled = logger.isTraceEnabled();
+      while (this.secondRunnable == null && !this.abort) {
+        try {
+          if (isTraceEnabled) {
+            logger.trace("waiting for afterCompletion notification");
+          }
+          this.secondRunnableSync.wait(1000);
+        } catch (InterruptedException ignore) {
+          // eat the interrupt and check for exit conditions
+        }
+      }
+      if (isTraceEnabled) {
+        logger.trace("executing afterCompletion notification");
+      }
+      try {
+        if (!this.abort) {
+          this.secondRunnable.run();
+        }
+      } finally {
+        if (isTraceEnabled) {
+          logger.trace("afterCompletion notification completed");
+        }
+        this.secondRunnableCompleted = true;
+        this.secondRunnable = null;
+        this.secondRunnableSync.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * wait for the initial beforeCompletion step to finish
+   */
+  public void waitForFirstExecution() {
+    synchronized (this.firstRunnableSync) {
+      while (!this.firstRunnableCompleted) {
+        try {
+          this.firstRunnableSync.wait(1000);
+        } catch (InterruptedException ignore) {
+          // eat the interrupt and check for exit conditions
+        }
+        cancelCriterion.checkCancelInProgress(null);
+      }
+    }
+  }
+
+  /**
+   * run the afterCompletion portion of synchronization. This method schedules execution of the
+   * given runnable and then waits for it to finish running
+   */
+  public void runSecondRunnable(Runnable r) {
+    synchronized (this.secondRunnableSync) {
+      this.secondRunnable = r;
+      this.secondRunnableSync.notifyAll();
+      while (!this.secondRunnableCompleted && !this.abort) {
+        try {
+          this.secondRunnableSync.wait(1000);
+        } catch (InterruptedException ignore) {
+          // eat the interrupt and check for exit conditions
+        }
+        cancelCriterion.checkCancelInProgress(null);
+      }
+    }
+  }
+
+  /**
+   * stop waiting for an afterCompletion to arrive and just exit
+   */
+  public void abort() {
+    synchronized (this.secondRunnableSync) {
+      this.abort = true;
+    }
+  }
+}