You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/09/01 21:48:24 UTC
[pulsar] branch master updated: Fixed ZKSessionTest.testReacquireLocksAfterSessionLost (#11886)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bc49191 Fixed ZKSessionTest.testReacquireLocksAfterSessionLost (#11886)
bc49191 is described below
commit bc49191d1816bedf8cb6a8ba156b8593b921b27a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Sep 1 14:47:47 2021 -0700
Fixed ZKSessionTest.testReacquireLocksAfterSessionLost (#11886)
---
.../coordination/impl/LockManagerImpl.java | 3 +++
.../coordination/impl/ResourceLockImpl.java | 23 +++++++++++++++++++---
.../org/apache/pulsar/metadata/ZKSessionTest.java | 8 +++-----
3 files changed, 26 insertions(+), 8 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index aadaf51..0e3754ac 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -107,6 +107,9 @@ class LockManagerImpl<T> implements LockManager<T> {
if (se == SessionEvent.SessionReestablished) {
log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
locks.values().forEach(ResourceLockImpl::revalidate);
+ } else if (se == SessionEvent.Reconnected) {
+ log.info("Metadata store connection has been re-established. Revalidating locks that were pending.");
+ locks.values().forEach(ResourceLockImpl::revalidateIfNeededAfterReconnection);
}
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index 2ca7022..55b420e 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -43,6 +43,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
private volatile T value;
private long version;
private final CompletableFuture<Void> expiredFuture;
+ private boolean revalidateAfterReconnection = false;
private enum State {
Init,
@@ -197,14 +198,30 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
.thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
.exceptionally(ex -> {
synchronized (ResourceLockImpl.this) {
- log.warn("Failed to revalidate the lock at {}. Marked as expired", path);
- state = State.Released;
- expiredFuture.complete(null);
+ if (ex.getCause() instanceof BadVersionException) {
+ log.warn("Failed to revalidate the lock at {}. Marked as expired", path);
+ state = State.Released;
+ expiredFuture.complete(null);
+ } else {
+ // We failed to revalidate the lock due to connectivity issue
+ // Continue assuming we hold the lock, until we can revalidate it, either
+ // on Reconnected or SessionReestablished events.
+ log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection", path,
+ ex.getCause().getMessage());
+ }
}
return null;
});
}
+ synchronized void revalidateIfNeededAfterReconnection() {
+ if (revalidateAfterReconnection) {
+ revalidateAfterReconnection = false;
+ log.warn("Revalidate lock at {} after reconnection", path);
+ revalidate();
+ }
+ }
+
synchronized CompletableFuture<Void> revalidate() {
return store.get(path)
.thenCompose(optGetResult -> {
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
index bb61edd..76e14bb 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
@@ -114,6 +114,7 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
ResourceLock<String> lock = lm1.acquireLock(path, "value-1").join();
+
zks.expireSession(((ZKMetadataStore) store).getZkSessionId());
SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
@@ -127,11 +128,8 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
e = sessionEvents.poll(10, TimeUnit.SECONDS);
assertEquals(e, SessionEvent.SessionReestablished);
- Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
- assertFalse(lock.getLockExpiredFuture().isDone());
- });
-
- assertTrue(store.get(path).join().isPresent());
+ Awaitility.await().untilAsserted(() -> assertTrue(store.get(path).join().isPresent()));
+ assertFalse(lock.getLockExpiredFuture().isDone());
}
@Test