You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2018/05/01 20:35:45 UTC
[geode] branch develop updated: GEODE-5155 hang recovering transaction state for crashed server
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 2a02923 GEODE-5155 hang recovering transaction state for crashed server
2a02923 is described below
commit 2a02923a2fe3ead102ff79e76c47935e84f76859
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Tue May 1 13:34:39 2018 -0700
GEODE-5155 hang recovering transaction state for crashed server
The fix is to check whether the message was removed due to a
memberDeparted event.
When that happens, a departureNoticed flag is set in TXCommitMessage,
but the wait loop in the transaction tracker (TXFarSideCMTracker.waitToProcess)
was not checking this flag.
---
.../geode/internal/cache/TXCommitMessage.java | 9 +++++++-
.../geode/internal/cache/TXFarSideCMTracker.java | 20 ++++++++---------
.../internal/DlockAndTxlockRegressionTest.java | 25 +++++++++++++++++++---
3 files changed, 40 insertions(+), 14 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index d78f60a..8354430 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -1943,15 +1943,20 @@ public class TXCommitMessage extends PooledDistributionMessage
public void quorumLost(DistributionManager distributionManager,
Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {}
+ /** return true if the member initiating this transaction has left the cluster */
+ public boolean isDepartureNoticed() {
+ return departureNoticed;
+ }
+
@Override
public void memberDeparted(DistributionManager distributionManager,
final InternalDistributedMember id, boolean crashed) {
+
if (!getSender().equals(id)) {
return;
}
this.dm.removeMembershipListener(this);
- ThreadGroup group = LoggingThreadGroup.createThreadGroup("TXCommitMessage Threads", logger);
synchronized (this) {
if (isProcessing() || this.departureNoticed) {
if (logger.isDebugEnabled()) {
@@ -1963,6 +1968,8 @@ public class TXCommitMessage extends PooledDistributionMessage
this.departureNoticed = true;
}
+ ThreadGroup group = LoggingThreadGroup.createThreadGroup("TXCommitMessage Threads", logger);
+
// Send message to fellow FarSiders (aka recipients), if any, to
// determine if any one of them have received a CommitProcessMessage
if (this.farSiders != null && !this.farSiders.isEmpty()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java
index 8f2093f..eff9fa3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java
@@ -162,23 +162,23 @@ public class TXFarSideCMTracker {
* departed/ing Originator (this will most likely be called nearly the same time as
* commitProcessReceived
*/
- public void waitToProcess(TXLockId lk, DistributionManager dm) {
- waitForMemberToDepart(lk.getMemberId(), dm);
- final TXCommitMessage mess;
+ public void waitToProcess(TXLockId lockId, DistributionManager dm) {
+ waitForMemberToDepart(lockId.getMemberId(), dm);
+ final TXCommitMessage commitMessage;
synchronized (this.txInProgress) {
- mess = (TXCommitMessage) this.txInProgress.get(lk);
+ commitMessage = (TXCommitMessage) this.txInProgress.get(lockId);
}
- if (mess != null) {
- synchronized (mess) {
+ if (commitMessage != null) {
+ synchronized (commitMessage) {
// tx in progress, we must wait until its done
- while (!mess.wasProcessed()) {
+ while (!(commitMessage.wasProcessed() || commitMessage.isDepartureNoticed())) {
try {
- mess.wait();
+ commitMessage.wait(100);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
logger.error(LocalizedMessage.create(
LocalizedStrings.TxFarSideTracker_WAITING_TO_COMPLETE_ON_MESSAGE_0_CAUGHT_AN_INTERRUPTED_EXCEPTION,
- mess), ie);
+ commitMessage), ie);
break;
}
}
@@ -186,7 +186,7 @@ public class TXFarSideCMTracker {
} else {
// tx may have completed
for (int i = this.txHistory.length - 1; i >= 0; --i) {
- if (lk.equals(this.txHistory[i])) {
+ if (lockId.equals(this.txHistory[i])) {
return;
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DlockAndTxlockRegressionTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/DlockAndTxlockRegressionTest.java
index 8e19b73..3298a83 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/DlockAndTxlockRegressionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/DlockAndTxlockRegressionTest.java
@@ -33,6 +33,9 @@ import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.internal.OSProcess;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.DistributedTestUtils;
@@ -54,6 +57,8 @@ public class DlockAndTxlockRegressionTest extends JUnit4CacheTestCase {
@Rule
public DistributedRestoreSystemProperties restoreSystemProperties =
new DistributedRestoreSystemProperties();
+ private static Object commitLock = new Object();
+ private static boolean committing;
@Override
public Properties getDistributedSystemProperties() {
@@ -148,13 +153,11 @@ public class DlockAndTxlockRegressionTest extends JUnit4CacheTestCase {
}
Throwable failure = null;
- int asyncIndex = 0;
for (AsyncInvocation asyncInvocation : asyncInvocations) {
asyncInvocation.join(30000);
if (asyncInvocation.exceptionOccurred()) {
failure = asyncInvocation.getException();
}
- asyncIndex++;
}
if (failure != null) {
throw new RuntimeException("test failed", failure);
@@ -170,8 +173,14 @@ public class DlockAndTxlockRegressionTest extends JUnit4CacheTestCase {
}
}
- public void forceDisconnect() {
+ public void forceDisconnect() throws Exception {
Cache existingCache = basicGetCache();
+ synchronized (commitLock) {
+ committing = false;
+ while (!committing) {
+ commitLock.wait();
+ }
+ }
if (existingCache != null && !existingCache.isClosed()) {
DistributedTestUtils.crashDistributedSystem(getCache().getDistributedSystem());
}
@@ -209,6 +218,16 @@ public class DlockAndTxlockRegressionTest extends JUnit4CacheTestCase {
region.put("TestKey", "TestValue" + random.nextInt(100000));
+ TXManagerImpl mgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+ TXStateProxyImpl txProxy = (TXStateProxyImpl) mgr.getTXState();
+ TXState txState = (TXState) txProxy.getRealDeal(null, null);
+ txState.setBeforeSend(() -> {
+ synchronized (commitLock) {
+ committing = true;
+ commitLock.notifyAll();
+ }
+ });
+
try {
cache.getCacheTransactionManager().commit();
} catch (CommitConflictException e) {
--
To stop receiving notification emails like this one, please contact
bschuchardt@apache.org.