You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ti...@apache.org on 2023/04/01 14:55:02 UTC

[curator] branch master updated: CURATOR-621: Fix write acquire after downgrade InterProcessReadWriteLock (#445)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c283bd9 CURATOR-621: Fix write acquire after downgrade InterProcessReadWriteLock (#445)
2c283bd9 is described below

commit 2c283bd97fbc50ce592231dcf846ece4190b7948
Author: Kezhu Wang <ke...@gmail.com>
AuthorDate: Sat Apr 1 22:54:54 2023 +0800

    CURATOR-621: Fix write acquire after downgrade InterProcessReadWriteLock (#445)
    
    Currently, when downgrading the write lock of InterProcessReadWriteLock, the read lock could have a larger sorting sequence than the contending write-acquire; this causes the contending write-acquire to succeed after downgrading.
    
    This commit solves this by using the write lock's sorting sequence for the read lock when downgrading.
---
 .../recipes/locks/InterProcessReadWriteLock.java   | 21 +++++------
 .../recipes/locks/StandardLockInternalsDriver.java | 15 +++++++-
 .../locks/TestInterProcessReadWriteLock.java       | 43 ++++++++++++++++++++++
 3 files changed, 66 insertions(+), 13 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
index 53718eb5..b0a8e178 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessReadWriteLock.java
@@ -114,17 +114,7 @@ public class InterProcessReadWriteLock
     {
         public WriteLock(CuratorFramework client, String basePath, byte[] lockData)
         {
-            super(client, basePath, WRITE_LOCK_NAME, lockData, 1, new SortingLockInternalsDriver() {
-                @Override
-                public PredicateResults getsTheLock(
-                    CuratorFramework client,
-                    List<String> children,
-                    String sequenceNodeName,
-                    int maxLeases
-                ) throws Exception {
-                    return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
-                }
-            });
+            super(client, basePath, WRITE_LOCK_NAME, lockData, 1, new SortingLockInternalsDriver());
         }
 
         @Override
@@ -138,6 +128,15 @@ public class InterProcessReadWriteLock
         public ReadLock(CuratorFramework client, String basePath, byte[] lockData, WriteLock writeLock)
         {
             super(client, basePath, READ_LOCK_NAME, lockData, Integer.MAX_VALUE, new SortingLockInternalsDriver() {
+                @Override
+                protected String getSortingSequence() {
+                    String writePath = writeLock.getLockPath();
+                    if (writePath != null) {
+                        return fixForSorting(writePath, WRITE_LOCK_NAME);
+                    }
+                    return null;
+                }
+
                 @Override
                 public PredicateResults getsTheLock(
                     CuratorFramework client,
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/StandardLockInternalsDriver.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/StandardLockInternalsDriver.java
index 2d785b92..b9e5d52a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/StandardLockInternalsDriver.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/StandardLockInternalsDriver.java
@@ -42,17 +42,28 @@ public class StandardLockInternalsDriver implements LockInternalsDriver
         return new PredicateResults(pathToWatch, getsTheLock);
     }
 
+    protected String getSortingSequence() {
+        return null;
+    }
+
     @Override
     public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
     {
+
+        CreateMode createMode = CreateMode.EPHEMERAL_SEQUENTIAL;
+        String sequence = getSortingSequence();
+        if (sequence != null) {
+            path += sequence;
+            createMode = CreateMode.EPHEMERAL;
+        }
         String ourPath;
         if ( lockNodeBytes != null )
         {
-            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
+            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(createMode).forPath(path, lockNodeBytes);
         }
         else
         {
-            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
+            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(createMode).forPath(path);
         }
         return ourPath;
     }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
index a54bd401..f1dbe456 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
@@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import com.google.common.collect.Lists;
@@ -45,6 +46,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class TestInterProcessReadWriteLock extends BaseClassForTests
@@ -239,6 +241,47 @@ public class TestInterProcessReadWriteLock extends BaseClassForTests
         }
     }
 
+    @Test
+    public void testContendingDowngrading() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        ExecutorService executor = Executors.newCachedThreadPool();
+        try
+        {
+            client.start();
+
+            InterProcessReadWriteLock lock1 = new InterProcessReadWriteLock(client, "/lock");
+            lock1.writeLock().acquire();
+
+            CountDownLatch ready = new CountDownLatch(1);
+            Future<?> writeAcquire = executor.submit(() -> {
+                ready.countDown();
+                InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client, "/lock");
+                lock2.writeLock().acquire();
+                fail("expect no acquire");
+                return null;
+            });
+
+            ready.await();
+            // Let lock2 have chance to do write-acquire before downgrading.
+            Thread.sleep(20);
+
+            assertTrue(lock1.readLock().acquire(5, TimeUnit.SECONDS));
+
+            lock1.writeLock().release();
+            // We still hold read lock, other write-acquire should block.
+            assertThrows(TimeoutException.class, () -> {
+                // Let lock2 have chance to respond to write-release
+                writeAcquire.get(20, TimeUnit.MILLISECONDS);
+            });
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+            executor.shutdown();
+        }
+    }
+
     @Test
     public void testBasic() throws Exception
     {