You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/05/10 21:09:49 UTC
[02/16] incubator-geode git commit: Moving a distributed lock service
unit test to open-source.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2c148caa/geode-core/src/test/java/com/gemstone/gemfire/distributed/DistributedLockServiceDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/DistributedLockServiceDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/DistributedLockServiceDUnitTest.java
new file mode 100755
index 0000000..42c3f01
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/DistributedLockServiceDUnitTest.java
@@ -0,0 +1,3246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+import junit.framework.AssertionFailedError;
+
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.DistributionMessageObserver;
+import com.gemstone.gemfire.distributed.internal.locks.DLockGrantor;
+import com.gemstone.gemfire.distributed.internal.locks.DLockRemoteToken;
+import com.gemstone.gemfire.distributed.internal.locks.DLockRequestProcessor;
+import com.gemstone.gemfire.distributed.internal.locks.DLockRequestProcessor.DLockRequestMessage;
+import com.gemstone.gemfire.distributed.internal.locks.DLockRequestProcessor.DLockResponseMessage;
+import com.gemstone.gemfire.distributed.internal.locks.DLockService;
+import com.gemstone.gemfire.distributed.internal.locks.DLockToken;
+import com.gemstone.gemfire.distributed.internal.locks.RemoteThread;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.util.StopWatch;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.DistributedTestCase;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.Invoke;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.RMIException;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.ThreadUtils;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+
+/**
+ * This class tests distributed ownership via the DistributedLockService api.
+ */
+public class DistributedLockServiceDUnitTest extends DistributedTestCase {
+
+ protected static DistributedSystem dlstSystem; // per-VM connection: set by connectDistributedSystem(), nulled by destroyAllDLockServices()
+ private static DistributedLockBlackboard blackboard; // fetched once per VM by the constructor
+ protected static Object monitor = new Object(); // NOTE(review): no use is visible in this part of the file
+
+ private int hits = 0; // NOTE(review): presumably a counter updated by tests further down the file -- confirm
+ private int completes = 0; // NOTE(review): presumably a counter updated by tests further down the file -- confirm
+ private boolean done; // NOTE(review): readers/writers are outside this part of the file
+ private boolean got; // NOTE(review): readers/writers are outside this part of the file
+
+
+ /**
+ * Creates the test case and, if the static blackboard has not been set in
+ * this VM yet, obtains it from {@code DistributedLockBlackboardImpl}.
+ *
+ * @param name the test name handed to the superclass constructor
+ * @throws RuntimeException wrapping any exception raised while obtaining the blackboard
+ */
+ public DistributedLockServiceDUnitTest(String name) {
+ super(name);
+ if (blackboard != null) {
+ return; // already initialized in this VM
+ }
+ try {
+ blackboard = DistributedLockBlackboardImpl.getInstance();
+ } catch (Exception failure) {
+ throw new RuntimeException("initialization error", failure);
+ }
+ }
+
+ /////////// Test lifecycle //////////
+
+ public static void caseSetUp() throws Exception { // NOTE(review): presumably a once-per-test-case hook called by the harness -- confirm
+ disconnectAllFromDS(); // helper defined outside this class body
+ }
+
+ public static void caseTearDown() throws Exception { // NOTE(review): presumably the counterpart of caseSetUp(), called once after the test case -- confirm
+ disconnectAllFromDS(); // same call as caseSetUp()
+ }
+
+ /**
+ * Connects this VM to the distributed system and then invokes
+ * {@code connectDistributedSystem} by name in every VM of every host, so
+ * that each VM has its static {@code dlstSystem} assigned.
+ */
+ @Override
+ public final void postSetUp() throws Exception {
+ // Create a DistributedSystem in every VM
+ connectDistributedSystem();
+
+ for (int h = 0; h < Host.getHostCount(); h++) {
+ Host host = Host.getHost(h);
+
+ for (int v = 0; v < host.getVMCount(); v++) {
+ host.getVM(v).invoke(
+ DistributedLockServiceDUnitTest.class, "connectDistributedSystem", null);
+ }
+ }
+ }
+
+ @Override
+ public final void preTearDown() throws Exception {
+ Invoke.invokeInEveryVM(DistributedLockServiceDUnitTest.class,
+ "destroyAllDLockServices"); // looked up by name, so the method must keep this exact name
+// invokeInEveryVM(DistributedLockServiceDUnitTest.class,
+// "remoteDumpAllDLockServices");
+
+ //InternalDistributedLockService.destroyAll();
+
+// // Disconnects the DistributedSystem in every VM - since
+// // each test randomly chooses whether shared memory is used
+// disconnectAllFromDS();
+
+ this.lockGrantor = null; // forget the grantor recorded by assertGrantorIsConsistent()
+ }
+
+ public static void destroyAllDLockServices() { // invoked by name from preTearDown()
+ DLockService.destroyAll();
+ dlstSystem = null; // drop the cached system reference; connectDistributedSystem() assigns it again
+ }
+
+ public static void remoteDumpAllDLockServices() { // only reference visible here is the commented-out call in preTearDown()
+ DLockService.dumpAllServices();
+ }
+
+ ///////// Remote setup/teardown support
+
+ /**
+ * Connects a DistributedSystem and saves it in the static variable
+ * {@code dlstSystem}. A throwaway test instance named "dummy" is created
+ * so that the instance method {@code getSystem()} can be called.
+ */
+ protected static void connectDistributedSystem() {
+ dlstSystem = (new DistributedLockServiceDUnitTest("dummy")).getSystem();
+ }
+
+ ///////// Public test methods
+
+ /**
+ * Single-VM lock/unlock cycle: the same thread acquires the lock twice and
+ * the lock is only reported as released after the second unlock.
+ */
+ public void testBasic() {
+ final String dlsName = getUniqueName();
+ final String lockName = "object";
+
+ final DistributedLockService dls = DistributedLockService.create(dlsName, dlstSystem);
+
+ // nothing is held before the first request
+ assertFalse(dls.isHeldByCurrentThread(lockName));
+
+ // acquire twice from the same thread
+ for (int acquisition = 0; acquisition < 2; acquisition++) {
+ assertTrue(dls.lock(lockName, 3000, -1));
+ assertTrue(dls.isHeldByCurrentThread(lockName));
+ }
+
+ // the first unlock leaves the lock held, the second one releases it
+ dls.unlock(lockName);
+ assertTrue(dls.isHeldByCurrentThread(lockName));
+ dls.unlock(lockName);
+ assertFalse(dls.isHeldByCurrentThread(lockName));
+
+ DistributedLockService.destroy(dlsName);
+ }
+
+ /**
+ * Creates, destroys and re-creates a lock service and checks that:
+ * <ul>
+ * <li>a destroyed service throws LockServiceDestroyedException from lock();</li>
+ * <li>getServiceNamed() returns null for a destroyed service;</li>
+ * <li>a remote thread blocked in lock() is released with
+ * LockServiceDestroyedException when its service is destroyed.</li>
+ * </ul>
+ *
+ * @throws Exception if a remote invocation fails on the final retry or the
+ * remote waiting thread reports an exception
+ */
+ public void testCreateDestroy() throws Exception {
+ final String serviceName = getUniqueName();
+ final String abc = "abc";
+
+ // create and destroy dls
+ assertNull(DistributedLockService.getServiceNamed(serviceName));
+ DistributedLockService service = DistributedLockService.create(serviceName, getSystem());
+ assertSame(service, DistributedLockService.getServiceNamed(serviceName));
+ DistributedLockService.destroy(serviceName);
+
+ // assert attempt to use dls throws LockServiceDestroyedException
+ try {
+ service.lock(abc, -1, -1);
+ fail("didn't get LockServiceDestroyedException");
+ } catch (LockServiceDestroyedException ex) {
+ // expected
+ }
+
+ // assert that destroyed dls is no longer available
+ service = DistributedLockService.getServiceNamed(serviceName);
+ assertNull("" + service, service);
+
+ // recreate the dls
+ service = DistributedLockService.create(serviceName, getSystem());
+ assertTrue(!((DLockService)service).isDestroyed());
+ ((DLockService)service).checkDestroyed();
+
+ // get the same dls from another thread and hold a lock
+ Thread thread = new Thread(new Runnable() {
+ public void run() {
+ DistributedLockService dls =
+ DistributedLockService.getServiceNamed(serviceName);
+ assertTrue(!((DLockService)dls).isDestroyed());
+ ((DLockService)dls).checkDestroyed();
+ dls.lock(abc, -1, -1); // get lock on abc and hold it
+ }
+ });
+ thread.start();
+ ThreadUtils.join(thread, 30 * 1000);
+
+ // start a new thread to wait for lock on abc
+ AsyncInvocation remoteWaitingThread =
+ Host.getHost(0).getVM(0).invokeAsync(new SerializableRunnable() {
+ public void run() {
+ DistributedLockService dls =
+ DistributedLockService.create(serviceName, getSystem());
+ try {
+ dls.lock(abc, -1, -1); // waiting to get lock abc
+ fail("remoteWaitingThread got lock after dls destroyed");
+ }
+ catch (LockServiceDestroyedException expected) {
+ return;
+ }
+ fail("remoteWaitingThread lock failed to throw LockServiceDestroyedException");
+ }
+ });
+
+ // loop will handle race condition with 1 sec sleep and retry
+ int retry = 10;
+ for (int i = 0; i < retry; i++) {
+ try {
+ // destroy DLS and free up remoteWaitingThread
+ Host.getHost(0).getVM(0).invoke(new SerializableRunnable() {
+ public void run() {
+ DistributedLockService.destroy(serviceName);
+ }
+ });
+ }
+ catch (RMIException e) {
+ // race condition: remoteWaitingThread probably hasn't created DLS yet.
+ // Only retry while attempts remain; on the last attempt the failure is
+ // rethrown instead of silently leaving the loop without a destroy.
+ final boolean attemptsRemain = i < retry - 1;
+ if (attemptsRemain && e.getCause() instanceof IllegalArgumentException) {
+ sleep(1000);
+ continue;
+ }
+ else {
+ throw e;
+ }
+ }
+ break; // completed so break out of loop
+ }
+
+ DistributedLockService.destroy(serviceName);
+
+ // make sure remoteWaitingThread stopped waiting and threw LockServiceDestroyedException
+ ThreadUtils.join(remoteWaitingThread, 10 * 1000);
+ if (remoteWaitingThread.exceptionOccurred()) {
+ Throwable e = remoteWaitingThread.getException();
+ com.gemstone.gemfire.test.dunit.Assert.fail(e.getMessage(), e);
+ }
+
+ // make sure LockServiceDestroyedException is thrown
+ try {
+ service.lock(abc, -1, -1);
+ fail("didn't get LockServiceDestroyedException");
+ } catch (LockServiceDestroyedException ex) {
+ // expected
+ }
+
+ // make sure getServiceNamed returns null
+ service = DistributedLockService.getServiceNamed(serviceName);
+ assertNull("" + service, service);
+ }
+
+ protected static DistributedLockService dls_testFairness; // service handle assigned by each racing thread in testFairness
+ protected static int count_testFairness[] = new int[16]; // locks obtained per racing thread, indexed by thread number within a VM
+ protected static volatile boolean stop_testFairness; // set true in each VM to end the race
+ protected static volatile boolean[] done_testFairness = new boolean[16]; // per-thread completion flags; volatile covers the array reference only
+ static { Arrays.fill(done_testFairness, true); } // slots start as true; a racing thread clears its own slot while it runs
+ public void testFairness() throws Exception { // races 1+4+8+16 threads over one lock and checks each got within 30% of the mean
+ final String serviceName = "testFairness_" + getUniqueName();
+ final Object lock = "lock";
+
+ // get the lock and hold it until all threads are ready to go
+ DistributedLockService service =
+ DistributedLockService.create(serviceName, dlstSystem);
+ assertTrue(service.lock(lock, -1, -1));
+
+ final int[] vmThreads = new int[] { 1, 4, 8, 16 }; // number of racing threads started in VM 0..3
+ forNumVMsInvoke(vmThreads.length,
+ "remoteCreateService",
+ new Object[] { serviceName });
+ sleep(100);
+
+ // line up threads for the fairness race...
+ for (int i = 0; i < vmThreads.length; i++) {
+ final int vm = i;
+ LogWriterUtils.getLogWriter().info("[testFairness] lining up " + vmThreads[vm] +
+ " threads in vm " + vm);
+
+ for (int j = 0; j < vmThreads[vm]; j++) {
+ final int thread = j;
+ /*getLogWriter().info("[testFairness] setting up thread " + thread +
+ " in vm " + vm);*/
+
+ Host.getHost(0).getVM(vm).invokeAsync(new SerializableRunnable() {
+ public void run() {
+ // lock, inc count, and unlock until stop_testFairness is set true
+ try {
+ done_testFairness[thread] = false;
+ dls_testFairness = DistributedLockService.getServiceNamed(serviceName);
+ while (!stop_testFairness) {
+ assertTrue(dls_testFairness.lock(lock, -1, -1));
+ count_testFairness[thread]++;
+ dls_testFairness.unlock(lock);
+ }
+ done_testFairness[thread] = true;
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ }
+ catch (Throwable t) {
+ LogWriterUtils.getLogWriter().warning(t);
+ fail(t.getMessage());
+ }
+ }
+ });
+ }
+ }
+ sleep(500); // 500 ms
+
+ // start the race!
+ service.unlock(lock);
+ sleep(1000 * 5); // 5 seconds
+ assertTrue(service.lock(lock, -1, -1)); // re-acquire so the racers block while the stop flag is set
+
+ // stop the race...
+ for (int i = 0; i < vmThreads.length; i++) {
+ final int vm = i;
+ Host.getHost(0).getVM(vm).invoke(new SerializableRunnable() {
+ public void run() {
+ stop_testFairness = true;
+ }
+ });
+ }
+ service.unlock(lock);
+ for (int i = 0; i < vmThreads.length; i++) {
+ final int vm = i;
+ Host.getHost(0).getVM(vm).invoke(new SerializableRunnable() {
+ public void run() {
+ try {
+ boolean testIsDone = false;
+ while (!stop_testFairness || !testIsDone) { // spins (no sleep) until every done flag in this VM is true
+ testIsDone = true;
+ for (int i2 = 0; i2 < done_testFairness.length; i2++) {
+ if (!done_testFairness[i2]) testIsDone = false;
+ }
+ }
+ DistributedLockService.destroy(serviceName);
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ }
+ catch (Throwable t) {
+ fail(t.getMessage());
+ }
+ }
+ });
+ }
+
+ // calc total locks granted...
+ int totalLocks = 0;
+ int minLocks = Integer.MAX_VALUE;
+ int maxLocks = 0;
+
+ // add up total locks across all vms and threads...
+ int numThreads = 0;
+ for (int i = 0; i < vmThreads.length; i++) {
+ final int vm = i;
+ for (int j = 0; j < vmThreads[vm]; j++) {
+ final int thread = j;
+ Integer count = (Integer)Host.getHost(0).getVM(vm).invoke(() -> DistributedLockServiceDUnitTest.get_count_testFairness( new Integer(thread) ));
+ int numLocks = count.intValue();
+ if (numLocks < minLocks) minLocks = numLocks;
+ if (numLocks > maxLocks) maxLocks = numLocks;
+ totalLocks = totalLocks + numLocks;
+ numThreads++;
+ }
+ }
+
+ LogWriterUtils.getLogWriter().info("[testFairness] totalLocks=" + totalLocks +
+ " minLocks=" + minLocks +
+ " maxLocks=" + maxLocks);
+
+ int expectedLocks = (totalLocks / numThreads) + 1; // integer mean per thread, plus one
+
+ int deviation = (int)(expectedLocks * 0.3); // tolerated spread: 30% of the expected count
+ int lowThreshold = expectedLocks - deviation;
+ int highThreshold = expectedLocks + deviation;
+
+ LogWriterUtils.getLogWriter().info("[testFairness] deviation=" + deviation +
+ " expectedLocks=" + expectedLocks +
+ " lowThreshold=" + lowThreshold +
+ " highThreshold=" + highThreshold);
+
+ assertTrue("minLocks is less than lowThreshold",
+ minLocks >= lowThreshold);
+ assertTrue("maxLocks is greater than highThreshold",
+ maxLocks <= highThreshold);
+ }
+
+ /**
+ * Returns the lock count recorded for one racing thread of testFairness in
+ * this VM. Invoked remotely from testFairness. DO NOT REMOVE
+ *
+ * @param i index into {@code count_testFairness} (the array has 16 slots)
+ * @return the count stored at that index, boxed
+ */
+ public static Integer get_count_testFairness(Integer i) {
+ // Integer.valueOf replaces the deprecated boxing constructor
+ return Integer.valueOf(count_testFairness[i.intValue()]);
+ }
+
+ public void testOneGetsAndOthersTimeOut() throws Exception { // NOTE(review): arguments are presumably (numVMs, numThreadsPerVM) -- confirm against doOneGetsAndOthersTimeOut
+ doOneGetsAndOthersTimeOut(1, 1);
+// doOneGetsAndOthersTimeOut(2, 2);
+// doOneGetsAndOthersTimeOut(3, 2);
+ doOneGetsAndOthersTimeOut(4, 3);
+ }
+
+ /** Grantor recorded by the first assertGrantorIsConsistent call; cleared in preTearDown. */
+ private InternalDistributedMember lockGrantor;
+
+ /**
+ * Remembers the first grantor id it is given and asserts that every later
+ * id equals the remembered one.
+ *
+ * @param id grantor reported by a member
+ */
+ private synchronized void assertGrantorIsConsistent(InternalDistributedMember id) {
+ if (this.lockGrantor != null) {
+ assertEquals("assertGrantorIsConsistent failed", lockGrantor, id);
+ return;
+ }
+ this.lockGrantor = id;
+ }
+
+ /**
+ * Looks up the named lock service in this VM and reports which member it
+ * considers the lock grantor. Accessed via reflection. DO NOT REMOVE
+ *
+ * @param serviceName name of a lock service that must already exist in this VM
+ * @return the grantor member; asserted to be non-null
+ */
+ public static InternalDistributedMember identifyLockGrantor(String serviceName) {
+ final DistributedLockService named = DistributedLockService.getServiceNamed(serviceName);
+ final DLockService dls = (DLockService) named;
+ assertNotNull(dls);
+ final InternalDistributedMember member = dls.getLockGrantorId().getLockGrantorMember();
+ assertNotNull(member);
+ logInfo("In identifyLockGrantor - grantor is " + member);
+ return member;
+ }
+
+ /**
+ * Reports whether the named lock service in this VM is the lock grantor.
+ * Accessed via reflection. DO NOT REMOVE.
+ *
+ * @param serviceName name of a lock service that must already exist in this VM
+ * @return the boxed result of {@code DLockService.isLockGrantor()}
+ */
+ public static Boolean isLockGrantor(String serviceName) {
+ final DLockService dls = (DLockService) DistributedLockService.getServiceNamed(serviceName);
+ assertNotNull(dls);
+ final boolean grantorHere = dls.isLockGrantor();
+ logInfo("In isLockGrantor: " + grantorHere);
+ return Boolean.valueOf(grantorHere);
+ }
+
+ /**
+ * Accessed via reflection. DO NOT REMOVE.
+ * @param serviceName
+ */
+ protected static void becomeLockGrantor(String serviceName) {
+ DLockService service = (DLockService)
+ DistributedLockService.getServiceNamed(serviceName);
+ assertNotNull(service);
+ logInfo("About to call becomeLockGrantor...");
+ service.becomeLockGrantor();
+ }
+
+ /**
+ * Creates the service via distributedCreateService for four members and
+ * checks that VMs 0..3 all name the same member as lock grantor.
+ */
+ public void testGrantorSelection() {
+ // TODO change distributedCreateService usage to be concurrent threads
+
+ final int memberCount = 4;
+ final String serviceName = "testGrantorSelection_" + getUniqueName();
+ distributedCreateService(memberCount, serviceName);
+ try {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException ignore) {
+ fail("interrupted");
+ }
+
+ final Object[] remoteArgs = new Object[] {serviceName};
+ final Host host = Host.getHost(0);
+ int vmIndex = 0;
+ while (vmIndex < memberCount) {
+ final VM member = host.getVM(vmIndex);
+ logInfo("VM " + vmIndex + " in " + serviceName + " about to invoke");
+ final InternalDistributedMember reported = (InternalDistributedMember) member.invoke(
+ DistributedLockServiceDUnitTest.class,
+ "identifyLockGrantor",
+ remoteArgs);
+ logInfo("VM " + vmIndex + " in " + serviceName + " got " + reported);
+ assertGrantorIsConsistent(reported);
+ vmIndex++;
+ }
+ }
+
+ public void testBasicGrantorRecovery() { // disconnects the grantor VM and checks the survivors agree on a different grantor
+ //DLockGrantor.setUncleanDestroyEnabled(true);
+// try {
+ // 1) start up 4 VM members...
+ int numVMs = 4;
+ final String serviceName = "testBasicGrantorRecovery_" + getUniqueName();
+ distributedCreateService(numVMs, serviceName);
+ try {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException ignore) {fail("interrupted");}
+
+ final Object[] args = new Object[] {serviceName};
+ final Host host = Host.getHost(0);
+
+ int originalGrantor = 3; // VM 3 asks for the grantor first; the assertTrue below requires VM 3 to be the grantor
+ host.getVM(originalGrantor).invoke(
+ DistributedLockServiceDUnitTest.class, "identifyLockGrantor", args);
+
+ // 2) find the grantor and disconnect him...
+ int originalVM = -1;
+ InternalDistributedMember oldGrantor = null;
+ for (int vm = 0; vm < numVMs; vm++) {
+ final int finalvm = vm;
+ Boolean isGrantor = (Boolean)host.getVM(finalvm).invoke(
+ DistributedLockServiceDUnitTest.class, "isLockGrantor", args);
+ if (isGrantor.booleanValue()) {
+ originalVM = vm;
+ oldGrantor = (InternalDistributedMember)host.getVM(finalvm).invoke(
+ DistributedLockServiceDUnitTest.class, "identifyLockGrantor", args);
+ break;
+ }
+ }
+
+ assertTrue(originalVM == originalGrantor);
+
+ host.getVM(originalVM).invoke(new SerializableRunnable() {
+ public void run() {
+ disconnectFromDS();
+ }
+ });
+
+ try {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException ignore) {fail("interrupted");}
+
+ // 3) verify that another member recovers for grantor
+ int attempts = 3;
+ for (int attempt = 0; attempt < attempts; attempt++) {
+ try {
+ for (int vm = 0; vm < numVMs; vm++) {
+ if (vm == originalVM) continue; // skip because he's disconnected
+ final int finalvm = vm;
+ logInfo("[testBasicGrantorRecovery] VM " + finalvm +
+ " in " + serviceName + " about to invoke");
+ InternalDistributedMember id = (InternalDistributedMember)host.getVM(finalvm).invoke(
+ DistributedLockServiceDUnitTest.class,
+ "identifyLockGrantor",
+ args);
+ logInfo("[testBasicGrantorRecovery] VM " + finalvm +
+ " in " + serviceName + " got " + id);
+ assertGrantorIsConsistent(id);
+ logInfo("[testBasicGrantorRecovery] new grantor " + id +
+ " is not old grantor " + oldGrantor);
+ assertEquals("New grantor must not equal the old grantor", true,
+ !id.equals(oldGrantor)); // new grantor != old grantor
+ } // loop thru vms
+ logInfo("[testBasicGrantorRecovery] succeeded attempt " + attempt);
+ break; // success
+ }
+ catch (AssertionFailedError e) { // NOTE(review): lockGrantor is not reset between attempts, so an id recorded by a failed attempt is still compared on retry
+ logInfo("[testBasicGrantorRecovery] failed attempt " + attempt);
+ if (attempt == attempts-1) throw e; // only the last failed attempt fails the test
+ }
+ } // loop thru attempts
+// }
+// finally {
+ //DLockGrantor.setUncleanDestroyEnabled(false);
+// }
+ }
+
+ /**
+ * Three VMs each take a lock, the grantor VM (VM 0) disconnects, and the
+ * two remaining VMs must still be able to release their locks, take each
+ * other's locks, and agree on a single grantor.
+ */
+ public void testLockFailover() {
+ final int originalGrantorVM = 0;
+ final int oneVM = 1;
+ final int twoVM = 2;
+ final String serviceName = "testLockFailover-" + getUniqueName();
+
+ final Host host = Host.getHost(0);
+ final VM grantorMember = host.getVM(originalGrantorVM);
+ final VM memberOne = host.getVM(oneVM);
+ final VM memberTwo = host.getVM(twoVM);
+
+ // create lock services...
+ LogWriterUtils.getLogWriter().fine("[testLockFailover] create services");
+
+ grantorMember.invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService( serviceName ));
+ memberOne.invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService( serviceName ));
+ memberTwo.invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService( serviceName ));
+
+ // the first member to ask for the grantor is expected to become it
+ grantorMember.invoke(() -> DistributedLockServiceDUnitTest.identifyLockGrantor( serviceName ));
+
+ Boolean isGrantor = (Boolean) grantorMember.invoke(
+ () -> DistributedLockServiceDUnitTest.isLockGrantor( serviceName ));
+ assertEquals("First member calling getLockGrantor failed to become grantor",
+ Boolean.TRUE, isGrantor);
+
+ // get locks...
+ LogWriterUtils.getLogWriter().fine("[testLockFailover] get lock");
+
+ Boolean locked = (Boolean) grantorMember.invoke(
+ () -> DistributedLockServiceDUnitTest.lock( serviceName, "KEY-"+originalGrantorVM ));
+ assertEquals("Failed to get lock in testLockFailover",
+ Boolean.TRUE, locked);
+
+ locked = (Boolean) memberTwo.invoke(
+ () -> DistributedLockServiceDUnitTest.lock( serviceName, "KEY-"+twoVM ));
+ assertEquals("Failed to get lock in testLockFailover",
+ Boolean.TRUE, locked);
+
+ locked = (Boolean) memberOne.invoke(
+ () -> DistributedLockServiceDUnitTest.lock( serviceName, "KEY-"+oneVM ));
+ assertEquals("Failed to get lock in testLockFailover",
+ Boolean.TRUE, locked);
+
+ // disconnect originalGrantorVM...
+ LogWriterUtils.getLogWriter().fine("[testLockFailover] disconnect originalGrantorVM");
+
+ grantorMember.invoke(new SerializableRunnable() {
+ public void run() {
+ disconnectFromDS();
+ }
+ });
+
+ try {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException ignore) {
+ fail("interrupted");
+ }
+
+ // verify locks by unlocking...
+ LogWriterUtils.getLogWriter().fine("[testLockFailover] release locks");
+
+ Boolean unlocked = (Boolean) memberTwo.invoke(
+ () -> DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+twoVM ));
+ assertEquals("Failed to release lock in testLockFailover",
+ Boolean.TRUE, unlocked);
+
+ unlocked = (Boolean) memberOne.invoke(
+ () -> DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+oneVM ));
+ assertEquals("Failed to release lock in testLockFailover",
+ Boolean.TRUE, unlocked);
+
+ // switch locks...
+ locked = (Boolean) memberOne.invoke(
+ () -> DistributedLockServiceDUnitTest.lock( serviceName, "KEY-"+twoVM ));
+ assertEquals("Failed to get lock in testLockFailover",
+ Boolean.TRUE, locked);
+
+ locked = (Boolean) memberTwo.invoke(
+ () -> DistributedLockServiceDUnitTest.lock( serviceName, "KEY-"+oneVM ));
+ assertEquals("Failed to get lock in testLockFailover",
+ Boolean.TRUE, locked);
+
+ unlocked = (Boolean) memberOne.invoke(
+ () -> DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+twoVM ));
+ assertEquals("Failed to release lock in testLockFailover",
+ Boolean.TRUE, unlocked);
+
+ unlocked = (Boolean) memberTwo.invoke(
+ () -> DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+oneVM ));
+ assertEquals("Failed to release lock in testLockFailover",
+ Boolean.TRUE, unlocked);
+
+ // verify grantor is unique...
+ LogWriterUtils.getLogWriter().fine("[testLockFailover] verify grantor identity");
+
+ InternalDistributedMember oneID = (InternalDistributedMember) memberOne.invoke(
+ () -> DistributedLockServiceDUnitTest.identifyLockGrantor(serviceName));
+ InternalDistributedMember twoID = (InternalDistributedMember) memberTwo.invoke(
+ () -> DistributedLockServiceDUnitTest.identifyLockGrantor(serviceName));
+ assertTrue("Failed to identifyLockGrantor in testLockFailover",
+ oneID != null && twoID != null);
+ assertEquals("Failed grantor uniqueness in testLockFailover",
+ oneID, twoID);
+ }
+
+ public void testLockThenBecomeLockGrantor() { // locks held in three VMs must survive VM 1 taking over as grantor
+ final int originalGrantorVM = 0;
+ final int becomeGrantorVM = 1;
+ final int thirdPartyVM = 2;
+ final String serviceName = "testLockThenBecomeLockGrantor-" + getUniqueName();
+
+ // create lock services...
+ LogWriterUtils.getLogWriter().fine("[testLockThenBecomeLockGrantor] create services");
+
+ Host.getHost(0).getVM(originalGrantorVM).invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService( serviceName ));
+
+ try {
+ Thread.sleep(20);
+ } catch (InterruptedException ignore) {fail("interrupted");}
+
+ Host.getHost(0).getVM(becomeGrantorVM).invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService( serviceName ));
+
+ Host.getHost(0).getVM(thirdPartyVM).invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService( serviceName ));
+
+ Host.getHost(0).getVM(originalGrantorVM).invoke(() -> DistributedLockServiceDUnitTest.identifyLockGrantor( serviceName ));
+
+ Boolean isGrantor = (Boolean) Host.getHost(0).getVM(originalGrantorVM).invoke(() -> DistributedLockServiceDUnitTest.isLockGrantor( serviceName ));
+ assertEquals("First member calling getLockGrantor failed to become grantor",
+ Boolean.TRUE, isGrantor);
+
+ // control...
+ LogWriterUtils.getLogWriter().fine("[testLockThenBecomeLockGrantor] check control");
+ Boolean check = (Boolean) Host.getHost(0).getVM(becomeGrantorVM).invoke(() -> DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+becomeGrantorVM )); // nothing is locked yet, so this unlock is expected to report FALSE
+ assertEquals("Check of control failed... unlock succeeded but nothing locked",
+ Boolean.FALSE, check);
+
+ // get locks...
+ LogWriterUtils.getLogWriter().fine("[testLockThenBecomeLockGrantor] get lock");
+
+ Boolean locked = (Boolean) Host.getHost(0).getVM(originalGrantorVM).invoke(() -> DistributedLockServiceDUnitTest.lock( serviceName, "KEY-"+originalGrantorVM ));
+ assertEquals("Failed to get lock in testLockThenBecomeLockGrantor",
+ Boolean.TRUE, locked);
+
+ locked = (Boolean) Host.getHost(0).getVM(thirdPartyVM).invoke(() -> DistributedLockServiceDUnitTest.lock( serviceName, "KEY-"+thirdPartyVM ));
+ assertEquals("Failed to get lock in testLockThenBecomeLockGrantor",
+ Boolean.TRUE, locked);
+
+ locked = (Boolean) Host.getHost(0).getVM(becomeGrantorVM).invoke(() -> DistributedLockServiceDUnitTest.lock( serviceName, "KEY-"+becomeGrantorVM ));
+ assertEquals("Failed to get lock in testLockThenBecomeLockGrantor",
+ Boolean.TRUE, locked);
+
+ // become lock grantor...
+ LogWriterUtils.getLogWriter().fine("[testLockThenBecomeLockGrantor] become lock grantor");
+
+ Host.getHost(0).getVM(becomeGrantorVM).invoke(() -> DistributedLockServiceDUnitTest.becomeLockGrantor( serviceName ));
+
+ try {
+ Thread.sleep(20);
+ } catch (InterruptedException ignore) {fail("interrupted");}
+
+ isGrantor = (Boolean) Host.getHost(0).getVM(becomeGrantorVM).invoke(() -> DistributedLockServiceDUnitTest.isLockGrantor( serviceName ));
+ assertEquals("Failed to become lock grantor",
+ Boolean.TRUE, isGrantor);
+
+ // verify locks by unlocking...
+ LogWriterUtils.getLogWriter().fine("[testLockThenBecomeLockGrantor] release locks");
+
+ Boolean unlocked = (Boolean) Host.getHost(0).getVM(originalGrantorVM).invoke(() -> DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+originalGrantorVM ));
+ assertEquals("Failed to release lock in testLockThenBecomeLockGrantor",
+ Boolean.TRUE, unlocked);
+
+ unlocked = (Boolean) Host.getHost(0).getVM(thirdPartyVM).invoke(() -> DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+thirdPartyVM ));
+ assertEquals("Failed to release lock in testLockThenBecomeLockGrantor",
+ Boolean.TRUE, unlocked);
+
+ unlocked = (Boolean) Host.getHost(0).getVM(becomeGrantorVM).invoke(() -> DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+becomeGrantorVM ));
+ assertEquals("Failed to release lock in testLockThenBecomeLockGrantor",
+ Boolean.TRUE, unlocked);
+
+ // test for bug in which transferred token gets re-entered causing lock recursion
+ unlocked = (Boolean) Host.getHost(0).getVM(becomeGrantorVM).invoke(() -> DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+becomeGrantorVM )); // second unlock of the same key must report FALSE
+ assertEquals("Transfer of tokens caused lock recursion in held lock",
+ Boolean.FALSE, unlocked);
+ }
+
+ /**
+ * Has each of four VMs take its own lock, asks one non-grantor VM to call
+ * becomeLockGrantor, and verifies that the grantor changed and that every
+ * VM can still release the lock it took before the transfer.
+ */
+ public void testBecomeLockGrantor() {
+ // create lock services...
+ int numVMs = 4;
+ final String serviceName = "testBecomeLockGrantor-" + getUniqueName();
+ distributedCreateService(numVMs, serviceName);
+
+ // each one gets a lock...
+ for (int vm = 0; vm < numVMs; vm++) {
+ final int finalvm = vm;
+ Boolean locked = Host.getHost(0).getVM(finalvm).invoke(() -> DistributedLockServiceDUnitTest.lock( serviceName, "obj-"+finalvm ));
+ assertEquals("Failed to get lock in testBecomeLockGrantor",
+ Boolean.TRUE, locked);
+ }
+
+ // find the grantor...
+ final Object[] args = new Object[] {serviceName};
+ int originalVM = -1;
+ InternalDistributedMember oldGrantor = null;
+ for (int vm = 0; vm < numVMs; vm++) {
+ final int finalvm = vm;
+ Boolean isGrantor = (Boolean)Host.getHost(0).getVM(finalvm).invoke(
+ DistributedLockServiceDUnitTest.class, "isLockGrantor", args);
+ if (isGrantor.booleanValue()) {
+ originalVM = vm;
+ oldGrantor = (InternalDistributedMember)Host.getHost(0).getVM(finalvm).invoke(
+ DistributedLockServiceDUnitTest.class, "identifyLockGrantor", args);
+ break;
+ }
+ }
+
+ LogWriterUtils.getLogWriter().fine("[testBecomeLockGrantor] original grantor is " + oldGrantor);
+
+ // have one call becomeLockGrantor
+ for (int vm = 0; vm < numVMs; vm++) {
+ if (vm != originalVM) {
+ final int finalvm = vm;
+ Host.getHost(0).getVM(finalvm).invoke(
+ DistributedLockServiceDUnitTest.class, "becomeLockGrantor", args);
+ Boolean isGrantor = (Boolean)Host.getHost(0).getVM(finalvm).invoke(
+ DistributedLockServiceDUnitTest.class, "isLockGrantor", args);
+ assertEquals("isLockGrantor is false after calling becomeLockGrantor",
+ Boolean.TRUE, isGrantor);
+ break;
+ }
+ }
+
+ LogWriterUtils.getLogWriter().fine("[testBecomeLockGrantor] one vm has called becomeLockGrantor...");
+
+ InternalDistributedMember newGrantor = null;
+ for (int vm = 0; vm < numVMs; vm++) {
+ final int finalvm = vm;
+ Boolean isGrantor = (Boolean)Host.getHost(0).getVM(finalvm).invoke(
+ DistributedLockServiceDUnitTest.class, "isLockGrantor", args);
+ if (isGrantor.booleanValue()) {
+ newGrantor = (InternalDistributedMember)Host.getHost(0).getVM(finalvm).invoke(
+ DistributedLockServiceDUnitTest.class, "identifyLockGrantor", args);
+ break;
+ }
+ }
+ LogWriterUtils.getLogWriter().fine("[testBecomeLockGrantor] new Grantor is " + newGrantor);
+ // fail with a clear message instead of a NullPointerException when no VM
+ // reported itself as grantor in the loop above
+ assertNotNull("No member reported itself as grantor after becomeLockGrantor",
+ newGrantor);
+ assertEquals(false, newGrantor.equals(oldGrantor));
+
+ // verify locks still held by unlocking
+ // each one unlocks...
+ for (int vm = 0; vm < numVMs; vm++) {
+ final int finalvm = vm;
+ Boolean unlocked = (Boolean) Host.getHost(0).getVM(finalvm).invoke(() -> DistributedLockServiceDUnitTest.unlock( serviceName, "obj-"+finalvm ));
+ assertEquals("Failed to unlock in testBecomeLockGrantor",
+ Boolean.TRUE, unlocked);
+ }
+
+ LogWriterUtils.getLogWriter().fine("[testBecomeLockGrantor] finished");
+
+ // TODO verify that pending requests are granted by unlocking them also
+ }
+
+ /**
+ * Four VMs each call tryLock on the same key with a 100 ms wait; exactly
+ * one must obtain the lock, and exactly one unlock call must then succeed.
+ */
+ public void testTryLock() {
+ final int numVMs = 4;
+ // Long.valueOf replaces the deprecated boxing constructor
+ final Long waitMillis = Long.valueOf(100L);
+
+ // create lock services...
+ LogWriterUtils.getLogWriter().fine("[testTryLock] create lock services");
+ final String serviceName = "testTryLock-" + getUniqueName();
+ distributedCreateService(numVMs, serviceName);
+
+ // all vms scramble to get tryLock but only one should succeed...
+ LogWriterUtils.getLogWriter().fine("[testTryLock] attempt to get tryLock");
+ int lockCount = 0;
+ for (int vm = 0; vm < numVMs; vm++) {
+ final int finalvm = vm;
+ Boolean locked = (Boolean) Host.getHost(0).getVM(finalvm).invoke(() -> DistributedLockServiceDUnitTest.tryLock( serviceName, "KEY", waitMillis ));
+ if (locked.booleanValue()) lockCount++;
+ }
+
+ // the assertion also fails when nobody got the lock, so the message
+ // states the expectation rather than "more than one"
+ assertEquals("Exactly one vm should have acquired the tryLock",
+ 1, lockCount);
+
+ LogWriterUtils.getLogWriter().fine("[testTryLock] unlock tryLock");
+ int unlockCount = 0;
+ for (int vm = 0; vm < numVMs; vm++) {
+ final int finalvm = vm;
+ Boolean unlocked = (Boolean) Host.getHost(0).getVM(finalvm).invoke(() -> DistributedLockServiceDUnitTest.unlock( serviceName, "KEY" ));
+ if (unlocked.booleanValue()) unlockCount++;
+ }
+
+ assertEquals("Exactly one vm should have released the tryLock",
+ 1, unlockCount);
+ }
+
+ /**
+  * Runs the one-gets-then-other-gets scenario first with one VM and one
+  * thread, then with four VMs and three threads per VM.
+  */
+ public void testOneGetsThenOtherGets() throws Exception {
+   // each row is {numVMs, numThreadsPerVM}; the {2, 2} and {3, 3}
+   // combinations are currently disabled
+   final int[][] configurations = { {1, 1}, {4, 3} };
+   for (int[] configuration : configurations) {
+     doOneGetsThenOtherGets(configuration[0], configuration[1]);
+   }
+ }
+
+ public void testLockDifferentNames() throws Exception {
+ String serviceName = getUniqueName();
+
+ // Same VM
+ remoteCreateService(serviceName);
+ DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
+ assertTrue(service.lock("obj1", -1, -1));
+ assertTrue(service.lock("obj2", -1, -1));
+ service.unlock("obj1");
+ service.unlock("obj2");
+
+ // Different VMs
+ VM vm = Host.getHost(0).getVM(0);
+ vm.invoke(() -> this.remoteCreateService(serviceName));
+ assertTrue(service.lock("masterVMobj", -1, -1));
+
+ assertEquals(Boolean.TRUE, vm.invoke(() -> this.getLockAndIncrement(
+ serviceName, "otherVMobj", new Integer(-1), new Integer(0)
+ )));
+
+ service.unlock("masterVMobj");
+ }
+
+ /**
+  * Verifies that <code>getLockAndIncrement</code> reports success when it is
+  * called directly in this VM for a service created in this VM.
+  */
+ public void testLocalGetLockAndIncrement() throws Exception {
+   String serviceName = getUniqueName();
+   remoteCreateService(serviceName);
+   // make sure the service really exists in this VM before exercising it
+   assertNotNull("Lock service " + serviceName + " was not created",
+       DistributedLockService.getServiceNamed(serviceName));
+   assertEquals(Boolean.TRUE,
+       getLockAndIncrement(serviceName, "localVMobj", -1, 0));
+ }
+ /**
+  * Verifies that <code>getLockAndIncrement</code> reports success when it is
+  * invoked in another VM for a service created in that VM.
+  */
+ public void testRemoteGetLockAndIncrement() {
+   final String serviceName = getUniqueName();
+   final VM remoteVm = Host.getHost(0).getVM(0);
+   remoteVm.invoke(() -> this.remoteCreateService(serviceName));
+
+   Object remoteResult = remoteVm.invoke(() -> this.getLockAndIncrement(
+       serviceName, "remoteVMobj", new Integer(-1), new Integer(0)));
+   assertEquals(Boolean.TRUE, remoteResult);
+ }
+
+ public void testLockSameNameDifferentService()
+ {
+ String serviceName1 = getUniqueName() + "_1";
+ String serviceName2 = getUniqueName() + "_2";
+ String objName = "obj";
+
+ // Same VM
+ remoteCreateService(serviceName1);
+ remoteCreateService(serviceName2);
+ DistributedLockService service1 = DistributedLockService.getServiceNamed(serviceName1);
+ DistributedLockService service2 = DistributedLockService.getServiceNamed(serviceName2);
+ assertTrue(service1.lock(objName, -1, -1));
+ assertTrue(service2.lock(objName, -1, -1));
+ service1.unlock(objName);
+ service2.unlock(objName);
+
+ // Different VMs
+ VM vm = Host.getHost(0).getVM(0);
+ vm.invoke(() -> this.remoteCreateService(serviceName1));
+ vm.invoke(() -> this.remoteCreateService(serviceName2));
+ assertTrue(service1.lock(objName, -1, -1));
+ assertEquals(Boolean.TRUE, vm.invoke(() -> this.getLockAndIncrement(
+ serviceName2, objName, new Integer(-1), new Integer(0)
+ )));
+ service1.unlock(objName);
+ }
+
+ public void testLeaseDoesntExpire() // a lock held with a 60000 ms lease must survive a 1000 ms lock attempt by another thread and by another VM
+ throws InterruptedException
+ {
+ String serviceName = getUniqueName();
+ final Object objName = new Integer(3);
+
+ // Same VM
+ remoteCreateService(serviceName);
+ final DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
+ // lock objName with a sufficiently long lease
+ assertTrue(service.lock(objName, -1, 60000));
+ // try to lock in another thread, with a timeout shorter than above lease
+ final boolean[] resultHolder = new boolean[] { false }; // one-element array so the anonymous Runnable can store its result
+ Thread thread = new Thread(new Runnable() {
+ public void run() {
+ resultHolder[0] = !service.lock(objName, 1000, -1); // true when the attempt failed, which is the expected outcome
+ }
+ });
+ thread.start();
+ ThreadUtils.join(thread, 30 * 1000);
+ assertTrue(resultHolder[0]);
+ // the unlock should succeed without throwing LeaseExpiredException
+ service.unlock(objName);
+
+ // Different VM
+ VM vm = Host.getHost(0).getVM(0);
+ vm.invoke(() -> this.remoteCreateService(serviceName));
+ // lock objName in this VM with a sufficiently long lease
+ assertTrue(service.lock(objName, -1, 60000));
+ // try to lock in another VM, with a timeout shorter than above lease
+ assertEquals(Boolean.FALSE, vm.invoke(() -> this.getLockAndIncrement(
+ serviceName, objName, new Long(1000), new Long(0)
+ )));
+ // the unlock should succeed without throwing LeaseExpiredException
+ service.unlock(objName);
+ }
+
+ /**
+  * Verifies the basic lock/unlock cycle in a single thread: the name is not
+  * held before locking, is held after a successful lock, and is no longer
+  * held after unlock.
+  */
+ public void testLockUnlock() {
+   String serviceName = getUniqueName();
+   Object objName = Integer.valueOf(42);
+
+   remoteCreateService(serviceName);
+   DistributedLockService service =
+       DistributedLockService.getServiceNamed(serviceName);
+
+   Assert.assertFalse(service.isHeldByCurrentThread(objName));
+
+   // check the acquisition result so a failed lock is reported here rather
+   // than as a confusing failure of the following held-by-thread check
+   Assert.assertTrue(service.lock(objName, -1, -1));
+   Assert.assertTrue(service.isHeldByCurrentThread(objName));
+
+   service.unlock(objName);
+   Assert.assertFalse(service.isHeldByCurrentThread(objName));
+ }
+
+ /**
+  * Locks a name with a short lease, sleeps past the lease, and checks that
+  * the lock is no longer held and that unlocking it throws
+  * LeaseExpiredException.
+  */
+ public void testLockExpireUnlock() {
+   final long leaseMs = 200;
+   final long waitBeforeLockingMs = 210;
+
+   final String serviceName = getUniqueName();
+   final Object lockName = new Integer(42);
+
+   remoteCreateService(serviceName);
+   final DistributedLockService dls =
+       DistributedLockService.getServiceNamed(serviceName);
+
+   assertFalse(dls.isHeldByCurrentThread(lockName));
+
+   boolean acquired = dls.lock(lockName, -1, leaseMs);
+   assertTrue(acquired);
+   assertTrue(dls.isHeldByCurrentThread(lockName));
+
+   // sleep past the lease so that it should expire
+   sleep(waitBeforeLockingMs);
+   assertFalse(dls.isHeldByCurrentThread(lockName));
+
+   boolean sawLeaseExpired = false;
+   try {
+     dls.unlock(lockName);
+   } catch (LeaseExpiredException expected) {
+     sawLeaseExpired = true;
+   }
+   if (!sawLeaseExpired) {
+     fail("unlock should have thrown LeaseExpiredException");
+   }
+ }
+
+ public void testLockRecursion() {
+ String serviceName = getUniqueName();
+ Object objName = new Integer(42);
+
+ remoteCreateService(serviceName);
+ DistributedLockService service =
+ DistributedLockService.getServiceNamed(serviceName);
+
+ Assert.assertTrue(!service.isHeldByCurrentThread(objName));
+
+ // initial lock...
+ Assert.assertTrue(service.lock(objName, -1, -1));
+ Assert.assertTrue(service.isHeldByCurrentThread(objName));
+
+ // recursion +1...
+ Assert.assertTrue(service.lock(objName, -1, -1));
+
+ // recursion -1...
+ service.unlock(objName);
+ Assert.assertTrue(service.isHeldByCurrentThread(objName));
+
+ // and unlock...
+ service.unlock(objName);
+ Assert.assertTrue(!service.isHeldByCurrentThread(objName));
+ }
+
+ /**
+  * Checks that recursion does not survive lease expiration: after a
+  * recursively held lock expires, unlock throws LeaseExpiredException, and
+  * a fresh lock is released by a single unlock. The lock/expire/relock
+  * sequence is then repeated without the recursive acquisition.
+  */
+ public void testLockRecursionWithExpiration() {
+   final long leaseMs = 500;
+   final long waitBeforeLockingMs = 750;
+
+   final String serviceName = getUniqueName();
+   final Object lockName = new Integer(42);
+
+   remoteCreateService(serviceName);
+   final DistributedLockService dls =
+       DistributedLockService.getServiceNamed(serviceName);
+
+   Assert.assertFalse(dls.isHeldByCurrentThread(lockName));
+
+   // first acquisition
+   Assert.assertTrue(dls.lock(lockName, -1, leaseMs));
+   Assert.assertTrue(dls.isHeldByCurrentThread(lockName));
+
+   // recursive acquisition
+   Assert.assertTrue(dls.lock(lockName, -1, leaseMs));
+   Assert.assertTrue(dls.isHeldByCurrentThread(lockName));
+
+   // sleep past the lease
+   sleep(waitBeforeLockingMs);
+   Assert.assertFalse(dls.isHeldByCurrentThread(lockName));
+
+   // unlocking the expired lock is expected to throw
+   boolean sawLeaseExpired = false;
+   try {
+     dls.unlock(lockName);
+   } catch (LeaseExpiredException expected) {
+     sawLeaseExpired = true;
+   }
+   if (!sawLeaseExpired) {
+     fail("unlock should have thrown LeaseExpiredException");
+   }
+
+   // acquire again
+   Assert.assertTrue(dls.lock(lockName, -1, leaseMs));
+   Assert.assertTrue(dls.isHeldByCurrentThread(lockName));
+
+   // one unlock must fully release it, i.e. no recursion count is left over
+   dls.unlock(lockName);
+   Assert.assertFalse(dls.isHeldByCurrentThread(lockName));
+
+   // second pass, this time without the recursive acquisition
+   Assert.assertFalse(dls.isHeldByCurrentThread(lockName));
+
+   // first acquisition
+   Assert.assertTrue(dls.lock(lockName, -1, leaseMs));
+   Assert.assertTrue(dls.isHeldByCurrentThread(lockName));
+
+   // sleep past the lease
+   sleep(waitBeforeLockingMs);
+   Assert.assertFalse(dls.isHeldByCurrentThread(lockName));
+
+   // acquire again
+   Assert.assertTrue(dls.lock(lockName, -1, leaseMs));
+   Assert.assertTrue(dls.isHeldByCurrentThread(lockName));
+
+   // one unlock must fully release it
+   dls.unlock(lockName);
+   Assert.assertFalse(dls.isHeldByCurrentThread(lockName));
+ }
+
+ /**
+  * Runs the lease-expiration scenario in the mode where the second locker
+  * starts only after the wait that exceeds the lease.
+  */
+ public void testLeaseExpiresBeforeOtherLocks()
+ throws InterruptedException
+ {
+   final boolean tryToLockBeforeExpiration = false;
+   leaseExpiresTest(tryToLockBeforeExpiration);
+ }
+
+ /**
+  * Runs the lease-expiration scenario in the mode where the second locker
+  * starts after a wait that is shorter than the lease.
+  */
+ public void testLeaseExpiresWhileOtherLocks()
+ throws InterruptedException
+ {
+   final boolean tryToLockBeforeExpiration = true;
+   leaseExpiresTest(tryToLockBeforeExpiration);
+ }
+
+ private void leaseExpiresTest(boolean tryToLockBeforeExpiration) // shared body of the two testLeaseExpires* tests; the flag selects a wait shorter (true) or longer (false) than the lease
+ throws InterruptedException
+ {
+ LogWriterUtils.getLogWriter().fine("[testLeaseExpires] prepping");
+ long leaseMs = 100;
+ long waitBeforeLockingMs = tryToLockBeforeExpiration ? 50 : 110; // 50 ms is inside the 100 ms lease, 110 ms is past it
+
+ final String serviceName = getUniqueName();
+ final Object objName = new Integer(3);
+
+ // Same VM
+ remoteCreateService(serviceName);
+ final DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
+
+ LogWriterUtils.getLogWriter().fine("[testLeaseExpires] acquire first lock");
+ // lock objName with a short lease
+ assertTrue(service.lock(objName, -1, leaseMs));
+ sleep(waitBeforeLockingMs);
+
+ if (waitBeforeLockingMs > leaseMs) { // only checked in the mode where the wait exceeds the lease
+ Assert.assertTrue(!service.isHeldByCurrentThread(objName));
+ }
+
+ LogWriterUtils.getLogWriter().fine("[testLeaseExpires] acquire lock that expired");
+ // try to lock in another thread - lease should have expired
+ final boolean[] resultHolder = new boolean[] { false }; // one-element array so the anonymous Runnable can store its result
+ Thread thread = new Thread(new Runnable() {
+ public void run() {
+ resultHolder[0] = service.lock(objName, -1, -1); // wait time of -1: presumably waits as long as needed - confirm against the lock() contract
+ service.unlock(objName);
+ Assert.assertTrue(!service.isHeldByCurrentThread(objName)); // NOTE(review): a failure here is thrown in the helper thread, not in the test thread
+ }
+ });
+ thread.start();
+ ThreadUtils.join(thread, 30 * 1000);
+ assertTrue(resultHolder[0]);
+
+ LogWriterUtils.getLogWriter().fine("[testLeaseExpires] unlock should throw LeaseExpiredException");
+ // this thread's unlock should throw LeaseExpiredException
+ try {
+ service.unlock(objName);
+ fail("unlock should have thrown LeaseExpiredException");
+ } catch (LeaseExpiredException ex) { // expected: the lease taken by this thread has expired
+ }
+
+ LogWriterUtils.getLogWriter().fine("[testLeaseExpires] create service in other vm");
+ // Different VM
+ VM vm = Host.getHost(0).getVM(0);
+ vm.invoke(() -> this.remoteCreateService(serviceName));
+
+ LogWriterUtils.getLogWriter().fine("[testLeaseExpires] acquire lock again and expire");
+ // lock objName in this VM with a short lease
+ assertTrue(service.lock(objName, -1, leaseMs));
+ sleep(waitBeforeLockingMs);
+
+ LogWriterUtils.getLogWriter().fine("[testLeaseExpires] succeed lock in other vm");
+ // try to lock in another VM - should succeed
+ assertEquals(Boolean.TRUE, vm.invoke(() -> this.getLockAndIncrement(
+ serviceName, objName, new Long(-1), new Long(0)
+ )));
+
+ LogWriterUtils.getLogWriter().fine("[testLeaseExpires] unlock should throw LeaseExpiredException again");
+ // this VMs unlock should throw LeaseExpiredException
+ try {
+ service.unlock(objName);
+ fail("unlock should have thrown LeaseExpiredException");
+ } catch (LeaseExpiredException ex) { // expected: the lease taken by this VM has expired
+ }
+ }
+
+ public void testSuspendLockingAfterExpiration() throws Exception { // after a lease expires in this VM, another VM must still be able to suspend, resume and lock
+ LogWriterUtils.getLogWriter().fine("[leaseExpiresThenSuspendTest]");
+
+ final long leaseMillis = 100;
+ final long suspendWaitMillis = 10000;
+
+ final String serviceName = getUniqueName();
+ final Object key = new Integer(3);
+
+ // controller locks key and then expires - controller is grantor
+
+ DistributedLockService dls
+ = DistributedLockService.create(serviceName, getSystem());
+
+ assertTrue(dls.lock(key, -1, leaseMillis));
+
+ // wait for expiration
+ sleep(leaseMillis*2);
+
+ LogWriterUtils.getLogWriter().fine("[leaseExpiresThenSuspendTest] unlock should throw LeaseExpiredException");
+ // this thread's unlock should throw LeaseExpiredException
+ try {
+ dls.unlock(key);
+ fail("unlock should have thrown LeaseExpiredException");
+ } catch (LeaseExpiredException ex) { // expected: the lease has already expired
+ }
+
+ // other vm calls suspend
+
+ LogWriterUtils.getLogWriter().fine("[leaseExpiresThenSuspendTest] call to suspend locking");
+ Host.getHost(0).getVM(0).invoke(new SerializableRunnable() {
+ public void run() {
+ final DistributedLockService dlock
+ = DistributedLockService.create(serviceName, getSystem());
+ dlock.suspendLocking(suspendWaitMillis); // NOTE(review): the boolean result is not checked here
+ dlock.resumeLocking();
+ assertTrue(dlock.lock(key, -1, leaseMillis));
+ dlock.unlock(key);
+ }
+ });
+ }
+
+ volatile boolean started = false; // set to true by a test's second thread as soon as it begins running
+ volatile boolean gotLock = false; // result of the second thread's lock or suspend attempt
+ volatile Throwable exception = null; // exception captured in the second thread, if any
+ volatile Throwable throwable = null; // unexpected Throwable captured by the second thread of testLockInterruptiblyIsInterruptible
+
+ public void testLockInterruptiblyIsInterruptible() { // a thread blocked in lockInterruptibly must get InterruptedException when interrupted and must not end up with the lock
+ started = false;
+ gotLock = false;
+ exception = null;
+ throwable = null;
+
+ // Lock "obj" in first thread
+ LogWriterUtils.getLogWriter().info("[testLockInterruptiblyIsInterruptible] get and hold the lock");
+ final String serviceName = getUniqueName();
+ final DistributedLockService service =
+ DistributedLockService.create(serviceName, dlstSystem);
+ service.becomeLockGrantor();
+ assertTrue(service.lock("obj", 1000, -1));
+
+ // Start second thread that tries to lock "obj" interruptibly
+ LogWriterUtils.getLogWriter().info("[testLockInterruptiblyIsInterruptible] call lockInterruptibly");
+ Thread thread2 = new Thread(new Runnable() {
+ public void run() {
+ try {
+ started = true;
+ gotLock = service.lockInterruptibly("obj", -1, -1);
+ } catch (InterruptedException ex) {
+ exception = ex; // the expected outcome; checked by the test thread below
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ }
+ catch (Throwable t) {
+ throwable = t; // anything else is unexpected and is logged by the test thread
+ }
+ }
+ });
+ thread2.start();
+
+ // Interrupt second thread
+ LogWriterUtils.getLogWriter().info("[testLockInterruptiblyIsInterruptible] interrupt calling thread");
+ while (!started) Thread.yield(); // spin until the second thread has entered run()
+ thread2.interrupt();
+ ThreadUtils.join(thread2, 20 * 1000);
+
+ // Expect it got InterruptedException and didn't lock the service
+ LogWriterUtils.getLogWriter().info("[testLockInterruptiblyIsInterruptible] verify failed to get lock");
+ assertFalse(gotLock);
+ if (throwable != null) {
+ LogWriterUtils.getLogWriter().warning(
+ "testLockInterruptiblyIsInterruptible threw unexpected Throwable",
+ throwable);
+ }
+ assertNotNull(exception);
+
+ // Unlock "obj" in first thread
+ LogWriterUtils.getLogWriter().info("[testLockInterruptiblyIsInterruptible] unlock the lock");
+ service.unlock("obj");
+
+ // Make sure it didn't get locked by second thread
+ LogWriterUtils.getLogWriter().info("[testLockInterruptiblyIsInterruptible] try to get lock with timeout should not fail");
+ assertTrue(service.lock("obj", 5000, -1));
+ DistributedLockService.destroy(serviceName); // NOTE(review): not reached if an assertion above fails
+ }
+
+ volatile boolean wasFlagSet = false; // whether the second thread's interrupt flag was set when its run() finished
+
+ public void testLockIsNotInterruptible() { // a thread blocked in lock() must keep waiting when interrupted, then get the lock with its interrupt flag still set
+ // Lock "obj" in first thread
+ LogWriterUtils.getLogWriter().fine("[testLockIsNotInterruptible] lock in first thread");
+ started = false;
+ gotLock = false;
+ exception = null;
+ wasFlagSet = false;
+
+ final String serviceName = getUniqueName();
+ final DistributedLockService service =
+ DistributedLockService.create(serviceName, dlstSystem);
+ assertTrue(service.lock("obj", 1000, -1));
+
+ // Start second thread that tries to lock "obj" with the non-interruptible lock()
+ LogWriterUtils.getLogWriter().fine("[testLockIsNotInterruptible] attempt lock in second thread");
+ Thread thread2 = new Thread(new Runnable() {
+ public void run() {
+ try {
+ started = true;
+ gotLock = service.lock("obj", -1, -1);
+ LogWriterUtils.getLogWriter().fine("[testLockIsNotInterruptible] thread2 finished lock() - got " + gotLock);
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ }
+ catch (Throwable ex) {
+ LogWriterUtils.getLogWriter().warning("[testLockIsNotInterruptible] Caught...", ex);
+ exception = ex;
+ }
+ wasFlagSet = Thread.currentThread().isInterrupted(); // isInterrupted() reads the flag without clearing it
+ }
+ });
+ thread2.start();
+
+ // Interrupt second thread
+ LogWriterUtils.getLogWriter().fine("[testLockIsNotInterruptible] interrupt second thread");
+ while (!started) Thread.yield(); // spin until the second thread has entered run()
+ sleep(500); // give the second thread time to reach lock() before interrupting it
+ thread2.interrupt();
+ // Expect it didn't get an exception and didn't lock the service
+ sleep(500);
+ assertFalse(gotLock);
+ assertNull(exception);
+
+ // Unlock "obj" in first thread
+ LogWriterUtils.getLogWriter().fine("[testLockIsNotInterruptible] unlock in first thread");
+ service.unlock("obj");
+ sleep(500);
+
+ // Expect that thread2 should now complete execution.
+ ThreadUtils.join(thread2, 20 * 1000);
+
+ // Now thread2 should have gotten the lock, not the exception, but the
+ // thread's flag should be set
+ LogWriterUtils.getLogWriter().fine("[testLockIsNotInterruptible] verify second thread got lock");
+ assertNull(exception);
+ assertTrue(gotLock);
+ assertTrue(wasFlagSet);
+ }
+
+ /**
+ * Test DistributedLockService.suspendLocking() and resumeLocking():
+ * resuming without a prior suspend throws LockNotHeldException, a second
+ * suspend by the same thread throws IllegalStateException, and suspend
+ * returns false while another thread still has locking suspended.
+ */
+ public void testSuspendLockingBasic()
+ throws InterruptedException {
+ final DistributedLockService service =
+ DistributedLockService.create(getUniqueName(), dlstSystem);
+
+ try {
+ service.resumeLocking();
+ fail("Didn't throw LockNotHeldException");
+ } catch (LockNotHeldException ex) {
+ // expected
+ }
+
+ assertTrue(service.suspendLocking(-1));
+ service.resumeLocking();
+
+ // It's not reentrant
+ assertTrue(service.suspendLocking(1000));
+ try {
+ service.suspendLocking(1);
+ fail("didn't get IllegalStateException");
+ } catch (IllegalStateException ex) {
+ // expected
+ }
+ service.resumeLocking();
+
+ // Get "false" if another thread is holding it
+ Thread thread = new Thread(new Runnable() {
+ public void run() {
+ logInfo("new thread about to suspendLocking()");
+ assertTrue(service.suspendLocking(1000)); // this thread never calls resumeLocking()
+ }
+ });
+ thread.start();
+ ThreadUtils.join(thread, 30 * 1000);
+ logInfo("main thread about to suspendLocking");
+ assertTrue(!service.suspendLocking(1000));
+ }
+
+ /**
+ * Test that suspended locking prohibits locking activity: while this VM
+ * has locking suspended, a lock attempt from another VM fails, and it
+ * succeeds again after this VM resumes locking.
+ */
+ public void testSuspendLockingProhibitsLocking()
+ {
+ final String name = getUniqueName();
+ distributedCreateService(2, name);
+ DistributedLockService service =
+ DistributedLockService.getServiceNamed(name);
+
+ // Should be able to lock from other VM
+ VM vm1 = Host.getHost(0).getVM(1);
+ assertTrue(vm1.invoke(() -> DistributedLockServiceDUnitTest.tryToLock( name )));
+
+ assertTrue(service.suspendLocking(1000));
+
+ // vm1 is the grantor... use debugHandleSuspendTimeouts
+ vm1.invoke(new SerializableRunnable("setDebugHandleSuspendTimeouts") {
+ public void run() {
+ DLockService dls =
+ (DLockService) DistributedLockService.getServiceNamed(name);
+ assertTrue(dls.isLockGrantor());
+ DLockGrantor grantor = dls.getGrantorWithNoSync();
+ grantor.setDebugHandleSuspendTimeouts(5000);
+ }
+ });
+
+ // Shouldn't be able to lock a name from another VM
+ assertTrue(!vm1.invoke(() -> DistributedLockServiceDUnitTest.tryToLock( name )));
+
+ service.resumeLocking();
+
+ vm1.invoke(new SerializableRunnable("unsetDebugHandleSuspendTimeouts") {
+ public void run() {
+ DLockService dls =
+ (DLockService) DistributedLockService.getServiceNamed(name);
+ assertTrue(dls.isLockGrantor());
+ DLockGrantor grantor = dls.getGrantorWithNoSync();
+ grantor.setDebugHandleSuspendTimeouts(0); // puts the debug setting back to 0
+ }
+ });
+
+ // Should be able to lock again
+ assertTrue(vm1.invoke(() -> DistributedLockServiceDUnitTest.tryToLock( name )));
+
+ }
+
+ /**
+ * Test that suspend locking behaves under various usage patterns. This
+ * ensures that suspend and regular locks behave as ReadWriteLocks and
+ * processing occurs in order.
+ * <p>
+ * The scenario itself lives in doTestSuspendLockingBehaves(); this wrapper
+ * only guarantees that the static lock clients are stopped and cleared in
+ * every VM afterwards. NOTE(review): the "notest" prefix presumably keeps
+ * the runner from picking this method up - confirm whether that is still
+ * intended.
+ */
+ public void notestSuspendLockingBehaves() throws Exception {
+ try {
+ doTestSuspendLockingBehaves();
+ }
+ finally {
+ Invoke.invokeInEveryVM(new SerializableRunnable() {
+ public void run() {
+ try { // stop the suspend client; failures are logged so that the lock client below is still cleaned up
+ if (suspendClientSuspendLockingBehaves != null) {
+ suspendClientSuspendLockingBehaves.stop();
+ suspendClientSuspendLockingBehaves = null;
+ }
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ }
+ catch (Throwable t) {
+ LogWriterUtils.getLogWriter().error("Error in testSuspendLockingBehaves finally", t);
+ }
+ try { // stop the lock client in the same way
+ if (lockClientSuspendLockingBehaves != null) {
+ lockClientSuspendLockingBehaves.stop();
+ lockClientSuspendLockingBehaves = null;
+ }
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ }
+ catch (Throwable t) {
+ LogWriterUtils.getLogWriter().error("Error in testSuspendLockingBehaves finally", t);
+ }
+ }
+ });
+ }
+ }
+ private void doTestSuspendLockingBehaves() throws Exception { // interleaves lock and suspend requests from three VMs against a grantor in a fourth and checks the order in which they complete
+ final String dlsName = getUniqueName();
+ final VM vmGrantor = Host.getHost(0).getVM(0);
+ final VM vmOne = Host.getHost(0).getVM(1);
+ final VM vmTwo = Host.getHost(0).getVM(2);
+ final VM vmThree = Host.getHost(0).getVM(3);
+ final String key1 = "key1";
+
+ // TODO: make sure suspend thread can get other locks
+
+ // TODO: test local (in grantor) locks and suspends also
+
+ // define some SerializableRunnables
+ final SerializableRunnable createDLS =
+ new SerializableRunnable("Create "+dlsName) {
+ public void run() {
+ DistributedLockService.create(dlsName, getSystem());
+ lockClientSuspendLockingBehaves = new BasicLockClient(dlsName, key1);
+ suspendClientSuspendLockingBehaves = new BasicLockClient(dlsName, key1);
+ assertFalse(isLockGrantor(dlsName).booleanValue()); // the grantor is expected to be vmGrantor, not this VM
+ }
+ };
+ final SerializableRunnable suspendLocking =
+ new SerializableRunnable("Suspend locking "+dlsName) {
+ public void run() {
+ suspendClientSuspendLockingBehaves.suspend();
+ }
+ };
+ final SerializableRunnable resumeLocking =
+ new SerializableRunnable("Resume locking "+dlsName) {
+ public void run() {
+ suspendClientSuspendLockingBehaves.resume();
+ }
+ };
+ final SerializableRunnable lockKey =
+ new SerializableRunnable("Get lock "+dlsName) {
+ public void run() {
+ lockClientSuspendLockingBehaves.lock();
+ }
+ };
+ final SerializableRunnable unlockKey =
+ new SerializableRunnable("Unlock "+dlsName) {
+ public void run() {
+ lockClientSuspendLockingBehaves.unlock();
+ }
+ };
+
+ // create grantor
+ LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] Create grantor "+dlsName);
+ vmGrantor.invoke(new SerializableRunnable("Create grantor "+dlsName) {
+ public void run() {
+ DistributedLockService.create(dlsName, getSystem());
+ DistributedLockService.getServiceNamed(dlsName).lock(key1, -1, -1); // lock and unlock once; the assertion below expects this VM to be grantor afterwards
+ DistributedLockService.getServiceNamed(dlsName).unlock(key1);
+ assertTrue(isLockGrantor(dlsName).booleanValue());
+ }
+ });
+
+ // create dls in other vms
+// getLogWriter().info("[testSuspendLockingBehaves] Create DLS in vmOne");
+ vmOne.invoke(createDLS);
+// getLogWriter().info("[testSuspendLockingBehaves] Create DLS in vmTwo");
+ vmTwo.invoke(createDLS);
+// getLogWriter().info("[testSuspendLockingBehaves] Create DLS in vmThree");
+ vmThree.invoke(createDLS);
+
+ // get a lock
+ LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] line up vms for lock");
+// getLogWriter().info("[testSuspendLockingBehaves] vmOne lock");
+ vmOne.invoke(lockKey);
+// getLogWriter().info("[testSuspendLockingBehaves] start vmTwoLocking");
+ AsyncInvocation vmTwoLocking = vmTwo.invokeAsync(lockKey);
+ Wait.pause(2000); // make sure vmTwo is first in line
+// getLogWriter().info("[testSuspendLockingBehaves] start vmThreeLocking");
+ AsyncInvocation vmThreeLocking = vmThree.invokeAsync(lockKey);
+ Wait.pause(2000);
+
+ // make sure vmTwo and vmThree are still waiting for lock on key1
+// getLogWriter().info("[testSuspendLockingBehaves] assert vmTwoLocking still alive");
+ Wait.pause(100);
+ assertTrue(vmTwoLocking.isAlive());
+// getLogWriter().info("[testSuspendLockingBehaves] assert vmThreeLocking still alive");
+ Wait.pause(100);
+ assertTrue(vmThreeLocking.isAlive());
+
+ // let vmTwo get key
+ LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] unlock so vmTwo can get key");
+// getLogWriter().info("[testSuspendLockingBehaves] vmOne unlock");
+ vmOne.invoke(unlockKey);
+ ThreadUtils.join(vmTwoLocking, 10 * 1000);
+
+ // start suspending in vmOne and vmTwo
+ LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] start suspending requests");
+// getLogWriter().info("[testSuspendLockingBehaves] start vmOneSuspending");
+ AsyncInvocation vmOneSuspending = vmOne.invokeAsync(suspendLocking);
+ Wait.pause(2000); // make sure vmOne is first in line
+// getLogWriter().info("[testSuspendLockingBehaves] start vmTwoSuspending");
+ AsyncInvocation vmTwoSuspending = vmTwo.invokeAsync(suspendLocking);
+ Wait.pause(2000);
+
+ // let vmThree finish locking key
+ LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] unlock so vmThree can get key");
+// getLogWriter().info("[testSuspendLockingBehaves] vmTwo unlock");
+ vmTwo.invoke(unlockKey);
+ ThreadUtils.join(vmThreeLocking, 10 * 1000);
+
+ // have vmOne get back in line for locking key
+ LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] start another lock request");
+// getLogWriter().info("[testSuspendLockingBehaves] start vmOneLockingAgain");
+ AsyncInvocation vmOneLockingAgain = vmOne.invokeAsync(lockKey);
+ Wait.pause(2000);
+
+ // let vmOne suspend locking
+ LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] let vmOne suspend locking");
+// getLogWriter().info("[testSuspendLockingBehaves] assert vmOneSuspending still alive");
+ Wait.pause(100);
+ assertTrue(vmOneSuspending.isAlive());
+// getLogWriter().info("[testSuspendLockingBehaves] vmThree unlock");
+ vmThree.invoke(unlockKey);
+ ThreadUtils.join(vmOneSuspending, 10 * 1000);
+
+ // start suspending in vmThree
+ LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] line up vmThree for suspending");
+// getLogWriter().info("[testSuspendLockingBehaves] start vmThreeSuspending");
+ AsyncInvocation vmThreeSuspending = vmThree.invokeAsync(suspendLocking);
+ Wait.pause(2000);
+
+ // let vmTwo suspend locking
+ LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] let vmTwo suspend locking");
+// getLogWriter().info("[testSuspendLockingBehaves] assert vmTwoSuspending still alive");
+ Wait.pause(100);
+ assertTrue(vmTwoSuspending.isAlive());
+// getLogWriter().info("[testSuspendLockingBehaves] vmOne resumes locking");
+ vmOne.invoke(resumeLocking);
+ ThreadUtils.join(vmTwoSuspending, 10 * 1000);
+
+ // let vmOne get that lock
+ LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] let vmOne get that lock");
+// getLogWriter().info("[testSuspendLockingBehaves] assert vmOneLockingAgain still alive");
+ Wait.pause(100);
+ assertTrue(vmOneLockingAgain.isAlive());
+// getLogWriter().info("[testSuspendLockingBehaves] vmTwo resumes locking");
+ vmTwo.invoke(resumeLocking);
+ ThreadUtils.join(vmOneLockingAgain, 10 * 1000);
+
+ // let vmThree suspend locking
+ LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] let vmThree suspend locking");
+// getLogWriter().info("[testSuspendLockingBehaves] assert vmThreeSuspending still alive");
+ Wait.pause(100);
+ assertTrue(vmThreeSuspending.isAlive());
+// getLogWriter().info("[testSuspendLockingBehaves] vmOne unlocks again");
+ vmOne.invoke(unlockKey);
+ ThreadUtils.join(vmThreeSuspending, 10 * 1000);
+
+ // done
+// getLogWriter().info("[testSuspendLockingBehaves] vmThree resumes locking");
+ vmThree.invoke(resumeLocking);
+ }
+ protected static BasicLockClient suspendClientSuspendLockingBehaves; // per-VM client used for suspend/resume by doTestSuspendLockingBehaves
+ protected static BasicLockClient lockClientSuspendLockingBehaves; // per-VM client used for lock/unlock by doTestSuspendLockingBehaves
+
+ /**
+  * Test that suspendLocking blocks while another VM still holds a lock and
+  * completes successfully once that lock has been released.
+  */
+ public void testSuspendLockingBlocksUntilNoLocks()
+ throws InterruptedException
+ {
+
+   final String name = getUniqueName();
+   distributedCreateService(2, name);
+   final DistributedLockService service =
+       DistributedLockService.getServiceNamed(name);
+
+   // Get lock from other VM. Since same thread needs to lock and unlock,
+   // invoke asynchronously, get lock, wait to be notified, then unlock.
+   VM vm1 = Host.getHost(0).getVM(1);
+   vm1.invokeAsync(new SerializableRunnable("Lock & unlock in vm1") {
+     public void run() {
+       DistributedLockService service2 =
+           DistributedLockService.getServiceNamed(name);
+       assertTrue(service2.lock("lock", -1, -1));
+       synchronized (monitor) {
+         try {
+           // NOTE(review): there is no condition flag around this wait, so a
+           // notify sent before the wait starts would be missed; the test
+           // relies on the sleeps below for ordering
+           monitor.wait();
+         } catch (InterruptedException ex) {
+           System.out.println("Unexpected InterruptedException");
+           fail("interrupted");
+         }
+       }
+       service2.unlock("lock");
+     }
+   });
+   // Let vm1's thread get the lock and go into wait()
+   Thread.sleep(100);
+
+   Thread thread = new Thread(new Runnable() {
+     public void run() {
+       setGot(service.suspendLocking(-1));
+       setDone(true);
+       service.resumeLocking();
+     }
+   });
+   setGot(false);
+   setDone(false);
+   thread.start();
+
+   // Let thread start, make sure it's blocked in suspendLocking
+   Thread.sleep(100);
+   assertFalse("Before release, got: " + getGot() + ", done: " + getDone(), getGot() || getDone());
+
+   vm1.invoke(new SerializableRunnable("notify vm1 to unlock") {
+     public void run() {
+       synchronized (monitor) {
+         monitor.notify();
+       }
+     }
+   });
+
+   // Let thread finish, make sure it successfully suspended and is done
+   WaitCriterion ev = new WaitCriterion() {
+     public boolean done() {
+       return getDone();
+     }
+     public String description() {
+       // give the criterion a readable description instead of null
+       return "waiting for the suspendLocking thread to finish";
+     }
+   };
+   Wait.waitForCriterion(ev, 30 * 1000, 200, true);
+   if (!getGot() || !getDone()) {
+     ThreadUtils.dumpAllStacks();
+   }
+   assertTrue("After release, got: " + getGot() + ", done: " + getDone(), getGot() && getDone());
+
+ }
+
+ public void testSuspendLockingInterruptiblyIsInterruptible() { // a thread blocked in suspendLockingInterruptibly must get InterruptedException when interrupted and must not obtain the suspend
+
+ started = false;
+ gotLock = false;
+ exception = null;
+
+ // Suspend locking in first thread
+ final String name = getUniqueName();
+ final DistributedLockService service =
+ DistributedLockService.create(name, dlstSystem);
+ assertTrue(service.suspendLocking(1000));
+
+ // Start second thread that tries to suspend locking interruptibly
+ Thread thread2 = new Thread(new Runnable() {
+ public void run() {
+ try {
+ started = true;
+ gotLock = service.suspendLockingInterruptibly(-1);
+ } catch (InterruptedException ex) {
+ exception = ex; // the expected outcome; checked by the test thread below
+ }
+ }
+ });
+ thread2.start();
+
+ // Interrupt second thread
+ while (!started) Thread.yield(); // spin until the second thread has entered run()
+ thread2.interrupt();
+ ThreadUtils.join(thread2, 20 * 1000);
+
+ // Expect it got InterruptedException and didn't suspend locking
+ sleep(500);
+ assertFalse(gotLock);
+ assertNotNull(exception);
+
+ // Resume locking in first thread
+ service.resumeLocking();
+ sleep(500);
+
+ // Make sure locking was not left suspended by the second thread
+ assertTrue(service.suspendLocking(1000));
+ DistributedLockService.destroy(name);
+ }
+
+ public void testSuspendLockingIsNotInterruptible() { // a thread blocked in suspendLocking() must keep waiting when interrupted, then succeed with its interrupt flag still set
+
+ started = false;
+ gotLock = false;
+ exception = null;
+ wasFlagSet = false;
+
+ // Suspend locking in first thread
+ final String name = getUniqueName();
+ final DistributedLockService service =
+ DistributedLockService.create(name, dlstSystem);
+ assertTrue(service.suspendLocking(1000));
+
+ // Start second thread that tries to suspend locking with the non-interruptible call
+ Thread thread2 = new Thread(new Runnable() {
+ public void run() {
+ try {
+ started = true;
+ gotLock = service.suspendLocking(-1);
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ }
+ catch (Throwable ex) {
+ exception = ex;
+ }
+ wasFlagSet = Thread.currentThread().isInterrupted(); // isInterrupted() reads the flag without clearing it
+ }
+ });
+ thread2.start();
+
+ // Interrupt second thread
+ while (!started) Thread.yield(); // spin until the second thread has entered run()
+ thread2.interrupt();
+ // Expect it didn't get an exception and didn't suspend locking
+ sleep(500);
+ assertFalse(gotLock);
+ assertNull(exception);
+
+ // Resume locking in first thread
+ service.resumeLocking();
+ ThreadUtils.join(thread2, 20 * 1000);
+
+ // Now thread2 should have suspended locking, not gotten the exception, but the
+ // thread's flag should be set
+ LogWriterUtils.getLogWriter().info("[testSuspendLockingIsNotInterruptible]" +
+ " gotLock=" + gotLock +
+ " wasFlagSet=" + wasFlagSet +
+ " exception=" + exception, exception);
+ assertTrue(gotLock);
+ assertNull(exception);
+ assertTrue(wasFlagSet);
+ }
+
+ /**
+  * Checks that locking a name through a lock service that has already been
+  * destroyed raises {@link LockServiceDestroyedException} rather than
+  * returning a result.
+  *
+  * @author David Whitlock
+  */
+ public void testLockDestroyedService() {
+   final String destroyedName = getUniqueName();
+   final DistributedLockService destroyedService =
+       DistributedLockService.create(destroyedName, dlstSystem);
+   DistributedLockService.destroy(destroyedName);
+
+   final boolean locked;
+   try {
+     locked = destroyedService.lock("TEST", -1, -1);
+   } catch (LockServiceDestroyedException expected) {
+     // pass: the destroyed service refused the lock request
+     return;
+   }
+   // Only reached when lock() returned normally, which is the failure case
+   fail("Lock of destroyed service returned: " + locked);
+ }
+
+ /**
+  * Locks and unlocks "key" in this VM, lets another VM lock "key" with a
+  * lease argument of 360000, unlock it and disconnect, and then verifies
+  * that this VM can lock "key" again.
+  */
+ public void testDepartedLastOwnerWithLease() {
+   final String dlsName = getUniqueName();
+
+   // This VM is the first to take and release the key
+   final DistributedLockService localService =
+       DistributedLockService.create(dlsName, dlstSystem);
+   assertTrue(localService.lock("key", -1, -1));
+   localService.unlock("key");
+
+   // Work for the other VM: become the last owner of the key, then depart
+   final SerializableRunnable lockThenDepart = new SerializableRunnable() {
+     public void run() {
+       final DistributedLockService remoteService =
+           DistributedLockService.create(dlsName, dlstSystem);
+       remoteService.lock("key", -1, 360000);
+       remoteService.unlock("key");
+       // Give asynchronous messaging a moment to complete
+       try {
+         Thread.sleep(100);
+       } catch (InterruptedException interrupted) {
+         fail("interrupted");
+       }
+       disconnectFromDS();
+     }
+   };
+   final VM departingVm = Host.getHost(0).getVM(0);
+   departingVm.invoke(lockThenDepart);
+
+   // The key must be lockable from this VM after the last owner has left
+   assertTrue(localService.lock("key", -1, -1));
+
+ }
+
+ /**
+  * Locks and unlocks "key" in this VM, lets another VM lock "key" with a
+  * lease argument of -1, unlock it and disconnect, and then verifies that
+  * this VM can lock "key" again.
+  */
+ public void testDepartedLastOwnerNoLease() {
+   final String dlsName = getUniqueName();
+
+   // This VM is the first to take and release the key
+   final DistributedLockService localService =
+       DistributedLockService.create(dlsName, dlstSystem);
+   assertTrue(localService.lock("key", -1, -1));
+   localService.unlock("key");
+
+   // Work for the other VM: become the last owner of the key, then depart
+   final SerializableRunnable lockThenDepart = new SerializableRunnable() {
+     public void run() {
+       final DistributedLockService remoteService =
+           DistributedLockService.create(dlsName, dlstSystem);
+       remoteService.lock("key", -1, -1);
+       remoteService.unlock("key");
+       // Give asynchronous messaging a moment to complete
+       try {
+         Thread.sleep(100);
+       } catch (InterruptedException interrupted) {
+         fail("interrupted");
+       }
+       disconnectFromDS();
+     }
+   };
+   final VM departingVm = Host.getHost(0).getVM(0);
+   departingVm.invoke(lockThenDepart);
+
+   // The key must be lockable from this VM after the last owner has left
+   assertTrue(localService.lock("key", -1, -1));
+
+ }
+
+ /**
+  * Regression test for bug 32461 (R3): StuckLocks can occur on locks with an
+  * expiration lease.
+  * <p>
+  * Scenario: VM-A locks and unlocks "32461"; VM-B locks "32461", destroys
+  * its lock service and disconnects while still holding it; VM-C then
+  * attempts to lock "32461". The old dlock threw StuckLockException at that
+  * point; VM-C is now expected to acquire the lock.
+  */
+ public void testBug32461() throws Exception {
+   LogWriterUtils.getLogWriter().fine("[testBug32461] prepping");
+
+   final int vmIndexA = 0;
+   final int vmIndexB = 1;
+   final int vmIndexC = 2;
+   final Object lockName = "32461";
+   final String dlsName = getUniqueName();
+
+   // Step 1: VM-A takes the lock and gives it straight back
+   LogWriterUtils.getLogWriter().fine("[testBug32461] VM-A locks/unlocks '32461'");
+
+   final SerializableRunnable lockAndUnlockWithLease = new SerializableRunnable() {
+     public void run() {
+       remoteCreateService(dlsName);
+       final DistributedLockService dls =
+           DistributedLockService.getServiceNamed(dlsName);
+       assertTrue(dls.lock(lockName, -1, Long.MAX_VALUE));
+       dls.unlock(lockName);
+     }
+   };
+   Host.getHost(0).getVM(vmIndexA).invoke(lockAndUnlockWithLease);
+
+   // Step 2: VM-B takes the lock and leaves without ever unlocking it
+   LogWriterUtils.getLogWriter().fine("[testBug32461] VM_B leases '32461' and disconnects");
+
+   final SerializableRunnable leaseAndDepart = new SerializableRunnable() {
+     public void run() {
+       remoteCreateService(dlsName);
+       final DistributedLockService dls =
+           DistributedLockService.getServiceNamed(dlsName);
+       assertTrue(dls.lock(lockName, -1, Long.MAX_VALUE));
+       DistributedLockService.destroy(dlsName);
+       disconnectFromDS();
+     }
+   };
+   Host.getHost(0).getVM(vmIndexB).invoke(leaseAndDepart);
+
+   // Step 3: VM-C must be able to take the lock left behind by VM-B
+   LogWriterUtils.getLogWriter().fine("[testBug32461] VM_C attempts to lock '32461'");
+
+   final SerializableRunnable lockAndUnlock = new SerializableRunnable() {
+     public void run() {
+       remoteCreateService(dlsName);
+       final DistributedLockService dls =
+           DistributedLockService.getServiceNamed(dlsName);
+       assertTrue(dls.lock(lockName, -1, -1));
+       dls.unlock(lockName);
+     }
+   };
+   Host.getHost(0).getVM(vmIndexC).invoke(lockAndUnlock);
+ }
+
+ /**
+  * Locks and unlocks two keys in this VM, has another VM lock both keys (one
+  * with a lease argument of 360000, one with -1) and disconnect without
+  * unlocking, and then verifies this VM can lock and unlock both keys again.
+  */
+ public void testNoStuckLock() {
+   final Object leasedKey = "key-with-lease";
+   final Object unleasedKey = "key-no-lease";
+   final String dlsName = getUniqueName();
+
+   // Set up the service in this VM and cycle both keys once
+   final DistributedLockService localService =
+       DistributedLockService.create(dlsName, dlstSystem);
+
+   assertTrue(localService.lock(leasedKey, -1, -1));
+   localService.unlock(leasedKey);
+
+   assertTrue(localService.lock(unleasedKey, -1, -1));
+   localService.unlock(unleasedKey);
+
+   // Work for the other VM: grab both keys and leave while still holding them
+   final SerializableRunnable lockBothAndDepart = new SerializableRunnable() {
+     public void run() {
+       final DistributedLockService remoteService =
+           DistributedLockService.create(dlsName, dlstSystem);
+       remoteService.lock(leasedKey, -1, 360000);
+       remoteService.lock(unleasedKey, -1, -1);
+       disconnectFromDS();
+     }
+   };
+   final VM departingVm = Host.getHost(0).getVM(0);
+   departingVm.invoke(lockBothAndDepart);
+
+   // Both keys must be lockable from this VM again... no stuck locks anymore
+   assertTrue(localService.lock(leasedKey, -1, -1));
+   localService.unlock(leasedKey);
+   assertTrue(localService.lock(unleasedKey, -1, -1));
+   localService.unlock(unleasedKey);
+ }
+
+ // Handshake flags shared between the test thread and the two worker threads
+ // started by testReleaseOrphanedGrant_Local.
+ volatile boolean startedThread1_testReleaseOrphanedGrant;
+ volatile boolean releaseThread1_testReleaseOrphanedGrant;
+ volatile boolean startedThread2_testReleaseOrphanedGrant;
+ volatile boolean gotLockThread2_testReleaseOrphanedGrant;
+ /**
+  * Client requests lock and then interrupts lock request before processing
+  * the grant reply. This causes the Client to send a release msg to the
+  * grantor.
+  * <p>
+  * Sequence: thread 1 locks "obj" and parks until released; thread 2 calls
+  * <code>lockInterruptibly</code> on "obj"; thread 1 is released and unlocks;
+  * thread 2 is interrupted and joined; finally the test thread verifies that
+  * "obj" can be locked within 1000, i.e. that no grant was left behind.
+  * The DLockRequestProcessor debug switches are always restored in the
+  * <code>finally</code> block.
+  */
+ public void testReleaseOrphanedGrant_Local() {
+   DLockRequestProcessor.setDebugReleaseOrphanedGrant(true);
+   DLockRequestProcessor.setWaitToProcessDLockResponse(false);
+   try {
+     // Reset every handshake flag, including the one the test thread
+     // spin-waits on below, so a stale value can never short-circuit the wait
+     startedThread1_testReleaseOrphanedGrant = false;
+     startedThread2_testReleaseOrphanedGrant = false;
+     gotLockThread2_testReleaseOrphanedGrant = false;
+     releaseThread1_testReleaseOrphanedGrant = false;
+
+     LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Local] create lock service");
+     final String serviceName = getUniqueName();
+     final DistributedLockService service =
+         DistributedLockService.create(serviceName, dlstSystem);
+
+     // thread to get lock and wait and then unlock
+     final Thread thread1 = new Thread(new Runnable() {
+       public void run() {
+         LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Local] get the lock");
+         assertTrue(service.lock("obj", -1, -1));
+         DLockRequestProcessor.setWaitToProcessDLockResponse(true);
+         startedThread1_testReleaseOrphanedGrant = true;
+         // Park on this thread's own monitor; the test thread notifies it
+         // through the thread1 reference after setting the release flag
+         synchronized(Thread.currentThread()) {
+           while (!releaseThread1_testReleaseOrphanedGrant) {
+             try {
+               Thread.currentThread().wait();
+             } catch (InterruptedException ignore) {fail("interrupted");}
+           }
+         }
+         LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Local] unlock the lock");
+         service.unlock("obj");
+       }
+     });
+     thread1.start();
+     // Wait until thread 1 holds the lock before starting the contender
+     while (!startedThread1_testReleaseOrphanedGrant) {
+       Thread.yield();
+     }
+
+     // thread to interrupt lockInterruptibly call to cause zombie grant
+     final Thread thread2 = new Thread(new Runnable() {
+       public void run() {
+         try {
+           LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Local] call lockInterruptibly");
+           startedThread2_testReleaseOrphanedGrant = true;
+           assertFalse(service.lockInterruptibly("obj", -1, -1));
+         } catch (InterruptedException expected) {
+           // Restore the interrupt status for anything further up the stack
+           Thread.currentThread().interrupt();
+         }
+       }
+     });
+     thread2.start();
+     while (!startedThread2_testReleaseOrphanedGrant) {
+       Thread.yield();
+     }
+
+     // release first thread to unlock
+     LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Local] release 1st thread");
+     sleep(500);
+     synchronized(thread1) {
+       releaseThread1_testReleaseOrphanedGrant = true;
+       thread1.notifyAll();
+     }
+     sleep(500);
+
+     // while first thread is stuck on waitToProcessDLockResponse,
+     // interrupt 2nd thread
+     LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Local] interrupt 2nd thread");
+     thread2.interrupt();
+     ThreadUtils.join(thread2, 20 * 1000);
+
+     // release waitToProcessDLockResponse
+     LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Local] process lock response");
+     sleep(500);
+     DLockRequestProcessor.setWaitToProcessDLockResponse(false);
+
+     // relock obj to make sure zombie release worked
+     LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Local] verify lock not held");
+     assertTrue(service.lock("obj", 1000, -1));
+   }
+   finally {
+     // Never leave the debug switches enabled for later tests
+     DLockRequestProcessor.setDebugReleaseOrphanedGrant(false);
+     DLockRequestProcessor.setWaitToProcessDLockResponse(false);
+   }
+ }
+
+ // Handshake state for doTestReleaseOrphanedGrant_Remote. These are static
+ // so that separate invocations inside the same remote VM can see each
+ // other's values; each VM has its own copy.
+ static volatile Thread threadVM1_testReleaseOrphanedGrant_Remote;
+ static volatile Thread threadVM2_testReleaseOrphanedGrant_Remote;
+ static volatile boolean startedThreadVM1_testReleaseOrphanedGrant_Remote;
+ static volatile boolean releaseThreadVM1_testReleaseOrphanedGrant_Remote;
+ static volatile boolean unlockedThreadVM1_testReleaseOrphanedGrant_Remote;
+ static volatile boolean startedThreadVM2_testReleaseOrphanedGrant_Remote;
+ static volatile boolean gotLockThreadVM2_testReleaseOrphanedGrant_Remote;
+ /**
+  * Runs the remote orphaned-grant scenario without destroying the lock
+  * service in vm2.
+  */
+ public void testReleaseOrphanedGrant_Remote() {
+   doTestReleaseOrphanedGrant_Remote(false);
+ }
+ /**
+  * Runs the remote orphaned-grant scenario and destroys the lock service in
+  * vm2 after its lock request has been interrupted.
+  */
+ public void testReleaseOrphanedGrant_RemoteWithDestroy() {
+   doTestReleaseOrphanedGrant_Remote(true);
+ }
+ /**
+  * Shared body of the remote orphaned-grant tests. This VM creates the lock
+  * service and locks/unlocks "obj" first; vm1 then locks "obj" and parks;
+  * vm2 calls <code>lockInterruptibly</code> on "obj" with the
+  * DLockRequestProcessor debug switches enabled; vm1 is released and
+  * unlocks; vm2's requesting thread is interrupted and joined; the debug
+  * wait is switched off; finally this VM verifies that "obj" can be locked
+  * within 1000.
+  *
+  * @param destroyLockService when true, vm2 destroys its lock service right
+  *        after its requesting thread has been interrupted and joined
+  */
+ public void doTestReleaseOrphanedGrant_Remote(final boolean destroyLockService) {
+   final VM vm1 = Host.getHost(0).getVM(0);
+   final VM vm2 = Host.getHost(0).getVM(1);
+
+   try {
+     // The handshake state is static inside each remote VM and is shared by
+     // both public tests above, so clear it before every scenario; stale
+     // "started"/"release" values would let the waits below fall through
+     final SerializableRunnable resetHandshake = new SerializableRunnable() {
+       public void run() {
+         threadVM1_testReleaseOrphanedGrant_Remote = null;
+         threadVM2_testReleaseOrphanedGrant_Remote = null;
+         startedThreadVM1_testReleaseOrphanedGrant_Remote = false;
+         releaseThreadVM1_testReleaseOrphanedGrant_Remote = false;
+         unlockedThreadVM1_testReleaseOrphanedGrant_Remote = false;
+         startedThreadVM2_testReleaseOrphanedGrant_Remote = false;
+         gotLockThreadVM2_testReleaseOrphanedGrant_Remote = false;
+       }
+     };
+     vm1.invoke(resetHandshake);
+     vm2.invoke(resetHandshake);
+
+     LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] create lock service");
+     final String serviceName = getUniqueName();
+     final DistributedLockService service =
+         DistributedLockService.create(serviceName, dlstSystem);
+
+     // lock and unlock to make sure this vm is grantor
+     assertTrue(service.lock("obj", -1, -1));
+     service.unlock("obj");
+
+     // thread to get lock and wait and then unlock
+     vm1.invokeAsync(new SerializableRunnable() {
+       public void run() {
+         LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] get the lock");
+         threadVM1_testReleaseOrphanedGrant_Remote = Thread.currentThread();
+         connectDistributedSystem();
+         DistributedLockService service_vm1 =
+             DistributedLockService.create(serviceName, getSystem());
+         assertTrue(service_vm1.lock("obj", -1, -1));
+         // Park on this thread's own monitor until a later invocation in
+         // vm1 sets the release flag and notifies
+         synchronized(threadVM1_testReleaseOrphanedGrant_Remote) {
+           while (!releaseThreadVM1_testReleaseOrphanedGrant_Remote) {
+             try {
+               startedThreadVM1_testReleaseOrphanedGrant_Remote = true;
+               Thread.currentThread().wait();
+             } catch (InterruptedException ignore) {fail("interrupted");}
+           }
+         }
+         LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] unlock the lock");
+         service_vm1.unlock("obj");
+         unlockedThreadVM1_testReleaseOrphanedGrant_Remote = true;
+       }
+     });
+     // Block until the async invocation in vm1 holds the lock and is parked
+     vm1.invoke(new SerializableRunnable() {
+       public void run() {
+         while (!startedThreadVM1_testReleaseOrphanedGrant_Remote) {
+           Thread.yield();
+         }
+       }
+     });
+     sleep(500);
+
+     // thread to interrupt lockInterruptibly call to cause zombie grant
+     vm2.invokeAsync(new SerializableRunnable() {
+       public void run() {
+         LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] call lockInterruptibly");
+         threadVM2_testReleaseOrphanedGrant_Remote = Thread.currentThread();
+         DistributedLockService service_vm2 =
+             DistributedLockService.create(serviceName, getSystem());
+         startedThreadVM2_testReleaseOrphanedGrant_Remote = true;
+         try {
+           DLockRequestProcessor.setDebugReleaseOrphanedGrant(true);
+           DLockRequestProcessor.setWaitToProcessDLockResponse(true);
+           assertFalse(service_vm2.lockInterruptibly("obj", -1, -1));
+         } catch (InterruptedException expected) {Thread.currentThread().interrupt();}
+       }
+     });
+     // Block until the async invocation in vm2 has published its thread
+     vm2.invoke(new SerializableRunnable() {
+       public void run() {
+         while (!startedThreadVM2_testReleaseOrphanedGrant_Remote) {
+           Thread.yield();
+         }
+       }
+     });
+     sleep(500);
+
+     // release first thread to unlock
+     vm1.invoke(new SerializableRunnable() {
+       public void run() {
+         LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] release 1st thread");
+         synchronized(threadVM1_testReleaseOrphanedGrant_Remote) {
+           releaseThreadVM1_testReleaseOrphanedGrant_Remote = true;
+           threadVM1_testReleaseOrphanedGrant_Remote.notifyAll();
+         }
+       }
+     });
+     sleep(500); // lock is being released, grantor will grant lock to vm2
+
+     // while first thread is stuck on waitToProcessDLockResponse,
+     // interrupt 2nd thread
+     vm2.invoke(new SerializableRunnable() {
+       public void run() {
+         LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] interrupt 2nd thread");
+         threadVM2_testReleaseOrphanedGrant_Remote.interrupt();
+         ThreadUtils.join(threadVM2_testReleaseOrphanedGrant_Remote,
+             5 * 60 * 1000);
+         if (destroyLockService) {
+           LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] destroy lock service");
+           DistributedLockService.destroy(serviceName);
+           assertNull(DistributedLockService.getServiceNamed(serviceName));
+         }
+       }
+     });
+     sleep(500); // grant is blocked while reply processor is being destroyed
+
+     // release waitToProcessDLockResponse
+     vm2.invoke(new SerializableRunnable() {
+       public void run() {
+         LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] process lock response");
+         DLockRequestProcessor.setWaitToProcessDLockResponse(false);
+       }
+     });
+     sleep(500); // process grant and send zombie release to grantor
+
+     // relock obj to make sure zombie release worked
+     LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] verify lock not held");
+     assertTrue(service.lock("obj", 1000, -1));
+   }
+   finally {
+     // Always switch the debug hooks off again in vm2, where they were enabled
+     vm2.invoke(new SerializableRunnable() {
+       public void run() {
+         LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] clean up DebugReleaseOrphanedGrant");
+         DLockRequestProcessor.setDebugReleaseOrphanedGrant(false);
+         DLockRequestProcessor.setWaitToProcessDLockResponse(false);
+       }
+     });
+   }
+ }
+
+ public void testDestroyLockServiceAfterGrantResponse() throws Throwable {
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+
+ final String serviceName = getUniqueName();
+
+ vm0.invoke(new SerializableRunnable("Create the grantor") {
+
+ public void run() {
+ connectDistributedSystem();
+ final DistributedLockService service =
+ DistributedLockService.create(serviceName, dlstSystem);
+
+ // lock and unlock to make sure this vm is grantor
+ assertTrue(service.lock("obj", -1, -1));
+ service.unlock("obj");
+ }
+ });
+
+ DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
+
+ @Override
+ public void beforeProcessMessage(DistributionManager dm,
+ DistributionMessage message) {
+ if(message instanceof DLockResponseMessage) {
+ DistributedLockService.destroy(serviceName);
+ }
+ }
+ });
+
+ connectDistributedSystem();
+ final DistributedLockService service =
+ DistributedLockService.create(serviceName, dlstSystem);
+ try {
+ service.lock("obj", -1, -1);
+ fail("The lock service should have been destroyed");
+ } catch(LockServiceDestroyedException expected) {
+ //Do nothing
+ }
+
+ vm0.invoke(new SerializableRunnable("check to make sure the lock is not orphaned") {
+
+ public void run() {
+ final DistributedLockService service =
+ DistributedLockService.getServiceNamed(serviceName);
+
+ // lock and unlock to make sure this vm is grantor
+ assertTrue(service.lock("obj", -1, -1));
+ service.unlock("obj");
<TRUNCATED>