You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/10/28 21:41:39 UTC
[85/98] [abbrv] incubator-geode git commit: GEODE-2024 Deadlock
creating a new lock service Grantor
GEODE-2024 Deadlock creating a new lock service Grantor
This change-set makes TXLockServiceImpl.release() check periodically
whether grantor recovery is in progress. If it is, release() abandons
releaseTryLocks, which requires a recovered grantor in order to
function. This is in line with the previous attempts to fix this
problem. The recovery message that tries to obtain the recovery
write-lock now sets the "recovering" state in TXLockServiceImpl before
attempting to get the lock, so that the state is already set when
TXLockServiceImpl.release() checks it.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f02ea36f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f02ea36f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f02ea36f
Branch: refs/heads/feature/GEM-983
Commit: f02ea36f2e3a440e8aa39815539f3aa2855ce124
Parents: 69a0877
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Wed Oct 26 13:51:20 2016 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Wed Oct 26 13:53:00 2016 -0700
----------------------------------------------------------------------
.../locks/DLockRecoverGrantorProcessor.java | 16 +-
.../internal/locks/DLockService.java | 108 ++++++----
.../internal/cache/locks/TXLockServiceImpl.java | 35 ++--
.../locks/TXRecoverGrantorMessageProcessor.java | 8 +-
.../cache/locks/TXLockServiceDUnitTest.java | 210 ++++++++++++++-----
5 files changed, 272 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f02ea36f/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java
index 37fbfbe..2a48308 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java
@@ -91,7 +91,7 @@ public class DLockRecoverGrantorProcessor extends ReplyProcessor21 {
// process msg and reply from this VM...
if (msg.getSender() == null)
msg.setSender(dm.getId());
- msg.processMessage(dm);
+ msg.scheduleMessage(dm);
// keep waiting even if interrupted
try {
@@ -239,6 +239,20 @@ public class DLockRecoverGrantorProcessor extends ReplyProcessor21 {
processMessage(dm);
}
+ /**
+ * For unit testing, push the message through scheduleAction so that message observers are
+ * invoked; a DM that is not a DistributionManager has the message processed directly.
+ *
+ * @param dm the distribution manager
+ */
+ protected void scheduleMessage(DM dm) {
+ if (dm instanceof DistributionManager) {
+ super.scheduleAction((DistributionManager) dm);
+ } else {
+ processMessage(dm);
+ }
+ }
+
protected void processMessage(DM dm) {
MessageProcessor processor = nullServiceProcessor;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f02ea36f/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
index a859299..ca012d3 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
@@ -15,8 +15,18 @@
package org.apache.geode.distributed.internal.locks;
-import org.apache.geode.*;
-import org.apache.geode.distributed.*;
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.CancelException;
+import org.apache.geode.InternalGemFireError;
+import org.apache.geode.InternalGemFireException;
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.SystemFailure;
+import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.DistributedSystemDisconnectedException;
+import org.apache.geode.distributed.LeaseExpiredException;
+import org.apache.geode.distributed.LockNotHeldException;
+import org.apache.geode.distributed.LockServiceDestroyedException;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -39,8 +49,18 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -290,7 +310,7 @@ public class DLockService extends DistributedLockService {
statStart = getStats().startGrantorWait();
if (!ownLockGrantorFutureResult) {
LockGrantorId lockGrantorIdRef =
- waitForLockGrantorFutureResult(lockGrantorFutureResultRef);
+ waitForLockGrantorFutureResult(lockGrantorFutureResultRef, 0, TimeUnit.MILLISECONDS);
if (lockGrantorIdRef != null) {
return lockGrantorIdRef;
} else {
@@ -366,7 +386,7 @@ public class DLockService extends DistributedLockService {
/**
* Creates a local {@link DLockGrantor}.
- *
+ *
* if (!createLocalGrantor(xxx)) { theLockGrantorId = this.lockGrantorId; }
*
* @param elder the elder that told us to be the grantor
@@ -727,15 +747,24 @@ public class DLockService extends DistributedLockService {
* Returns lockGrantorId when lockGrantorFutureResultRef has been set by another thread.
*
* @param lockGrantorFutureResultRef FutureResult to wait for
+ * @param timeToWait how long to wait, in units of timeUnit; 0 = wait forever
+ * @param timeUnit the unit of measure for timeToWait
* @return the LockGrantorId or null if FutureResult was cancelled
*/
- private LockGrantorId waitForLockGrantorFutureResult(FutureResult lockGrantorFutureResultRef) {
+ private LockGrantorId waitForLockGrantorFutureResult(FutureResult lockGrantorFutureResultRef,
+ long timeToWait, final TimeUnit timeUnit) {
LockGrantorId lockGrantorIdRef = null;
while (lockGrantorIdRef == null) {
boolean interrupted = Thread.interrupted();
try {
checkDestroyed();
- lockGrantorIdRef = (LockGrantorId) lockGrantorFutureResultRef.get();
+ if (timeToWait == 0) {
+ lockGrantorIdRef = (LockGrantorId) lockGrantorFutureResultRef.get();
+ } else {
+ lockGrantorIdRef = (LockGrantorId) lockGrantorFutureResultRef.get(timeToWait, timeUnit);
+ }
+ } catch (TimeoutException e) {
+ break;
} catch (InterruptedException e) {
interrupted = true;
this.dm.getCancelCriterion().checkCancelInProgress(e);
@@ -757,7 +786,14 @@ public class DLockService extends DistributedLockService {
return lockGrantorIdRef;
}
- private void notLockGrantorId(LockGrantorId notLockGrantorId, boolean waitForGrantor) {
+ /**
+ * nulls out grantor to force call to elder
+ *
+ * @param timeToWait time to wait for a new grantor: negative = don't wait, 0 = no time limit
+ * @param timeUnit the unit of measure of timeToWait
+ */
+ private void notLockGrantorId(LockGrantorId notLockGrantorId, long timeToWait,
+ final TimeUnit timeUnit) {
if (notLockGrantorId.isLocal(getSerialNumber())) {
if (logger.isTraceEnabled(LogMarker.DLS)) {
logger.trace(LogMarker.DLS,
@@ -793,8 +829,8 @@ public class DLockService extends DistributedLockService {
statStart = getStats().startGrantorWait();
if (!ownLockGrantorFutureResult) {
- if (waitForGrantor) { // fix for bug #43708
- waitForLockGrantorFutureResult(lockGrantorFutureResultRef);
+ if (timeToWait >= 0) {
+ waitForLockGrantorFutureResult(lockGrantorFutureResultRef, timeToWait, timeUnit);
}
return;
}
@@ -947,7 +983,7 @@ public class DLockService extends DistributedLockService {
}
}
if (!ownLockGrantorFutureResult) {
- waitForLockGrantorFutureResult(lockGrantorFutureResultRef);
+ waitForLockGrantorFutureResult(lockGrantorFutureResultRef, 0, TimeUnit.MILLISECONDS);
continue;
}
}
@@ -1329,7 +1365,7 @@ public class DLockService extends DistributedLockService {
* will be ignored if the lock is currently held by another client.
*
* @param interruptible true if this lock request is interruptible
- *
+ *
* @param disableAlerts true to disable logging alerts if the dlock is taking a long time to
* acquired.
*
@@ -1408,7 +1444,6 @@ public class DLockService extends DistributedLockService {
LocalizedStrings.DLockService_DEBUG_LOCKINTERRUPTIBLY_HAS_GONE_HOT_AND_LOOPED_0_TIMES
.toLocalizedString(count);
-
InternalGemFireError e = new InternalGemFireError(s);
logger.error(LogMarker.DLS,
LocalizedMessage.create(
@@ -1516,7 +1551,6 @@ public class DLockService extends DistributedLockService {
}
} // else: non-reentrant or reentrant w/ non-infinite lease
-
if (gotLock) {
// if (processor != null) (cannot be null)
{ // TODO: can be null after restoring above optimization
@@ -1539,9 +1573,7 @@ public class DLockService extends DistributedLockService {
this.lockGrantorId);
}
continue;
- }
-
- else if (isDestroyed()) {
+ } else if (isDestroyed()) {
// race: dls was destroyed
if (isDebugEnabled_DLS) {
logger.trace(LogMarker.DLS,
@@ -1549,9 +1581,7 @@ public class DLockService extends DistributedLockService {
theLockGrantorId);
}
needToReleaseOrphanedGrant = true;
- }
-
- else {
+ } else {
safeExit = true;
synchronized (this.tokens) {
checkDestroyed();
@@ -1603,7 +1633,7 @@ public class DLockService extends DistributedLockService {
// grantor replied NOT_GRANTOR or departed (getLock is false)
else if (processor.repliedNotGrantor() || processor.hadNoResponse()) {
safeExit = true;
- notLockGrantorId(theLockGrantorId, true);
+ notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS);
// keepTrying is still true... loop back around
} // grantor replied NOT_GRANTOR or departed
@@ -1912,7 +1942,7 @@ public class DLockService extends DistributedLockService {
released = true;
} finally {
if (!released) {
- notLockGrantorId(theLockGrantorId, true);
+ notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS);
}
}
} // while !released
@@ -1966,7 +1996,7 @@ public class DLockService extends DistributedLockService {
// loop back around to get next lock grantor
} finally {
if (queryReply != null && queryReply.repliedNotGrantor()) {
- notLockGrantorId(theLockGrantorId, true);
+ notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS);
}
}
} // while querying
@@ -2076,7 +2106,7 @@ public class DLockService extends DistributedLockService {
return this.dlockStats;
}
- public void releaseTryLocks(DLockBatchId batchId, boolean onlyIfSameGrantor) {
+ public void releaseTryLocks(DLockBatchId batchId, Callable<Boolean> untilCondition) {
final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS);
if (isDebugEnabled_DLS) {
logger.trace(LogMarker.DLS, "[DLockService.releaseTryLocks] enter: {}", batchId);
@@ -2088,26 +2118,29 @@ public class DLockService extends DistributedLockService {
boolean lockBatch = true;
boolean released = false;
while (!released) {
+ try {
+ boolean quit = untilCondition.call();
+ if (quit) {
+ return;
+ }
+ } catch (Exception e) {
+ throw new InternalGemFireException("unexpected exception", e);
+ }
checkDestroyed();
LockGrantorId theLockGrantorId = null;
- if (onlyIfSameGrantor) { // this was a fix for bug #38763, from r19555
- theLockGrantorId = batchId.getLockGrantorId();
- synchronized (this.lockGrantorIdLock) {
- if (!checkLockGrantorId(theLockGrantorId)) {
- // the grantor is different so break and skip DLockReleaseProcessor
- break;
- }
+ theLockGrantorId = batchId.getLockGrantorId();
+ synchronized (this.lockGrantorIdLock) {
+ if (!checkLockGrantorId(theLockGrantorId)) {
+ // the grantor is different so break and skip DLockReleaseProcessor
+ break;
}
- } else {
- theLockGrantorId = getLockGrantorId();
}
released =
callReleaseProcessor(theLockGrantorId.getLockGrantorMember(), batchId, lockBatch, -1);
if (!released) {
- final boolean waitForGrantor = onlyIfSameGrantor; // fix for bug #43708
- notLockGrantorId(theLockGrantorId, waitForGrantor);
+ notLockGrantorId(theLockGrantorId, 100, TimeUnit.MILLISECONDS);
}
}
} finally {
@@ -2185,7 +2218,7 @@ public class DLockService extends DistributedLockService {
// should have thrown LockServiceDestroyedException
Assert.assertTrue(isDestroyed(), "Grantor reports service " + this + " is destroyed");
} else if (processor.repliedNotGrantor() || processor.hadNoResponse()) {
- notLockGrantorId(theLockGrantorId, true);
+ notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS);
} else {
keyIfFailed[0] = processor.getKeyIfFailed();
if (keyIfFailed[0] == null) {
@@ -2455,7 +2488,8 @@ public class DLockService extends DistributedLockService {
if (theLockGrantorId != null && !theLockGrantorId.isLocal(getSerialNumber())) {
if (!NonGrantorDestroyedProcessor.send(this.serviceName, theLockGrantorId, dm)) {
// grantor responded NOT_GRANTOR
- notLockGrantorId(theLockGrantorId, true); // nulls out grantor to force call to elder
+ notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS); // nulls out grantor to
+ // force call to elder
retry = true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f02ea36f/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java
index f4ab02f..717d878 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXLockServiceImpl.java
@@ -15,13 +15,6 @@
package org.apache.geode.internal.cache.locks;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.logging.log4j.Logger;
-
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyException;
@@ -32,6 +25,12 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
/** Provides clean separation of implementation from public facade */
public class TXLockServiceImpl extends TXLockService {
@@ -50,13 +49,14 @@ public class TXLockServiceImpl extends TXLockService {
/** Instance of dlock service to use */
private DLockService dlock;
- /** List of active txLockIds */
+ /**
+ * List of active txLockIds
+ */
protected List txLockIdList = new ArrayList();
/**
* True if grantor recovery is in progress; used to keep <code>release</code> from waiting for
- * grantor. TODO: this boolean can probably be removed... it was insufficient and new fixes for
- * bug 38763 have the side effect of making this boolean obsolete (verify before removal!)
+ * grantor.
*/
private volatile boolean recovering = false;
@@ -225,10 +225,11 @@ public class TXLockServiceImpl extends TXLockService {
LocalizedStrings.TXLockServiceImpl_INVALID_TXLOCKID_NOT_FOUND_0
.toLocalizedString(txLockId));
}
- // only release w/ dlock if not in middle of recovery...
- if (!this.recovering) {
- this.dlock.releaseTryLocks(txLockId, true);
- }
+
+ this.dlock.releaseTryLocks(txLockId, () -> {
+ return this.recovering;
+ });
+
this.txLockIdList.remove(txLockId);
releaseRecoveryReadLock();
}
@@ -243,10 +244,14 @@ public class TXLockServiceImpl extends TXLockService {
// Internal implementation methods
// -------------------------------------------------------------------------
+ boolean isRecovering() {
+ return this.recovering;
+ }
+
/** Delays grantor recovery replies until finished with locks */
void acquireRecoveryWriteLock() throws InterruptedException {
- this.recoveryLock.writeLock().lockInterruptibly();
this.recovering = true;
+ this.recoveryLock.writeLock().lockInterruptibly();
}
void releaseRecoveryWriteLock() {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f02ea36f/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXRecoverGrantorMessageProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXRecoverGrantorMessageProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXRecoverGrantorMessageProcessor.java
old mode 100755
new mode 100644
index 77dec94..7ae2d2b
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXRecoverGrantorMessageProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/locks/TXRecoverGrantorMessageProcessor.java
@@ -15,10 +15,6 @@
package org.apache.geode.internal.cache.locks;
-import java.util.concurrent.RejectedExecutionException;
-
-import org.apache.logging.log4j.Logger;
-
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.ReplyException;
@@ -30,11 +26,13 @@ import org.apache.geode.internal.cache.TXCommitMessage;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.RejectedExecutionException;
/**
* Provides processing of DLockRecoverGrantorProcessor. Reply will not be sent until all locks are
* released.
- *
*/
public class TXRecoverGrantorMessageProcessor
implements DLockRecoverGrantorProcessor.MessageProcessor {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f02ea36f/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceDUnitTest.java
old mode 100755
new mode 100644
index fb16ea9..6a5eae8
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/locks/TXLockServiceDUnitTest.java
@@ -14,29 +14,25 @@
*/
package org.apache.geode.internal.cache.locks;
-import static org.apache.geode.distributed.ConfigurationProperties.*;
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import static com.jayway.awaitility.Awaitility.await;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.locks.DLockRecoverGrantorProcessor;
+import org.apache.geode.distributed.internal.locks.DLockRecoverGrantorProcessor.DLockRecoverGrantorMessage;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.TXRegionLockRequestImpl;
@@ -44,9 +40,19 @@ import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
-import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.DistributedTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
/**
* This class tests distributed ownership via the DistributedLockService api.
@@ -75,22 +81,10 @@ public class TXLockServiceDUnitTest extends JUnit4DistributedTestCase {
*/
@Override
public final void postSetUp() throws Exception {
- // Create a DistributedSystem in every VM
+ Invoke.invokeInEveryVM("connectDistributedSystem", () -> connectDistributedSystem());
connectDistributedSystem();
-
- for (int h = 0; h < Host.getHostCount(); h++) {
- Host host = Host.getHost(h);
-
- for (int v = 0; v < host.getVMCount(); v++) {
- // host.getVM(v).invoke(() -> TXLockServiceDUnitTest.dumpStack());
- host.getVM(v).invoke(TXLockServiceDUnitTest.class, "connectDistributedSystem", null);
- }
- }
}
- public static void dumpStack() {
- org.apache.geode.internal.OSProcess.printStacks(0);
- }
@Override
public final void preTearDown() throws Exception {
@@ -124,16 +118,13 @@ public class TXLockServiceDUnitTest extends JUnit4DistributedTestCase {
*/
}
- @Ignore("TODO: test is disabled")
@Test
public void testGetAndDestroyAgain() {
testGetAndDestroy();
}
- @Ignore("TODO: test is disabled")
@Test
public void testTXRecoverGrantorMessageProcessor() throws Exception {
- LogWriterUtils.getLogWriter().info("[testTXOriginatorRecoveryProcessor]");
TXLockService.createDTLS();
checkDLockRecoverGrantorMessageProcessor();
@@ -162,29 +153,158 @@ public class TXLockServiceDUnitTest extends JUnit4DistributedTestCase {
msg.setProcessorId(testProc.getProcessorId());
msg.setSender(dlock.getDistributionManager().getId());
- Thread thread = new Thread(new Runnable() {
- public void run() {
- TXRecoverGrantorMessageProcessor proc =
- (TXRecoverGrantorMessageProcessor) dlock.getDLockRecoverGrantorMessageProcessor();
- proc.processDLockRecoverGrantorMessage(dlock.getDistributionManager(), msg);
- }
+ Thread thread = new Thread(() -> {
+ TXRecoverGrantorMessageProcessor proc =
+ (TXRecoverGrantorMessageProcessor) dlock.getDLockRecoverGrantorMessageProcessor();
+ proc.processDLockRecoverGrantorMessage(dlock.getDistributionManager(), msg);
});
+ thread.setName("TXLockServiceDUnitTest thread");
+ thread.setDaemon(true);
thread.start();
- // pause to allow thread to be blocked before we release the lock
- sleep(999);
+ await("waiting for recovery message to block").atMost(999, TimeUnit.MILLISECONDS).until(() -> {
+ return ((TXLockServiceImpl) dtls).isRecovering();
+ });
- // release txLock
dtls.release(txLockId);
- // check results to verify no locks were provided in reply
- ThreadUtils.join(thread, 30 * 1000);
+ // check results to verify no locks were provided in the reply
+ await("waiting for thread to exit").atMost(30, TimeUnit.SECONDS).until(() -> {
+ return !thread.isAlive();
+ });
+
+ assertFalse(((TXLockServiceImpl) dtls).isRecovering());
+
assertEquals("testTXRecoverGrantor_replyCode_PASS is false", true,
testTXRecoverGrantor_replyCode_PASS);
assertEquals("testTXRecoverGrantor_heldLocks_PASS is false", true,
testTXRecoverGrantor_heldLocks_PASS);
}
+
+ @Test
+ public void testTXGrantorMigration() throws Exception {
+ // first make sure some other VM is the grantor
+ Host.getHost(0).getVM(0).invoke("become lock grantor", () -> {
+ TXLockService.createDTLS();
+ TXLockService vm0dtls = TXLockService.getDTLS();
+ DLockService vm0dlock = ((TXLockServiceImpl) vm0dtls).getInternalDistributedLockService();
+ vm0dlock.becomeLockGrantor();
+ });
+
+ TXLockService.createDTLS();
+ checkDLockRecoverGrantorMessageProcessor();
+
+ /*
+ * call TXRecoverGrantorMessageProcessor.process directly to make sure that correct behavior
+ * occurs
+ */
+
+ // get txLock and hold it
+ final List regionLockReqs = new ArrayList();
+ regionLockReqs.add(new TXRegionLockRequestImpl("/testTXRecoverGrantorMessageProcessor2",
+ new HashSet(Arrays.asList(new String[] {"KEY-1", "KEY-2", "KEY-3", "KEY-4"}))));
+ TXLockService dtls = TXLockService.getDTLS();
+ TXLockId txLockId = dtls.txLock(regionLockReqs, Collections.EMPTY_SET);
+
+ final DLockService dlock = ((TXLockServiceImpl) dtls).getInternalDistributedLockService();
+
+ // GEODE-2024: now cause grantor migration while holding the recoveryReadLock.
+ // Recovery will block in TXRecoverGrantorMessageProcessor until the recoveryReadLock
+ // is released. Demonstrate that dtls.release() does not block forever and that it
+ // releases the recoveryReadLock, which allows the grantor migration started below
+ // to finish.
+
+ // create an observer that will block recovery messages from being processed
+ MessageObserver observer = new MessageObserver();
+ DistributionMessageObserver.setInstance(observer);
+
+ try {
+ System.out.println("starting thread to take over being lock grantor from vm0");
+
+ // become the grantor - this will block waiting for a reply to the message blocked by the
+ // observer
+ Thread thread = new Thread(() -> {
+ dlock.becomeLockGrantor();
+ });
+ thread.setName("TXLockServiceDUnitTest thread2");
+ thread.setDaemon(true);
+ thread.start();
+
+ await("waiting for recovery to begin").atMost(10, TimeUnit.SECONDS).until(() -> {
+ return observer.isPreventingProcessing();
+ });
+
+
+ // spawn a thread that will unblock message processing
+ // so that TXLockServiceImpl's "recovering" variable will be set
+ System.out.println("starting a thread to unblock recovery in 5 seconds");
+ Thread unblockThread = new Thread(() -> {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("sleep interrupted");
+ }
+ System.out.println("releasing block of recovery message processing");
+ observer.releasePreventionOfProcessing();
+ });
+ unblockThread.setName("TXLockServiceDUnitTest unblockThread");
+ unblockThread.setDaemon(true);
+ unblockThread.start();
+
+ // release txLock - this will block until unblockThread tells the observer
+ // that it can process its message. Then it should release the recovery read-lock
+ // allowing the grantor to finish recovery
+ System.out.println("releasing transaction locks, which should block for a bit");
+ dtls.release(txLockId);
+
+ await("waiting for recovery to finish").atMost(10, TimeUnit.SECONDS).until(() -> {
+ return !((TXLockServiceImpl) dtls).isRecovering();
+ });
+ } finally {
+ observer.releasePreventionOfProcessing();
+ DistributionMessageObserver.setInstance(null);
+ }
+ }
+
+ static class MessageObserver extends DistributionMessageObserver {
+ final boolean[] preventingMessageProcessing = new boolean[] {false};
+ final boolean[] preventMessageProcessing = new boolean[] {true};
+
+
+ public boolean isPreventingProcessing() {
+ synchronized (preventingMessageProcessing) {
+ return preventingMessageProcessing[0];
+ }
+ }
+
+ public void releasePreventionOfProcessing() {
+ synchronized (preventMessageProcessing) {
+ preventMessageProcessing[0] = false;
+ }
+ }
+
+ @Override
+ public void beforeProcessMessage(DistributionManager dm, DistributionMessage message) {
+ if (message instanceof DLockRecoverGrantorMessage) {
+ synchronized (preventingMessageProcessing) {
+ preventingMessageProcessing[0] = true;
+ }
+ synchronized (preventMessageProcessing) {
+ while (preventMessageProcessing[0]) {
+ try {
+ preventMessageProcessing.wait(50);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("sleep interrupted");
+ }
+ }
+ }
+ }
+ }
+
+ }
+
+
protected static volatile TXLockId testTXLock_TXLockId;
@Test
@@ -384,7 +504,6 @@ public class TXLockServiceDUnitTest extends JUnit4DistributedTestCase {
});
Host.getHost(0).getVM(originatorVM).invoke(() -> disconnectFromDS());
-
// grantor sends TXOriginatorRecoveryMessage...
// TODO: verify processing of message? and have test sleep until finished
sleep(200);
@@ -456,7 +575,7 @@ public class TXLockServiceDUnitTest extends JUnit4DistributedTestCase {
/**
* Creates a new DistributedLockService in a remote VM.
- *
+ *
* @param name The name of the newly-created DistributedLockService. It is recommended that the
* name of the Region be the {@link #getUniqueName()} of the test, or at least derive from
* it.
@@ -594,9 +713,6 @@ public class TXLockServiceDUnitTest extends JUnit4DistributedTestCase {
/**
* Accessed via reflection. DO NOT REMOVE
- *
- * @param key
- * @return
*/
protected static Boolean unlock_DTLS(Object key) {
TXLockService dtls = TXLockService.getDTLS();