You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/20 07:49:22 UTC

[pulsar] branch branch-2.10 updated (d2793e4618d -> 65c059a5110)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from d2793e4618d [cleanup][owasp] Supress false positive netty-tcnative (#17282)
     new 0529c572c94 [fix][metadata] Set revalidateAfterReconnection true for certain failures (#17664)
     new 65c059a5110 [fix][metadata] Cleanup state when lock revalidation gets `LockBusyException` (#17700)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../coordination/impl/LockManagerImpl.java         |  2 +-
 .../coordination/impl/ResourceLockImpl.java        | 47 +++++++++--------
 .../apache/pulsar/metadata/LockManagerTest.java    | 59 ++++++++++++++++++++++
 3 files changed, 86 insertions(+), 22 deletions(-)


[pulsar] 01/02: [fix][metadata] Set revalidateAfterReconnection true for certain failures (#17664)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0529c572c94d55aff107eef831895e80dc242770
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Sun Sep 18 06:27:59 2022 -0700

    [fix][metadata] Set revalidateAfterReconnection true for certain failures (#17664)
    
    (cherry picked from commit c40c7ee9fb35eafea9c9923dfcf62706ea5d36bf)
---
 .../org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java   | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index bc3f47d41dc..22eaccc278b 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -196,6 +196,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
                             // We failed to revalidate the lock due to connectivity issue
                             // Continue assuming we hold the lock, until we can revalidate it, either
                             // on Reconnected or SessionReestablished events.
+                            revalidateAfterReconnection = true;
                             log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path,
                                     ex.getCause().getMessage());
                         }


[pulsar] 02/02: [fix][metadata] Cleanup state when lock revalidation gets `LockBusyException` (#17700)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 65c059a511097f1b60f48ed8f67afd6c363f017d
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Tue Sep 20 15:41:15 2022 +0800

    [fix][metadata] Cleanup state when lock revalidation gets `LockBusyException` (#17700)
    
    ### Motivation
    
    In the production environment, we found two brokers holding the same valid lock, and one of them had a revalidate future that completed exceptionally with `LockBusyException`. After reading the code, it appears that we forgot to reset the cache and complete the expired future when revalidation gets a `LockBusyException`.
    
    (cherry picked from commit 955ae340293185ba17b9d5baf1a0f5bd7e547186)
---
 .../coordination/impl/LockManagerImpl.java         |  2 +-
 .../coordination/impl/ResourceLockImpl.java        | 48 ++++++++++--------
 .../apache/pulsar/metadata/LockManagerTest.java    | 59 ++++++++++++++++++++++
 3 files changed, 86 insertions(+), 23 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index b1a77aa1304..e677e99faf6 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -125,7 +125,7 @@ class LockManagerImpl<T> implements LockManager<T> {
             if (se == SessionEvent.SessionReestablished) {
                 log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
                 for (ResourceLockImpl<T> lock : locks.values()) {
-                    futures.add(lock.revalidate(lock.getValue()));
+                    futures.add(lock.revalidate(lock.getValue(), true));
                 }
 
             } else if (se == SessionEvent.Reconnected) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index 22eaccc278b..134ee8030d9 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -23,6 +23,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -127,7 +128,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
                 .thenRun(() -> result.complete(null))
                 .exceptionally(ex -> {
                     if (ex.getCause() instanceof LockBusyException) {
-                        revalidate(newValue)
+                        revalidate(newValue, false)
                                 .thenAccept(__ -> result.complete(null))
                                 .exceptionally(ex1 -> {
                                    result.completeExceptionally(ex1);
@@ -184,38 +185,21 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
         }
 
         log.info("Lock on resource {} was invalidated", path);
-        revalidate(value)
-                .thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
-                .exceptionally(ex -> {
-                    synchronized (ResourceLockImpl.this) {
-                        if (ex.getCause() instanceof BadVersionException) {
-                            log.warn("Failed to revalidate the lock at {}. Marked as expired", path);
-                            state = State.Released;
-                            expiredFuture.complete(null);
-                        } else {
-                            // We failed to revalidate the lock due to connectivity issue
-                            // Continue assuming we hold the lock, until we can revalidate it, either
-                            // on Reconnected or SessionReestablished events.
-                            revalidateAfterReconnection = true;
-                            log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path,
-                                    ex.getCause().getMessage());
-                        }
-                    }
-                    return null;
-                });
+        revalidate(value, true)
+                .thenRun(() -> log.info("Successfully revalidated the lock on {}", path));
     }
 
     synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
         if (revalidateAfterReconnection) {
             revalidateAfterReconnection = false;
             log.warn("Revalidate lock at {} after reconnection", path);
-            return revalidate(value);
+            return revalidate(value, true);
         } else {
             return CompletableFuture.completedFuture(null);
         }
     }
 
-    synchronized CompletableFuture<Void> revalidate(T newValue) {
+    synchronized CompletableFuture<Void> revalidate(T newValue, boolean revalidateAfterReconnection) {
         if (revalidateFuture == null || revalidateFuture.isDone()) {
             revalidateFuture = doRevalidate(newValue);
         } else {
@@ -233,6 +217,26 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
             });
             revalidateFuture = newFuture;
         }
+        revalidateFuture.exceptionally(ex -> {
+            synchronized (ResourceLockImpl.this) {
+                Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                if (!revalidateAfterReconnection || realCause instanceof BadVersionException
+                        || realCause instanceof LockBusyException) {
+                    log.warn("Failed to revalidate the lock at {}. Marked as expired. {}",
+                            path, realCause.getMessage());
+                    state = State.Released;
+                    expiredFuture.complete(null);
+                } else {
+                    // We failed to revalidate the lock due to connectivity issue
+                    // Continue assuming we hold the lock, until we can revalidate it, either
+                    // on Reconnected or SessionReestablished events.
+                    ResourceLockImpl.this.revalidateAfterReconnection = true;
+                    log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path,
+                            realCause.getMessage());
+                }
+            }
+            return null;
+        });
         return revalidateFuture;
     }
 
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
index 29e9af5d70e..1a4e4e7e9df 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.metadata;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -29,6 +31,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import lombok.Cleanup;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -293,4 +296,60 @@ public class LockManagerTest extends BaseMetadataStoreTest {
             assertEquals(new String(store1.get(path2).join().get().getValue()), "\"value-1\"");
         });
     }
+
+    @Test(dataProvider = "impl")
+    public void testCleanUpStateWhenRevalidationGotLockBusy(String provider, Supplier<String> urlSupplier)
+            throws Exception {
+
+        if (provider.equals("Memory") || provider.equals("RocksDB")) {
+            // Local memory provider doesn't really have the concept of multiple sessions
+            return;
+        }
+
+        @Cleanup
+        MetadataStoreExtended store1 = MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+        @Cleanup
+        MetadataStoreExtended store2 = MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+
+        @Cleanup
+        CoordinationService cs1 = new CoordinationServiceImpl(store1);
+        @Cleanup
+        LockManager<String> lm1 = cs1.getLockManager(String.class);
+
+        @Cleanup
+        CoordinationService cs2 = new CoordinationServiceImpl(store2);
+        @Cleanup
+        LockManager<String> lm2 = cs2.getLockManager(String.class);
+
+        String path1 = newKey();
+
+        ResourceLock<String> lock1 = lm1.acquireLock(path1, "value-1").join();
+        AtomicReference<ResourceLock<String>> lock2 = new AtomicReference<>();
+        // lock 2 will steal the distributed lock first.
+        Awaitility.await().until(()-> {
+            // Ensure steal the lock success.
+            try {
+                lock2.set(lm2.acquireLock(path1, "value-1").join());
+                return true;
+            } catch (Exception ex) {
+                return false;
+            }
+        });
+
+        // Since we can steal the lock repeatedly, we don't know which one will get it.
+        // But we can verify the final state.
+        Awaitility.await().untilAsserted(() -> {
+            if (lock1.getLockExpiredFuture().isDone()) {
+                assertTrue(lm1.listLocks(path1).join().isEmpty());
+                assertFalse(lock2.get().getLockExpiredFuture().isDone());
+            } else if (lock2.get().getLockExpiredFuture().isDone()) {
+                assertTrue(lm2.listLocks(path1).join().isEmpty());
+                assertFalse(lock1.getLockExpiredFuture().isDone());
+            } else {
+                fail("unexpected behaviour");
+            }
+        });
+    }
 }