You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/09/30 06:50:54 UTC
[pulsar] branch master updated: PIP-45: On
LockResource.updateValue() revalidate the lock if needed (#12190)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 693a066 PIP-45: On LockResource.updateValue() revalidate the lock if needed (#12190)
693a066 is described below
commit 693a066d73ea4012fb2bb750d7450474f210cccd
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Sep 30 00:50:09 2021 -0600
PIP-45: On LockResource.updateValue() revalidate the lock if needed (#12190)
---
.../coordination/impl/LockManagerImpl.java | 6 +--
.../coordination/impl/ResourceLockImpl.java | 61 ++++++++++++----------
.../apache/pulsar/metadata/LockManagerTest.java | 52 ++++++++++++++++++
3 files changed, 87 insertions(+), 32 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index 94f1b44..f45c0c7 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -84,10 +84,10 @@ class LockManagerImpl<T> implements LockManager<T> {
@Override
public CompletableFuture<ResourceLock<T>> acquireLock(String path, T value) {
- ResourceLockImpl<T> lock = new ResourceLockImpl<>(store, serde, path, value);
+ ResourceLockImpl<T> lock = new ResourceLockImpl<>(store, serde, path);
CompletableFuture<ResourceLock<T>> result = new CompletableFuture<>();
- lock.acquire().thenRun(() -> {
+ lock.acquire(value).thenRun(() -> {
synchronized (LockManagerImpl.this) {
if (state == State.Ready) {
locks.put(path, lock);
@@ -125,7 +125,7 @@ class LockManagerImpl<T> implements LockManager<T> {
if (se == SessionEvent.SessionReestablished) {
log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
for (ResourceLockImpl<T> lock : locks.values()) {
- futures.add(lock.revalidate());
+ futures.add(lock.revalidate(lock.getValue()));
}
} else if (se == SessionEvent.Reconnected) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index 282f288..677ace7 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -54,11 +54,10 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
private State state;
- public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde<T> serde, String path, T value) {
+ public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde<T> serde, String path) {
this.store = store;
this.serde = serde;
this.path = path;
- this.value = value;
this.version = -1;
this.expiredFuture = new CompletableFuture<>();
this.state = State.Init;
@@ -71,20 +70,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
@Override
public synchronized CompletableFuture<Void> updateValue(T newValue) {
- byte[] payload;
- try {
- payload = serde.serialize(path, newValue);
- } catch (Throwable t) {
- return FutureUtils.exception(t);
- }
-
- return store.put(path, payload, Optional.of(version))
- .thenAccept(stat -> {
- synchronized (ResourceLockImpl.this) {
- value = newValue;
- version = stat.getVersion();
- }
- });
+ return acquire(newValue);
}
@Override
@@ -135,13 +121,13 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
return path.hashCode();
}
- synchronized CompletableFuture<Void> acquire() {
+ synchronized CompletableFuture<Void> acquire(T newValue) {
CompletableFuture<Void> result = new CompletableFuture<>();
- acquireWithNoRevalidation()
+ acquireWithNoRevalidation(newValue)
.thenRun(() -> result.complete(null))
.exceptionally(ex -> {
if (ex.getCause() instanceof LockBusyException) {
- revalidate()
+ revalidate(newValue)
.thenAccept(__ -> result.complete(null))
.exceptionally(ex1 -> {
result.completeExceptionally(ex1);
@@ -157,20 +143,21 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
}
// Simple operation of acquiring the lock with no retries, or checking for the lock content
- private CompletableFuture<Void> acquireWithNoRevalidation() {
+ private CompletableFuture<Void> acquireWithNoRevalidation(T newValue) {
byte[] payload;
try {
- payload = serde.serialize(path, value);
+ payload = serde.serialize(path, newValue);
} catch (Throwable t) {
return FutureUtils.exception(t);
}
CompletableFuture<Void> result = new CompletableFuture<>();
- store.put(path, payload, Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral))
+ store.put(path, payload, Optional.of(version), EnumSet.of(CreateOption.Ephemeral))
.thenAccept(stat -> {
synchronized (ResourceLockImpl.this) {
state = State.Valid;
version = stat.getVersion();
+ value = newValue;
}
log.info("Acquired resource lock on {}", path);
result.complete(null);
@@ -194,7 +181,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
}
log.info("Lock on resource {} was invalidated", path);
- revalidate()
+ revalidate(value)
.thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
.exceptionally(ex -> {
synchronized (ResourceLockImpl.this) {
@@ -218,18 +205,20 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
if (revalidateAfterReconnection) {
revalidateAfterReconnection = false;
log.warn("Revalidate lock at {} after reconnection", path);
- return revalidate();
+ return revalidate(value);
} else {
return CompletableFuture.completedFuture(null);
}
}
- synchronized CompletableFuture<Void> revalidate() {
+ synchronized CompletableFuture<Void> revalidate(T newValue) {
return store.get(path)
.thenCompose(optGetResult -> {
if (!optGetResult.isPresent()) {
// The lock just disappeared, try to acquire it again
- return acquireWithNoRevalidation()
+ // Reset the expectation on the version
+ setVersion(-1L);
+ return acquireWithNoRevalidation(newValue)
.thenRun(() -> log.info("Successfully re-acquired missing lock at {}", path));
}
@@ -248,7 +237,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
}
synchronized (ResourceLockImpl.this) {
- if (value.equals(existingValue)) {
+ if (newValue.equals(existingValue)) {
// The lock value is still the same, that means that we're the
// logical "owners" of the lock.
@@ -256,12 +245,18 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
// If the new lock belongs to the same session, there's no
// need to recreate it.
version = res.getStat().getVersion();
+ value = newValue;
+ return CompletableFuture.completedFuture(null);
} else {
// The lock needs to get recreated since it belong to an earlier
// session which maybe expiring soon
log.info("Deleting stale lock at {}", path);
return store.delete(path, Optional.of(res.getStat().getVersion()))
- .thenCompose(__ -> acquireWithNoRevalidation())
+ .thenRun(() ->
+ // Reset the expectation that the key is not there anymore
+ setVersion(-1L)
+ )
+ .thenCompose(__ -> acquireWithNoRevalidation(newValue))
.thenRun(() -> log.info("Successfully re-acquired stale lock at {}", path));
}
}
@@ -276,9 +271,17 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
}
return store.delete(path, Optional.of(res.getStat().getVersion()))
- .thenCompose(__ -> acquireWithNoRevalidation())
+ .thenRun(() ->
+ // Reset the expectation that the key is not there anymore
+ setVersion(-1L)
+ )
+ .thenCompose(__ -> acquireWithNoRevalidation(newValue))
.thenRun(() -> log.info("Successfully re-acquired lock at {}", path));
}
});
}
+
+ private synchronized void setVersion(long version) {
+ this.version = version;
+ }
}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
index f215750..da080ba 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
@@ -149,6 +149,58 @@ public class LockManagerTest extends BaseMetadataStoreTest {
}
@Test(dataProvider = "impl")
+ public void updateValueWhenVersionIsOutOfSync(String provider, Supplier<String> urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().build());
+
+ MetadataCache<String> cache = store.getMetadataCache(String.class);
+
+ @Cleanup
+ CoordinationService coordinationService = new CoordinationServiceImpl(store);
+
+ @Cleanup
+ LockManager<String> lockManager = coordinationService.getLockManager(String.class);
+
+ ResourceLock<String> lock = lockManager.acquireLock("/my/path/1", "lock-1").join();
+ assertEquals(lock.getValue(), "lock-1");
+ assertEquals(cache.get("/my/path/1").join().get(), "lock-1");
+
+ store.put("/my/path/1",
+ ObjectMapperFactory.getThreadLocal().writeValueAsBytes("value-2"),
+ Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
+
+ lock.updateValue("value-2").join();
+ assertEquals(lock.getValue(), "value-2");
+ assertEquals(cache.get("/my/path/1").join().get(), "value-2");
+ }
+
+ @Test(dataProvider = "impl")
+ public void updateValueWhenKeyDisappears(String provider, Supplier<String> urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().build());
+
+ MetadataCache<String> cache = store.getMetadataCache(String.class);
+
+ @Cleanup
+ CoordinationService coordinationService = new CoordinationServiceImpl(store);
+
+ @Cleanup
+ LockManager<String> lockManager = coordinationService.getLockManager(String.class);
+
+ ResourceLock<String> lock = lockManager.acquireLock("/my/path/1", "lock-1").join();
+ assertEquals(lock.getValue(), "lock-1");
+ assertEquals(cache.get("/my/path/1").join().get(), "lock-1");
+
+ store.delete("/my/path/1", Optional.empty()).join();
+
+ lock.updateValue("value-2").join();
+ assertEquals(lock.getValue(), "value-2");
+ assertEquals(cache.get("/my/path/1").join().get(), "value-2");
+ }
+
+ @Test(dataProvider = "impl")
public void revalidateLockWithinSameSession(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),