You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/09/30 06:50:54 UTC

[pulsar] branch master updated: PIP-45: On LockResource.updateValue() revalidate the lock if needed (#12190)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 693a066  PIP-45: On LockResource.updateValue() revalidate the lock if needed (#12190)
693a066 is described below

commit 693a066d73ea4012fb2bb750d7450474f210cccd
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Sep 30 00:50:09 2021 -0600

    PIP-45: On LockResource.updateValue() revalidate the lock if needed (#12190)
---
 .../coordination/impl/LockManagerImpl.java         |  6 +--
 .../coordination/impl/ResourceLockImpl.java        | 61 ++++++++++++----------
 .../apache/pulsar/metadata/LockManagerTest.java    | 52 ++++++++++++++++++
 3 files changed, 87 insertions(+), 32 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index 94f1b44..f45c0c7 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -84,10 +84,10 @@ class LockManagerImpl<T> implements LockManager<T> {
 
     @Override
     public CompletableFuture<ResourceLock<T>> acquireLock(String path, T value) {
-        ResourceLockImpl<T> lock = new ResourceLockImpl<>(store, serde, path, value);
+        ResourceLockImpl<T> lock = new ResourceLockImpl<>(store, serde, path);
 
         CompletableFuture<ResourceLock<T>> result = new CompletableFuture<>();
-        lock.acquire().thenRun(() -> {
+        lock.acquire(value).thenRun(() -> {
             synchronized (LockManagerImpl.this) {
                 if (state == State.Ready) {
                     locks.put(path, lock);
@@ -125,7 +125,7 @@ class LockManagerImpl<T> implements LockManager<T> {
             if (se == SessionEvent.SessionReestablished) {
                 log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
                 for (ResourceLockImpl<T> lock : locks.values()) {
-                    futures.add(lock.revalidate());
+                    futures.add(lock.revalidate(lock.getValue()));
                 }
 
             } else if (se == SessionEvent.Reconnected) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index 282f288..677ace7 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -54,11 +54,10 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
 
     private State state;
 
-    public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde<T> serde, String path, T value) {
+    public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde<T> serde, String path) {
         this.store = store;
         this.serde = serde;
         this.path = path;
-        this.value = value;
         this.version = -1;
         this.expiredFuture = new CompletableFuture<>();
         this.state = State.Init;
@@ -71,20 +70,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
 
     @Override
     public synchronized CompletableFuture<Void> updateValue(T newValue) {
-        byte[] payload;
-        try {
-            payload = serde.serialize(path, newValue);
-        } catch (Throwable t) {
-            return FutureUtils.exception(t);
-        }
-
-        return store.put(path, payload, Optional.of(version))
-                .thenAccept(stat -> {
-                    synchronized (ResourceLockImpl.this) {
-                        value = newValue;
-                        version = stat.getVersion();
-                    }
-                });
+       return acquire(newValue);
     }
 
     @Override
@@ -135,13 +121,13 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
         return path.hashCode();
     }
 
-    synchronized CompletableFuture<Void> acquire() {
+    synchronized CompletableFuture<Void> acquire(T newValue) {
         CompletableFuture<Void> result = new CompletableFuture<>();
-        acquireWithNoRevalidation()
+        acquireWithNoRevalidation(newValue)
                 .thenRun(() -> result.complete(null))
                 .exceptionally(ex -> {
                     if (ex.getCause() instanceof LockBusyException) {
-                        revalidate()
+                        revalidate(newValue)
                                 .thenAccept(__ -> result.complete(null))
                                 .exceptionally(ex1 -> {
                                    result.completeExceptionally(ex1);
@@ -157,20 +143,21 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
     }
 
     // Simple operation of acquiring the lock with no retries, or checking for the lock content
-    private CompletableFuture<Void> acquireWithNoRevalidation() {
+    private CompletableFuture<Void> acquireWithNoRevalidation(T newValue) {
         byte[] payload;
         try {
-            payload = serde.serialize(path, value);
+            payload = serde.serialize(path, newValue);
         } catch (Throwable t) {
             return FutureUtils.exception(t);
         }
 
         CompletableFuture<Void> result = new CompletableFuture<>();
-        store.put(path, payload, Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral))
+        store.put(path, payload, Optional.of(version), EnumSet.of(CreateOption.Ephemeral))
                 .thenAccept(stat -> {
                     synchronized (ResourceLockImpl.this) {
                         state = State.Valid;
                         version = stat.getVersion();
+                        value = newValue;
                     }
                     log.info("Acquired resource lock on {}", path);
                     result.complete(null);
@@ -194,7 +181,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
         }
 
         log.info("Lock on resource {} was invalidated", path);
-        revalidate()
+        revalidate(value)
                 .thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
                 .exceptionally(ex -> {
                     synchronized (ResourceLockImpl.this) {
@@ -218,18 +205,20 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
         if (revalidateAfterReconnection) {
             revalidateAfterReconnection = false;
             log.warn("Revalidate lock at {} after reconnection", path);
-            return revalidate();
+            return revalidate(value);
         } else {
             return CompletableFuture.completedFuture(null);
         }
     }
 
-    synchronized CompletableFuture<Void> revalidate() {
+    synchronized CompletableFuture<Void> revalidate(T newValue) {
         return store.get(path)
                 .thenCompose(optGetResult -> {
                     if (!optGetResult.isPresent()) {
                         // The lock just disappeared, try to acquire it again
-                        return acquireWithNoRevalidation()
+                        // Reset the expectation on the version
+                        setVersion(-1L);
+                        return acquireWithNoRevalidation(newValue)
                                 .thenRun(() -> log.info("Successfully re-acquired missing lock at {}", path));
                     }
 
@@ -248,7 +237,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
                     }
 
                     synchronized (ResourceLockImpl.this) {
-                        if (value.equals(existingValue)) {
+                        if (newValue.equals(existingValue)) {
                             // The lock value is still the same, that means that we're the
                             // logical "owners" of the lock.
 
@@ -256,12 +245,18 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
                                 // If the new lock belongs to the same session, there's no
                                 // need to recreate it.
                                 version = res.getStat().getVersion();
+                                value = newValue;
+                                return CompletableFuture.completedFuture(null);
                             } else {
                                 // The lock needs to get recreated since it belong to an earlier
                                 // session which maybe expiring soon
                                 log.info("Deleting stale lock at {}", path);
                                 return store.delete(path, Optional.of(res.getStat().getVersion()))
-                                        .thenCompose(__ -> acquireWithNoRevalidation())
+                                        .thenRun(() ->
+                                            // Reset the expected version to -1, since the key is not there anymore
+                                            setVersion(-1L)
+                                        )
+                                        .thenCompose(__ -> acquireWithNoRevalidation(newValue))
                                         .thenRun(() -> log.info("Successfully re-acquired stale lock at {}", path));
                             }
                         }
@@ -276,9 +271,17 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
                         }
 
                         return store.delete(path, Optional.of(res.getStat().getVersion()))
-                                .thenCompose(__ -> acquireWithNoRevalidation())
+                                .thenRun(() ->
+                                    // Reset the expected version to -1, since the key is not there anymore
+                                    setVersion(-1L)
+                                )
+                                .thenCompose(__ -> acquireWithNoRevalidation(newValue))
                                 .thenRun(() -> log.info("Successfully re-acquired lock at {}", path));
                     }
                 });
     }
+
+    private synchronized void setVersion(long version) {
+        this.version = version;
+    }
 }
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
index f215750..da080ba 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
@@ -149,6 +149,58 @@ public class LockManagerTest extends BaseMetadataStoreTest {
     }
 
     @Test(dataProvider = "impl")
+    public void updateValueWhenVersionIsOutOfSync(String provider, Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+
+        MetadataCache<String> cache = store.getMetadataCache(String.class);
+
+        @Cleanup
+        CoordinationService coordinationService = new CoordinationServiceImpl(store);
+
+        @Cleanup
+        LockManager<String> lockManager = coordinationService.getLockManager(String.class);
+
+        ResourceLock<String> lock = lockManager.acquireLock("/my/path/1", "lock-1").join();
+        assertEquals(lock.getValue(), "lock-1");
+        assertEquals(cache.get("/my/path/1").join().get(), "lock-1");
+
+        store.put("/my/path/1",
+                ObjectMapperFactory.getThreadLocal().writeValueAsBytes("value-2"),
+                Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
+
+        lock.updateValue("value-2").join();
+        assertEquals(lock.getValue(), "value-2");
+        assertEquals(cache.get("/my/path/1").join().get(), "value-2");
+    }
+
+    @Test(dataProvider = "impl")
+    public void updateValueWhenKeyDisappears(String provider, Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+
+        MetadataCache<String> cache = store.getMetadataCache(String.class);
+
+        @Cleanup
+        CoordinationService coordinationService = new CoordinationServiceImpl(store);
+
+        @Cleanup
+        LockManager<String> lockManager = coordinationService.getLockManager(String.class);
+
+        ResourceLock<String> lock = lockManager.acquireLock("/my/path/1", "lock-1").join();
+        assertEquals(lock.getValue(), "lock-1");
+        assertEquals(cache.get("/my/path/1").join().get(), "lock-1");
+
+        store.delete("/my/path/1", Optional.empty()).join();
+
+        lock.updateValue("value-2").join();
+        assertEquals(lock.getValue(), "value-2");
+        assertEquals(cache.get("/my/path/1").join().get(), "value-2");
+    }
+
+    @Test(dataProvider = "impl")
     public void revalidateLockWithinSameSession(String provider, Supplier<String> urlSupplier) throws Exception {
         @Cleanup
         MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),