You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/20 07:50:16 UTC
[pulsar] branch branch-2.11 updated: [fix][metadata] Cleanup state when lock revalidation gets `LockBusyException` (#17700)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new ec2aef7f52d [fix][metadata] Cleanup state when lock revalidation gets `LockBusyException` (#17700)
ec2aef7f52d is described below
commit ec2aef7f52df0ef9be15446a4d3dc63bb1157d16
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Tue Sep 20 15:41:15 2022 +0800
[fix][metadata] Cleanup state when lock revalidation gets `LockBusyException` (#17700)
### Motivation
In a production environment, we found two brokers that both considered themselves holders of the same valid lock, while one of them had a revalidation future that had completed exceptionally with `LockBusyException`. After reading the code, it appears that when revalidation fails with `LockBusyException`, the code forgets to reset the cached lock state and to complete the lock-expired future.
(cherry picked from commit 955ae340293185ba17b9d5baf1a0f5bd7e547186)
---
.../coordination/impl/LockManagerImpl.java | 2 +-
.../coordination/impl/ResourceLockImpl.java | 48 ++++++++++--------
.../apache/pulsar/metadata/LockManagerTest.java | 59 ++++++++++++++++++++++
3 files changed, 86 insertions(+), 23 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index b1a77aa1304..e677e99faf6 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -125,7 +125,7 @@ class LockManagerImpl<T> implements LockManager<T> {
if (se == SessionEvent.SessionReestablished) {
log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
for (ResourceLockImpl<T> lock : locks.values()) {
- futures.add(lock.revalidate(lock.getValue()));
+ futures.add(lock.revalidate(lock.getValue(), true));
}
} else if (se == SessionEvent.Reconnected) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index 22eaccc278b..134ee8030d9 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -23,6 +23,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -127,7 +128,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
.thenRun(() -> result.complete(null))
.exceptionally(ex -> {
if (ex.getCause() instanceof LockBusyException) {
- revalidate(newValue)
+ revalidate(newValue, false)
.thenAccept(__ -> result.complete(null))
.exceptionally(ex1 -> {
result.completeExceptionally(ex1);
@@ -184,38 +185,21 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
}
log.info("Lock on resource {} was invalidated", path);
- revalidate(value)
- .thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
- .exceptionally(ex -> {
- synchronized (ResourceLockImpl.this) {
- if (ex.getCause() instanceof BadVersionException) {
- log.warn("Failed to revalidate the lock at {}. Marked as expired", path);
- state = State.Released;
- expiredFuture.complete(null);
- } else {
- // We failed to revalidate the lock due to connectivity issue
- // Continue assuming we hold the lock, until we can revalidate it, either
- // on Reconnected or SessionReestablished events.
- revalidateAfterReconnection = true;
- log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path,
- ex.getCause().getMessage());
- }
- }
- return null;
- });
+ revalidate(value, true)
+ .thenRun(() -> log.info("Successfully revalidated the lock on {}", path));
}
synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
if (revalidateAfterReconnection) {
revalidateAfterReconnection = false;
log.warn("Revalidate lock at {} after reconnection", path);
- return revalidate(value);
+ return revalidate(value, true);
} else {
return CompletableFuture.completedFuture(null);
}
}
- synchronized CompletableFuture<Void> revalidate(T newValue) {
+ synchronized CompletableFuture<Void> revalidate(T newValue, boolean revalidateAfterReconnection) {
if (revalidateFuture == null || revalidateFuture.isDone()) {
revalidateFuture = doRevalidate(newValue);
} else {
@@ -233,6 +217,26 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
});
revalidateFuture = newFuture;
}
+ revalidateFuture.exceptionally(ex -> {
+ synchronized (ResourceLockImpl.this) {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ if (!revalidateAfterReconnection || realCause instanceof BadVersionException
+ || realCause instanceof LockBusyException) {
+ log.warn("Failed to revalidate the lock at {}. Marked as expired. {}",
+ path, realCause.getMessage());
+ state = State.Released;
+ expiredFuture.complete(null);
+ } else {
+ // We failed to revalidate the lock due to connectivity issue
+ // Continue assuming we hold the lock, until we can revalidate it, either
+ // on Reconnected or SessionReestablished events.
+ ResourceLockImpl.this.revalidateAfterReconnection = true;
+ log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path,
+ realCause.getMessage());
+ }
+ }
+ return null;
+ });
return revalidateFuture;
}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
index 29e9af5d70e..1a4e4e7e9df 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.metadata;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -29,6 +31,7 @@ import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import lombok.Cleanup;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -293,4 +296,60 @@ public class LockManagerTest extends BaseMetadataStoreTest {
assertEquals(new String(store1.get(path2).join().get().getValue()), "\"value-1\"");
});
}
+
+ @Test(dataProvider = "impl")
+ public void testCleanUpStateWhenRevalidationGotLockBusy(String provider, Supplier<String> urlSupplier)
+ throws Exception {
+
+ if (provider.equals("Memory") || provider.equals("RocksDB")) {
+ // Local memory provider doesn't really have the concept of multiple sessions
+ return;
+ }
+
+ @Cleanup
+ MetadataStoreExtended store1 = MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().build());
+ @Cleanup
+ MetadataStoreExtended store2 = MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().build());
+
+ @Cleanup
+ CoordinationService cs1 = new CoordinationServiceImpl(store1);
+ @Cleanup
+ LockManager<String> lm1 = cs1.getLockManager(String.class);
+
+ @Cleanup
+ CoordinationService cs2 = new CoordinationServiceImpl(store2);
+ @Cleanup
+ LockManager<String> lm2 = cs2.getLockManager(String.class);
+
+ String path1 = newKey();
+
+ ResourceLock<String> lock1 = lm1.acquireLock(path1, "value-1").join();
+ AtomicReference<ResourceLock<String>> lock2 = new AtomicReference<>();
+ // lock 2 will steal the distributed lock first.
+ Awaitility.await().until(()-> {
+ // Ensure steal the lock success.
+ try {
+ lock2.set(lm2.acquireLock(path1, "value-1").join());
+ return true;
+ } catch (Exception ex) {
+ return false;
+ }
+ });
+
+ // Since we can steal the lock repeatedly, we don't know which one will get it.
+ // But we can verify the final state.
+ Awaitility.await().untilAsserted(() -> {
+ if (lock1.getLockExpiredFuture().isDone()) {
+ assertTrue(lm1.listLocks(path1).join().isEmpty());
+ assertFalse(lock2.get().getLockExpiredFuture().isDone());
+ } else if (lock2.get().getLockExpiredFuture().isDone()) {
+ assertTrue(lm2.listLocks(path1).join().isEmpty());
+ assertFalse(lock1.getLockExpiredFuture().isDone());
+ } else {
+ fail("unexpected behaviour");
+ }
+ });
+ }
}