You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/20 07:49:24 UTC

[pulsar] 02/02: [fix][metadata] Cleanup state when lock revalidation gets `LockBusyException` (#17700)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 65c059a511097f1b60f48ed8f67afd6c363f017d
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Tue Sep 20 15:41:15 2022 +0800

    [fix][metadata] Cleanup state when lock revalidation gets `LockBusyException` (#17700)
    
    ### Motivation
    
    In the production environment,  we found two brokers holding the same valid locks. and one has an exceptional revalidate future with `lockBusyException`. after reading the code, there may forget the reset the cache and complete expire exception when getting lockBusyException.
    
    (cherry picked from commit 955ae340293185ba17b9d5baf1a0f5bd7e547186)
---
 .../coordination/impl/LockManagerImpl.java         |  2 +-
 .../coordination/impl/ResourceLockImpl.java        | 48 ++++++++++--------
 .../apache/pulsar/metadata/LockManagerTest.java    | 59 ++++++++++++++++++++++
 3 files changed, 86 insertions(+), 23 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index b1a77aa1304..e677e99faf6 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -125,7 +125,7 @@ class LockManagerImpl<T> implements LockManager<T> {
             if (se == SessionEvent.SessionReestablished) {
                 log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
                 for (ResourceLockImpl<T> lock : locks.values()) {
-                    futures.add(lock.revalidate(lock.getValue()));
+                    futures.add(lock.revalidate(lock.getValue(), true));
                 }
 
             } else if (se == SessionEvent.Reconnected) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index 22eaccc278b..134ee8030d9 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -23,6 +23,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -127,7 +128,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
                 .thenRun(() -> result.complete(null))
                 .exceptionally(ex -> {
                     if (ex.getCause() instanceof LockBusyException) {
-                        revalidate(newValue)
+                        revalidate(newValue, false)
                                 .thenAccept(__ -> result.complete(null))
                                 .exceptionally(ex1 -> {
                                    result.completeExceptionally(ex1);
@@ -184,38 +185,21 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
         }
 
         log.info("Lock on resource {} was invalidated", path);
-        revalidate(value)
-                .thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
-                .exceptionally(ex -> {
-                    synchronized (ResourceLockImpl.this) {
-                        if (ex.getCause() instanceof BadVersionException) {
-                            log.warn("Failed to revalidate the lock at {}. Marked as expired", path);
-                            state = State.Released;
-                            expiredFuture.complete(null);
-                        } else {
-                            // We failed to revalidate the lock due to connectivity issue
-                            // Continue assuming we hold the lock, until we can revalidate it, either
-                            // on Reconnected or SessionReestablished events.
-                            revalidateAfterReconnection = true;
-                            log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path,
-                                    ex.getCause().getMessage());
-                        }
-                    }
-                    return null;
-                });
+        revalidate(value, true)
+                .thenRun(() -> log.info("Successfully revalidated the lock on {}", path));
     }
 
     synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
         if (revalidateAfterReconnection) {
             revalidateAfterReconnection = false;
             log.warn("Revalidate lock at {} after reconnection", path);
-            return revalidate(value);
+            return revalidate(value, true);
         } else {
             return CompletableFuture.completedFuture(null);
         }
     }
 
-    synchronized CompletableFuture<Void> revalidate(T newValue) {
+    synchronized CompletableFuture<Void> revalidate(T newValue, boolean revalidateAfterReconnection) {
         if (revalidateFuture == null || revalidateFuture.isDone()) {
             revalidateFuture = doRevalidate(newValue);
         } else {
@@ -233,6 +217,26 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
             });
             revalidateFuture = newFuture;
         }
+        revalidateFuture.exceptionally(ex -> {
+            synchronized (ResourceLockImpl.this) {
+                Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                if (!revalidateAfterReconnection || realCause instanceof BadVersionException
+                        || realCause instanceof LockBusyException) {
+                    log.warn("Failed to revalidate the lock at {}. Marked as expired. {}",
+                            path, realCause.getMessage());
+                    state = State.Released;
+                    expiredFuture.complete(null);
+                } else {
+                    // We failed to revalidate the lock due to connectivity issue
+                    // Continue assuming we hold the lock, until we can revalidate it, either
+                    // on Reconnected or SessionReestablished events.
+                    ResourceLockImpl.this.revalidateAfterReconnection = true;
+                    log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path,
+                            realCause.getMessage());
+                }
+            }
+            return null;
+        });
         return revalidateFuture;
     }
 
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
index 29e9af5d70e..1a4e4e7e9df 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.metadata;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -29,6 +31,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import lombok.Cleanup;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -293,4 +296,60 @@ public class LockManagerTest extends BaseMetadataStoreTest {
             assertEquals(new String(store1.get(path2).join().get().getValue()), "\"value-1\"");
         });
     }
+
+    @Test(dataProvider = "impl")
+    public void testCleanUpStateWhenRevalidationGotLockBusy(String provider, Supplier<String> urlSupplier)
+            throws Exception {
+
+        if (provider.equals("Memory") || provider.equals("RocksDB")) {
+            // Local memory provider doesn't really have the concept of multiple sessions
+            return;
+        }
+
+        @Cleanup
+        MetadataStoreExtended store1 = MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+        @Cleanup
+        MetadataStoreExtended store2 = MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+
+        @Cleanup
+        CoordinationService cs1 = new CoordinationServiceImpl(store1);
+        @Cleanup
+        LockManager<String> lm1 = cs1.getLockManager(String.class);
+
+        @Cleanup
+        CoordinationService cs2 = new CoordinationServiceImpl(store2);
+        @Cleanup
+        LockManager<String> lm2 = cs2.getLockManager(String.class);
+
+        String path1 = newKey();
+
+        ResourceLock<String> lock1 = lm1.acquireLock(path1, "value-1").join();
+        AtomicReference<ResourceLock<String>> lock2 = new AtomicReference<>();
+        // lock 2 will steal the distributed lock first.
+        Awaitility.await().until(()-> {
+            // Ensure steal the lock success.
+            try {
+                lock2.set(lm2.acquireLock(path1, "value-1").join());
+                return true;
+            } catch (Exception ex) {
+                return false;
+            }
+        });
+
+        // Since we can steal the lock repeatedly, we don't know which one will get it.
+        // But we can verify the final state.
+        Awaitility.await().untilAsserted(() -> {
+            if (lock1.getLockExpiredFuture().isDone()) {
+                assertTrue(lm1.listLocks(path1).join().isEmpty());
+                assertFalse(lock2.get().getLockExpiredFuture().isDone());
+            } else if (lock2.get().getLockExpiredFuture().isDone()) {
+                assertTrue(lm2.listLocks(path1).join().isEmpty());
+                assertFalse(lock1.getLockExpiredFuture().isDone());
+            } else {
+                fail("unexpected behaviour");
+            }
+        });
+    }
 }