You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/20 07:49:51 UTC

[pulsar] branch branch-2.9 updated (ab78a7e0c91 -> 15da3ed5d46)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from ab78a7e0c91 Update proxy lookup throw exception type (#17600)
     new d1d1273e2c4 [fix][metadata] Set revalidateAfterReconnection true for certain failures (#17664)
     new 15da3ed5d46 [fix][metadata] Cleanup state when lock revalidation gets `LockBusyException` (#17700)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../coordination/impl/LockManagerImpl.java         |  2 +-
 .../coordination/impl/ResourceLockImpl.java        | 47 +++++++++--------
 .../apache/pulsar/metadata/LockManagerTest.java    | 59 ++++++++++++++++++++++
 3 files changed, 86 insertions(+), 22 deletions(-)


[pulsar] 01/02: [fix][metadata] Set revalidateAfterReconnection true for certain failures (#17664)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d1d1273e2c4a1cdb5484345f778f97bb6e6312e9
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Sun Sep 18 06:27:59 2022 -0700

    [fix][metadata] Set revalidateAfterReconnection true for certain failures (#17664)
    
    (cherry picked from commit c40c7ee9fb35eafea9c9923dfcf62706ea5d36bf)
---
 .../org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java   | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index ec9d3c42904..afc7feeaa25 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -197,6 +197,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
                             // We failed to revalidate the lock due to connectivity issue
                             // Continue assuming we hold the lock, until we can revalidate it, either
                             // on Reconnected or SessionReestablished events.
+                            revalidateAfterReconnection = true;
                             log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path,
                                     ex.getCause().getMessage());
                         }


[pulsar] 02/02: [fix][metadata] Cleanup state when lock revalidation gets `LockBusyException` (#17700)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 15da3ed5d4622aeb5317698f6843f48c3eb309b1
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Tue Sep 20 15:41:15 2022 +0800

    [fix][metadata] Cleanup state when lock revalidation gets `LockBusyException` (#17700)
    
    ### Motivation
    
    In the production environment, we found two brokers holding the same valid lock, and one of them had a revalidate future that completed exceptionally with `LockBusyException`. After reading the code, it appears that we forgot to reset the cache and complete the lock-expired future when getting a `LockBusyException`.
    
    (cherry picked from commit 955ae340293185ba17b9d5baf1a0f5bd7e547186)
---
 .../coordination/impl/LockManagerImpl.java         |  2 +-
 .../coordination/impl/ResourceLockImpl.java        | 48 ++++++++++--------
 .../apache/pulsar/metadata/LockManagerTest.java    | 59 ++++++++++++++++++++++
 3 files changed, 86 insertions(+), 23 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index f45c0c7dd7d..046b174d352 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -125,7 +125,7 @@ class LockManagerImpl<T> implements LockManager<T> {
             if (se == SessionEvent.SessionReestablished) {
                 log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
                 for (ResourceLockImpl<T> lock : locks.values()) {
-                    futures.add(lock.revalidate(lock.getValue()));
+                    futures.add(lock.revalidate(lock.getValue(), true));
                 }
 
             } else if (se == SessionEvent.Reconnected) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index afc7feeaa25..05758ef1146 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
@@ -128,7 +129,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
                 .thenRun(() -> result.complete(null))
                 .exceptionally(ex -> {
                     if (ex.getCause() instanceof LockBusyException) {
-                        revalidate(newValue)
+                        revalidate(newValue, false)
                                 .thenAccept(__ -> result.complete(null))
                                 .exceptionally(ex1 -> {
                                    result.completeExceptionally(ex1);
@@ -185,38 +186,21 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
         }
 
         log.info("Lock on resource {} was invalidated", path);
-        revalidate(value)
-                .thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
-                .exceptionally(ex -> {
-                    synchronized (ResourceLockImpl.this) {
-                        if (ex.getCause() instanceof BadVersionException) {
-                            log.warn("Failed to revalidate the lock at {}. Marked as expired", path);
-                            state = State.Released;
-                            expiredFuture.complete(null);
-                        } else {
-                            // We failed to revalidate the lock due to connectivity issue
-                            // Continue assuming we hold the lock, until we can revalidate it, either
-                            // on Reconnected or SessionReestablished events.
-                            revalidateAfterReconnection = true;
-                            log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path,
-                                    ex.getCause().getMessage());
-                        }
-                    }
-                    return null;
-                });
+        revalidate(value, true)
+                .thenRun(() -> log.info("Successfully revalidated the lock on {}", path));
     }
 
     synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
         if (revalidateAfterReconnection) {
             revalidateAfterReconnection = false;
             log.warn("Revalidate lock at {} after reconnection", path);
-            return revalidate(value);
+            return revalidate(value, true);
         } else {
             return CompletableFuture.completedFuture(null);
         }
     }
 
-    synchronized CompletableFuture<Void> revalidate(T newValue) {
+    synchronized CompletableFuture<Void> revalidate(T newValue, boolean revalidateAfterReconnection) {
         if (revalidateFuture == null || revalidateFuture.isDone()) {
             revalidateFuture = doRevalidate(newValue);
         } else {
@@ -234,6 +218,26 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
             });
             revalidateFuture = newFuture;
         }
+        revalidateFuture.exceptionally(ex -> {
+            synchronized (ResourceLockImpl.this) {
+                Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                if (!revalidateAfterReconnection || realCause instanceof BadVersionException
+                        || realCause instanceof LockBusyException) {
+                    log.warn("Failed to revalidate the lock at {}. Marked as expired. {}",
+                            path, realCause.getMessage());
+                    state = State.Released;
+                    expiredFuture.complete(null);
+                } else {
+                    // We failed to revalidate the lock due to connectivity issue
+                    // Continue assuming we hold the lock, until we can revalidate it, either
+                    // on Reconnected or SessionReestablished events.
+                    ResourceLockImpl.this.revalidateAfterReconnection = true;
+                    log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path,
+                            realCause.getMessage());
+                }
+            }
+            return null;
+        });
         return revalidateFuture;
     }
 
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
index da080baacab..ef94f2f3fe2 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.metadata;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -29,6 +31,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import lombok.Cleanup;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -286,4 +289,60 @@ public class LockManagerTest extends BaseMetadataStoreTest {
             assertEquals(new String(store1.get(path2).join().get().getValue()), "\"value-1\"");
         });
     }
+
+    @Test(dataProvider = "impl")
+    public void testCleanUpStateWhenRevalidationGotLockBusy(String provider, Supplier<String> urlSupplier)
+            throws Exception {
+
+        if (provider.equals("Memory") || provider.equals("RocksDB")) {
+            // Local memory provider doesn't really have the concept of multiple sessions
+            return;
+        }
+
+        @Cleanup
+        MetadataStoreExtended store1 = MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+        @Cleanup
+        MetadataStoreExtended store2 = MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+
+        @Cleanup
+        CoordinationService cs1 = new CoordinationServiceImpl(store1);
+        @Cleanup
+        LockManager<String> lm1 = cs1.getLockManager(String.class);
+
+        @Cleanup
+        CoordinationService cs2 = new CoordinationServiceImpl(store2);
+        @Cleanup
+        LockManager<String> lm2 = cs2.getLockManager(String.class);
+
+        String path1 = newKey();
+
+        ResourceLock<String> lock1 = lm1.acquireLock(path1, "value-1").join();
+        AtomicReference<ResourceLock<String>> lock2 = new AtomicReference<>();
+        // lock 2 will steal the distributed lock first.
+        Awaitility.await().until(()-> {
+            // Ensure steal the lock success.
+            try {
+                lock2.set(lm2.acquireLock(path1, "value-1").join());
+                return true;
+            } catch (Exception ex) {
+                return false;
+            }
+        });
+
+        // Since we can steal the lock repeatedly, we don't know which one will get it.
+        // But we can verify the final state.
+        Awaitility.await().untilAsserted(() -> {
+            if (lock1.getLockExpiredFuture().isDone()) {
+                assertTrue(lm1.listLocks(path1).join().isEmpty());
+                assertFalse(lock2.get().getLockExpiredFuture().isDone());
+            } else if (lock2.get().getLockExpiredFuture().isDone()) {
+                assertTrue(lm2.listLocks(path1).join().isEmpty());
+                assertFalse(lock1.getLockExpiredFuture().isDone());
+            } else {
+                fail("unexpected behaviour");
+            }
+        });
+    }
 }