You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/09/01 21:48:24 UTC

[pulsar] branch master updated: Fixed ZKSessionTest.testReacquireLocksAfterSessionLost (#11886)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bc49191  Fixed ZKSessionTest.testReacquireLocksAfterSessionLost (#11886)
bc49191 is described below

commit bc49191d1816bedf8cb6a8ba156b8593b921b27a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Sep 1 14:47:47 2021 -0700

    Fixed ZKSessionTest.testReacquireLocksAfterSessionLost (#11886)
---
 .../coordination/impl/LockManagerImpl.java         |  3 +++
 .../coordination/impl/ResourceLockImpl.java        | 23 +++++++++++++++++++---
 .../org/apache/pulsar/metadata/ZKSessionTest.java  |  8 +++-----
 3 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index aadaf51..0e3754ac 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -107,6 +107,9 @@ class LockManagerImpl<T> implements LockManager<T> {
         if (se == SessionEvent.SessionReestablished) {
             log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
             locks.values().forEach(ResourceLockImpl::revalidate);
+        } else if (se == SessionEvent.Reconnected) {
+            log.info("Metadata store connection has been re-established. Revalidating locks that were pending.");
+            locks.values().forEach(ResourceLockImpl::revalidateIfNeededAfterReconnection);
         }
     }
 
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index 2ca7022..55b420e 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -43,6 +43,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
     private volatile T value;
     private long version;
     private final CompletableFuture<Void> expiredFuture;
+    private boolean revalidateAfterReconnection = false;
 
     private enum State {
         Init,
@@ -197,14 +198,30 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
                 .thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
                 .exceptionally(ex -> {
                     synchronized (ResourceLockImpl.this) {
-                        log.warn("Failed to revalidate the lock at {}. Marked as expired", path);
-                        state = State.Released;
-                        expiredFuture.complete(null);
+                        if (ex.getCause() instanceof BadVersionException) {
+                            log.warn("Failed to revalidate the lock at {}. Marked as expired", path);
+                            state = State.Released;
+                            expiredFuture.complete(null);
+                        } else {
+                            // We failed to revalidate the lock due to connectivity issue
+                            // Continue assuming we hold the lock, until we can revalidate it, either
+                            // on Reconnected or SessionReestablished events.
+                            log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection", path,
+                                    ex.getCause().getMessage());
+                        }
                     }
                     return null;
                 });
     }
 
+    synchronized void revalidateIfNeededAfterReconnection() {
+        if (revalidateAfterReconnection) {
+            revalidateAfterReconnection = false;
+            log.warn("Revalidate lock at {} after reconnection", path);
+            revalidate();
+        }
+    }
+
     synchronized CompletableFuture<Void> revalidate() {
         return store.get(path)
                 .thenCompose(optGetResult -> {
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
index bb61edd..76e14bb 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
@@ -114,6 +114,7 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
 
         ResourceLock<String> lock = lm1.acquireLock(path, "value-1").join();
 
+
         zks.expireSession(((ZKMetadataStore) store).getZkSessionId());
 
         SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
@@ -127,11 +128,8 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
         e = sessionEvents.poll(10, TimeUnit.SECONDS);
         assertEquals(e, SessionEvent.SessionReestablished);
 
-        Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
-            assertFalse(lock.getLockExpiredFuture().isDone());
-        });
-
-        assertTrue(store.get(path).join().isPresent());
+        Awaitility.await().untilAsserted(() -> assertTrue(store.get(path).join().isPresent()));
+        assertFalse(lock.getLockExpiredFuture().isDone());
     }
 
     @Test