You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2018/05/01 20:35:45 UTC

[geode] branch develop updated: GEODE-5155 hang recovering transaction state for crashed server

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2a02923  GEODE-5155 hang recovering transaction state for crashed server
2a02923 is described below

commit 2a02923a2fe3ead102ff79e76c47935e84f76859
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Tue May 1 13:34:39 2018 -0700

    GEODE-5155 hang recovering transaction state for crashed server
    
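    The fix is to check whether the message was removed due to a
    memberDeparted event.
    
    When that happens, a departureNoticed flag is set in TXCommitMessage,
    but the wait loop in the transaction tracker wasn't checking this flag.
    The loop now exits when either wasProcessed() or isDepartureNoticed()
    is true, and it waits with a 100 ms timeout so the flag is re-checked.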
---
 .../geode/internal/cache/TXCommitMessage.java      |  9 +++++++-
 .../geode/internal/cache/TXFarSideCMTracker.java   | 20 ++++++++---------
 .../internal/DlockAndTxlockRegressionTest.java     | 25 +++++++++++++++++++---
 3 files changed, 40 insertions(+), 14 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index d78f60a..8354430 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -1943,15 +1943,20 @@ public class TXCommitMessage extends PooledDistributionMessage
   public void quorumLost(DistributionManager distributionManager,
       Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {}
 
+  /** return true if the member initiating this transaction has left the cluster */
+  public boolean isDepartureNoticed() {
+    return departureNoticed;
+  }
+
   @Override
   public void memberDeparted(DistributionManager distributionManager,
       final InternalDistributedMember id, boolean crashed) {
+
     if (!getSender().equals(id)) {
       return;
     }
     this.dm.removeMembershipListener(this);
 
-    ThreadGroup group = LoggingThreadGroup.createThreadGroup("TXCommitMessage Threads", logger);
     synchronized (this) {
       if (isProcessing() || this.departureNoticed) {
         if (logger.isDebugEnabled()) {
@@ -1963,6 +1968,8 @@ public class TXCommitMessage extends PooledDistributionMessage
       this.departureNoticed = true;
     }
 
+    ThreadGroup group = LoggingThreadGroup.createThreadGroup("TXCommitMessage Threads", logger);
+
     // Send message to fellow FarSiders (aka recipients), if any, to
     // determine if any one of them have received a CommitProcessMessage
     if (this.farSiders != null && !this.farSiders.isEmpty()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java
index 8f2093f..eff9fa3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java
@@ -162,23 +162,23 @@ public class TXFarSideCMTracker {
    * departed/ing Originator (this will most likely be called nearly the same time as
    * commitProcessReceived
    */
-  public void waitToProcess(TXLockId lk, DistributionManager dm) {
-    waitForMemberToDepart(lk.getMemberId(), dm);
-    final TXCommitMessage mess;
+  public void waitToProcess(TXLockId lockId, DistributionManager dm) {
+    waitForMemberToDepart(lockId.getMemberId(), dm);
+    final TXCommitMessage commitMessage;
     synchronized (this.txInProgress) {
-      mess = (TXCommitMessage) this.txInProgress.get(lk);
+      commitMessage = (TXCommitMessage) this.txInProgress.get(lockId);
     }
-    if (mess != null) {
-      synchronized (mess) {
+    if (commitMessage != null) {
+      synchronized (commitMessage) {
         // tx in progress, we must wait until its done
-        while (!mess.wasProcessed()) {
+        while (!(commitMessage.wasProcessed() || commitMessage.isDepartureNoticed())) {
           try {
-            mess.wait();
+            commitMessage.wait(100);
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             logger.error(LocalizedMessage.create(
                 LocalizedStrings.TxFarSideTracker_WAITING_TO_COMPLETE_ON_MESSAGE_0_CAUGHT_AN_INTERRUPTED_EXCEPTION,
-                mess), ie);
+                commitMessage), ie);
             break;
           }
         }
@@ -186,7 +186,7 @@ public class TXFarSideCMTracker {
     } else {
       // tx may have completed
       for (int i = this.txHistory.length - 1; i >= 0; --i) {
-        if (lk.equals(this.txHistory[i])) {
+        if (lockId.equals(this.txHistory[i])) {
           return;
         }
       }
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DlockAndTxlockRegressionTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/DlockAndTxlockRegressionTest.java
index 8e19b73..3298a83 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/DlockAndTxlockRegressionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/DlockAndTxlockRegressionTest.java
@@ -33,6 +33,9 @@ import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.internal.OSProcess;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.DistributedTestUtils;
@@ -54,6 +57,8 @@ public class DlockAndTxlockRegressionTest extends JUnit4CacheTestCase {
   @Rule
   public DistributedRestoreSystemProperties restoreSystemProperties =
       new DistributedRestoreSystemProperties();
+  private static Object commitLock = new Object();
+  private static boolean committing;
 
   @Override
   public Properties getDistributedSystemProperties() {
@@ -148,13 +153,11 @@ public class DlockAndTxlockRegressionTest extends JUnit4CacheTestCase {
       }
 
       Throwable failure = null;
-      int asyncIndex = 0;
       for (AsyncInvocation asyncInvocation : asyncInvocations) {
         asyncInvocation.join(30000);
         if (asyncInvocation.exceptionOccurred()) {
           failure = asyncInvocation.getException();
         }
-        asyncIndex++;
       }
       if (failure != null) {
         throw new RuntimeException("test failed", failure);
@@ -170,8 +173,14 @@ public class DlockAndTxlockRegressionTest extends JUnit4CacheTestCase {
     }
   }
 
-  public void forceDisconnect() {
+  public void forceDisconnect() throws Exception {
     Cache existingCache = basicGetCache();
+    synchronized (commitLock) {
+      committing = false;
+      while (!committing) {
+        commitLock.wait();
+      }
+    }
     if (existingCache != null && !existingCache.isClosed()) {
       DistributedTestUtils.crashDistributedSystem(getCache().getDistributedSystem());
     }
@@ -209,6 +218,16 @@ public class DlockAndTxlockRegressionTest extends JUnit4CacheTestCase {
 
         region.put("TestKey", "TestValue" + random.nextInt(100000));
 
+        TXManagerImpl mgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+        TXStateProxyImpl txProxy = (TXStateProxyImpl) mgr.getTXState();
+        TXState txState = (TXState) txProxy.getRealDeal(null, null);
+        txState.setBeforeSend(() -> {
+          synchronized (commitLock) {
+            committing = true;
+            commitLock.notifyAll();
+          }
+        });
+
         try {
           cache.getCacheTransactionManager().commit();
         } catch (CommitConflictException e) {

-- 
To stop receiving notification emails like this one, please contact
bschuchardt@apache.org.