You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ti...@apache.org on 2023/04/01 14:55:02 UTC
[curator] branch master updated: CURATOR-621: Fix write acquire after downgrade InterProcessReadWriteLock (#445)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new 2c283bd9 CURATOR-621: Fix write acquire after downgrade InterProcessReadWriteLock (#445)
2c283bd9 is described below
commit 2c283bd97fbc50ce592231dcf846ece4190b7948
Author: Kezhu Wang <ke...@gmail.com>
AuthorDate: Sat Apr 1 22:54:54 2023 +0800
CURATOR-621: Fix write acquire after downgrade InterProcessReadWriteLock (#445)
Currently, when downgrading the write lock of InterProcessReadWriteLock, the read lock could have a larger sorting sequence than the contending write-acquire; this causes the contending write-acquire to succeed after downgrading.
This commit fixes the problem by using the write lock's sorting sequence for the read lock when downgrading.
---
.../recipes/locks/InterProcessReadWriteLock.java | 21 +++++------
.../recipes/locks/StandardLockInternalsDriver.java | 15 +++++++-
.../locks/TestInterProcessReadWriteLock.java | 43 ++++++++++++++++++++++
3 files changed, 66 insertions(+), 13 deletions(-)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
index 53718eb5..b0a8e178 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
@@ -114,17 +114,7 @@ public class InterProcessReadWriteLock
{
public WriteLock(CuratorFramework client, String basePath, byte[] lockData)
{
- super(client, basePath, WRITE_LOCK_NAME, lockData, 1, new SortingLockInternalsDriver() {
- @Override
- public PredicateResults getsTheLock(
- CuratorFramework client,
- List<String> children,
- String sequenceNodeName,
- int maxLeases
- ) throws Exception {
- return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
- }
- });
+ super(client, basePath, WRITE_LOCK_NAME, lockData, 1, new SortingLockInternalsDriver());
}
@Override
@@ -138,6 +128,15 @@ public class InterProcessReadWriteLock
public ReadLock(CuratorFramework client, String basePath, byte[] lockData, WriteLock writeLock)
{
super(client, basePath, READ_LOCK_NAME, lockData, Integer.MAX_VALUE, new SortingLockInternalsDriver() {
+ @Override
+ protected String getSortingSequence() {
+ String writePath = writeLock.getLockPath();
+ if (writePath != null) {
+ return fixForSorting(writePath, WRITE_LOCK_NAME);
+ }
+ return null;
+ }
+
@Override
public PredicateResults getsTheLock(
CuratorFramework client,
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/StandardLockInternalsDriver.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/StandardLockInternalsDriver.java
index 2d785b92..b9e5d52a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/StandardLockInternalsDriver.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/StandardLockInternalsDriver.java
@@ -42,17 +42,28 @@ public class StandardLockInternalsDriver implements LockInternalsDriver
return new PredicateResults(pathToWatch, getsTheLock);
}
+ protected String getSortingSequence() {
+ return null;
+ }
+
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
+
+ CreateMode createMode = CreateMode.EPHEMERAL_SEQUENTIAL;
+ String sequence = getSortingSequence();
+ if (sequence != null) {
+ path += sequence;
+ createMode = CreateMode.EPHEMERAL;
+ }
String ourPath;
if ( lockNodeBytes != null )
{
- ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
+ ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(createMode).forPath(path, lockNodeBytes);
}
else
{
- ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
+ ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(createMode).forPath(path);
}
return ourPath;
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
index a54bd401..f1dbe456 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
@@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import com.google.common.collect.Lists;
@@ -45,6 +46,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
public class TestInterProcessReadWriteLock extends BaseClassForTests
@@ -239,6 +241,47 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
}
}
+ @Test
+ public void testContendingDowngrading() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ ExecutorService executor = Executors.newCachedThreadPool();
+ try
+ {
+ client.start();
+
+ InterProcessReadWriteLock lock1 = new InterProcessReadWriteLock(client, "/lock");
+ lock1.writeLock().acquire();
+
+ CountDownLatch ready = new CountDownLatch(1);
+ Future<?> writeAcquire = executor.submit(() -> {
+ ready.countDown();
+ InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client, "/lock");
+ lock2.writeLock().acquire();
+ fail("expect no acquire");
+ return null;
+ });
+
+ ready.await();
+ // Let lock2 have chance to do write-acquire before downgrading.
+ Thread.sleep(20);
+
+ assertTrue(lock1.readLock().acquire(5, TimeUnit.SECONDS));
+
+ lock1.writeLock().release();
+ // We still hold read lock, other write-acquire should block.
+ assertThrows(TimeoutException.class, () -> {
+ // Let lock2 have chance to respond to write-release
+ writeAcquire.get(20, TimeUnit.MILLISECONDS);
+ });
+ }
+ finally
+ {
+ TestCleanState.closeAndTestClean(client);
+ executor.shutdown();
+ }
+ }
+
@Test
public void testBasic() throws Exception
{