You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/26 20:20:34 UTC
[pulsar] branch master updated: Resource locks should automatically
revalidate after a metadata session is re-established (#10351)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 903494b Resource locks should automatically revalidate after a metadata session is re-established (#10351)
903494b is described below
commit 903494b932b03b91c1f9e98b5b9c1c14c4e9773b
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Apr 26 13:19:47 2021 -0700
Resource locks should automatically revalidate after a metadata session is re-established (#10351)
---
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +-
.../java/org/apache/pulsar/metadata/api/Stat.java | 10 ++
.../coordination/impl/LockManagerImpl.java | 84 +++++-----
.../coordination/impl/ResourceLockImpl.java | 174 ++++++++++++++++++++-
.../metadata/impl/AbstractMetadataStore.java | 13 +-
.../metadata/impl/LocalMemoryMetadataStore.java | 13 +-
.../pulsar/metadata/impl/ZKMetadataStore.java | 39 +++--
.../pulsar/metadata/impl/ZKSessionWatcher.java | 3 +-
.../apache/pulsar/metadata/LockManagerTest.java | 94 ++++++++++-
.../org/apache/pulsar/metadata/TestZKServer.java | 20 +++
.../org/apache/pulsar/metadata/ZKSessionTest.java | 47 ++++++
11 files changed, 423 insertions(+), 76 deletions(-)
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 25a031b..8018982 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2773,7 +2773,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
CountDownLatch l2 = new CountDownLatch(1);
store.asyncUpdateLedgerIds(mLName, builder.build(),
- new Stat(mLName, 1, 0, 0),
+ new Stat(mLName, 1, 0, 0, false, true),
new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat version) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Stat.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Stat.java
index 995bb31..6a2d30e 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Stat.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Stat.java
@@ -45,4 +45,14 @@ public class Stat {
* When the value was last modified.
*/
final long modificationTimestamp;
+
+ /**
+ * Whether the key-value pair is ephemeral or persistent
+ */
+ final boolean ephemeral;
+
+ /**
+ * Whether the key-value pair had been created within the current "session"
+ */
+ final boolean createdBySelf;
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index c6a3866..6f8941c 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -20,13 +20,13 @@ package org.apache.pulsar.metadata.coordination.impl;
import com.fasterxml.jackson.databind.type.TypeFactory;
-import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@@ -36,17 +36,19 @@ import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.LockBusyException;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType;
import org.apache.pulsar.metadata.cache.impl.MetadataSerde;
@Slf4j
class LockManagerImpl<T> implements LockManager<T> {
- private final Set<ResourceLock<T>> locks = new HashSet<>();
+ private final Set<ResourceLockImpl<T>> locks = new HashSet<>();
private final MetadataStoreExtended store;
private final MetadataCache<T> cache;
private final MetadataSerde<T> serde;
@@ -61,6 +63,8 @@ class LockManagerImpl<T> implements LockManager<T> {
this.store = store;
this.cache = store.getMetadataCache(clazz);
this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null));
+ store.registerSessionListener(this::handleSessionEvent);
+ store.registerListener(this::handleDataNotification);
}
@Override
@@ -70,47 +74,53 @@ class LockManagerImpl<T> implements LockManager<T> {
@Override
public CompletableFuture<ResourceLock<T>> acquireLock(String path, T value) {
- byte[] payload;
- try {
- payload = serde.serialize(value);
- } catch (Throwable t) {
- return FutureUtils.exception(t);
- }
+ ResourceLockImpl<T> lock = new ResourceLockImpl<>(store, serde, path, value);
CompletableFuture<ResourceLock<T>> result = new CompletableFuture<>();
- store.put(path, payload, Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral))
- .thenAccept(stat -> {
- ResourceLock<T> lock = new ResourceLockImpl<>(store, serde, path, value,
- stat.getVersion());
- synchronized (LockManagerImpl.this) {
- if (state == State.Ready) {
- log.info("Acquired resource lock on {}", path);
- locks.add(lock);
- lock.getLockExpiredFuture().thenRun(() -> {
- log.info("Released resource lock on {}", path);
- synchronized (LockManagerImpl.this) {
- locks.remove(lock);
- }
- });
- } else {
- // LockManager was closed in between. Release the lock asynchronously
- lock.release();
+ lock.acquire().thenRun(() -> {
+ synchronized (LockManagerImpl.this) {
+ if (state == State.Ready) {
+ locks.add(lock);
+ lock.getLockExpiredFuture().thenRun(() -> {
+ log.info("Released resource lock on {}", path);
+ synchronized (LockManagerImpl.this) {
+ locks.remove(lock);
}
- }
- result.complete(lock);
- }).exceptionally(ex -> {
- if (ex.getCause() instanceof BadVersionException) {
- result.completeExceptionally(
- new LockBusyException("Resource at " + path + " is already locked"));
- } else {
- result.completeExceptionally(ex.getCause());
- }
- return null;
- });
+ });
+ } else {
+ // LockManager was closed in between. Release the lock asynchronously
+ lock.release();
+ }
+ }
+ result.complete(lock);
+ }).exceptionally(ex -> {
+ if (ex.getCause() instanceof BadVersionException) {
+ result.completeExceptionally(
+ new LockBusyException("Resource at " + path + " is already locked"));
+ } else {
+ result.completeExceptionally(ex.getCause());
+ }
+ return null;
+ });
return result;
}
+ /**
+ * Once the metadata store reports that its session has been re-established, ask every
+ * lock held by this manager to revalidate itself.
+ */
+ private void handleSessionEvent(SessionEvent se) {
+ if (se == SessionEvent.SessionReestablished) {
+ log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
+ // Iterate over a snapshot taken under the manager monitor: `locks` is a plain HashSet that
+ // acquireLock() and the expiry callbacks mutate while holding that monitor, and a
+ // revalidation may complete synchronously and trigger such a removal mid-iteration.
+ Set<ResourceLockImpl<T>> snapshot;
+ synchronized (this) {
+ snapshot = new HashSet<>(locks);
+ }
+ // lockWasInvalidated() revalidates and, unlike a bare revalidate(), marks the lock as
+ // expired when revalidation fails (e.g. another session took it while we were away),
+ // so the holder is notified through getLockExpiredFuture().
+ snapshot.forEach(ResourceLockImpl::lockWasInvalidated);
+ }
+ }
+
+ /**
+ * When a path guarded by one of our locks is deleted, let that lock try to revalidate
+ * itself (it marks itself as expired if revalidation fails).
+ */
+ private void handleDataNotification(Notification n) {
+ if (n.getType() != NotificationType.Deleted) {
+ return;
+ }
+ // Snapshot under the manager monitor: `locks` is a plain HashSet, and a failed revalidation
+ // can synchronously complete the expiry future, whose callback removes the lock from
+ // `locks` while we would otherwise still be streaming over it.
+ Set<ResourceLockImpl<T>> snapshot;
+ synchronized (this) {
+ snapshot = new HashSet<>(locks);
+ }
+ snapshot.stream()
+ .filter(l -> l.getPath().equals(n.getPath()))
+ .forEach(ResourceLockImpl::lockWasInvalidated);
+ }
+
@Override
public CompletableFuture<List<String>> listLocks(String path) {
return cache.getChildren(path);
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index b65fb8f..fe94a2b 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -18,17 +18,25 @@
*/
package org.apache.pulsar.metadata.coordination.impl;
+import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.LockBusyException;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.cache.impl.MetadataSerde;
+@Slf4j
public class ResourceLockImpl<T> implements ResourceLock<T> {
- private final MetadataStore store;
+ private final MetadataStoreExtended store;
private final MetadataSerde<T> serde;
private final String path;
@@ -36,20 +44,23 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
private long version;
private final CompletableFuture<Void> expiredFuture;
- private static enum State {
- Valid, Released
+ private enum State {
+ Init,
+ Valid,
+ Releasing,
+ Released,
}
private State state;
- public ResourceLockImpl(MetadataStore store, MetadataSerde<T> serde, String path, T value, long version) {
+ public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde<T> serde, String path, T value) {
this.store = store;
this.serde = serde;
this.path = path;
this.value = value;
- this.version = version;
+ this.version = -1;
this.expiredFuture = new CompletableFuture<>();
- this.state = State.Valid;
+ this.state = State.Init;
}
@Override
@@ -81,13 +92,31 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
return CompletableFuture.completedFuture(null);
}
- return store.delete(path, Optional.of(version))
+ state = State.Releasing;
+ CompletableFuture<Void> result = new CompletableFuture<>();
+
+ store.delete(path, Optional.of(version))
.thenRun(() -> {
synchronized (ResourceLockImpl.this) {
state = State.Released;
}
expiredFuture.complete(null);
+ result.complete(null);
+ }).exceptionally(ex -> {
+ if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
+ // The lock is not there on release. We can anyway proceed
+ synchronized (ResourceLockImpl.this) {
+ state = State.Released;
+ }
+ expiredFuture.complete(null);
+ result.complete(null);
+ } else {
+ result.completeExceptionally(ex);
+ }
+ return null;
});
+
+ return result;
}
@Override
@@ -104,4 +133,133 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
public int hashCode() {
return path.hashCode();
}
+
+ synchronized CompletableFuture<Void> acquire() {
+ CompletableFuture<Void> result = new CompletableFuture<>();
+ acquireWithNoRevalidation()
+ .thenRun(() -> result.complete(null))
+ .exceptionally(ex -> {
+ if (ex.getCause() instanceof LockBusyException) {
+ revalidate()
+ .thenAccept(__ -> result.complete(null))
+ .exceptionally(ex1 -> {
+ result.completeExceptionally(ex1);
+ return null;
+ });
+ } else {
+ result.completeExceptionally(ex.getCause());
+ }
+ return null;
+ });
+
+ return result;
+ }
+
+ // Simple operation of acquiring the lock with no retries, or checking for the lock content
+ private CompletableFuture<Void> acquireWithNoRevalidation() {
+ byte[] payload;
+ try {
+ payload = serde.serialize(value);
+ } catch (Throwable t) {
+ return FutureUtils.exception(t);
+ }
+
+ CompletableFuture<Void> result = new CompletableFuture<>();
+ store.put(path, payload, Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral))
+ .thenAccept(stat -> {
+ synchronized (ResourceLockImpl.this) {
+ state = State.Valid;
+ version = stat.getVersion();
+ }
+ log.info("Acquired resource lock on {}", path);
+ result.complete(null);
+ }).exceptionally(ex -> {
+ if (ex.getCause() instanceof BadVersionException) {
+ result.completeExceptionally(
+ new LockBusyException("Resource at " + path + " is already locked"));
+ } else {
+ result.completeExceptionally(ex.getCause());
+ }
+ return null;
+ });
+
+ return result;
+ }
+
+ synchronized void lockWasInvalidated() {
+ if (state != State.Valid) {
+ // Ignore notifications while we're releasing the lock ourselves
+ return;
+ }
+
+ log.info("Lock on resource {} was invalidated", path);
+ revalidate()
+ .thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
+ .exceptionally(ex -> {
+ synchronized (ResourceLockImpl.this) {
+ log.warn("Failed to revalidate the lock at {}. Marked as expired", path);
+ state = State.Released;
+ expiredFuture.complete(null);
+ }
+ return null;
+ });
+ }
+
+ /**
+ * Check whether the node at {@code path} still represents this lock and, when needed,
+ * re-create it in the current session.
+ *
+ * <ul>
+ * <li>Node missing: try to acquire the lock again.</li>
+ * <li>Node present but not ephemeral: fail with {@link LockBusyException}.</li>
+ * <li>Same value, created by this session: adopt the node's version; nothing to re-create.</li>
+ * <li>Same value, created by another session: delete the stale node and re-acquire it.</li>
+ * <li>Different value, created by another session: fail with {@link LockBusyException}.</li>
+ * <li>Different value, created by this session: delete the node and re-acquire it with our value.</li>
+ * </ul>
+ */
+ synchronized CompletableFuture<Void> revalidate() {
+ return store.get(path)
+ .thenCompose(optGetResult -> {
+ if (!optGetResult.isPresent()) {
+ // The lock just disappeared, try to acquire it again
+ return acquireWithNoRevalidation()
+ .thenRun(() -> log.info("Successfully re-acquired missing lock at {}", path));
+ }
+
+ GetResult res = optGetResult.get();
+ if (!res.getStat().isEphemeral()) {
+ return FutureUtils.exception(
+ new LockBusyException(
+ "Path " + path + " is already created as non-ephemeral"));
+ }
+
+ T existingValue;
+ try {
+ existingValue = serde.deserialize(res.getValue());
+ } catch (Throwable t) {
+ return FutureUtils.exception(t);
+ }
+
+ synchronized (ResourceLockImpl.this) {
+ if (value.equals(existingValue)) {
+ // The lock value is still the same, that means that we're the
+ // logical "owners" of the lock.
+
+ if (res.getStat().isCreatedBySelf()) {
+ // The node belongs to the current session, so there's no need to
+ // recreate it: adopt its version and stop here. Falling through would
+ // delete and re-create a lock we already own.
+ version = res.getStat().getVersion();
+ if (state == State.Init) {
+ // e.g. reached from acquire(): the lock is now held by us
+ state = State.Valid;
+ }
+ return CompletableFuture.completedFuture(null);
+ } else {
+ // The lock needs to get recreated since it belongs to an earlier
+ // session which may be expiring soon
+ log.info("Deleting stale lock at {}", path);
+ return store.delete(path, Optional.of(res.getStat().getVersion()))
+ .thenCompose(__ -> acquireWithNoRevalidation())
+ .thenRun(() -> log.info("Successfully re-acquired stale lock at {}", path));
+ }
+ }
+
+ // At this point we have an existing lock with a value different to what we
+ // expect. If our session is the owner, we can recreate, otherwise the
+ // lock has been acquired by someone else and we give up.
+
+ if (!res.getStat().isCreatedBySelf()) {
+ return FutureUtils.exception(
+ new LockBusyException("Resource at " + path + " is already locked"));
+ }
+
+ return store.delete(path, Optional.of(res.getStat().getVersion()))
+ .thenCompose(__ -> acquireWithNoRevalidation())
+ .thenRun(() -> log.info("Successfully re-acquired lock at {}", path));
+ }
+ });
+ }
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index f11ec69..b111825 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -56,7 +56,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>();
- protected final ExecutorService executor;
+ private final ExecutorService executor;
private final AsyncLoadingCache<String, List<String>> childrenCache;
private final AsyncLoadingCache<String, Boolean> existsCache;
private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = new CopyOnWriteArrayList<>();
@@ -235,6 +235,17 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
existsCache.synchronous().invalidateAll();
}
+ /**
+ * Run the task in the executor thread and fail the future if the executor is shutting down
+ */
+ protected void execute(Runnable task, CompletableFuture<?> future) {
+ try {
+ executor.execute(task);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ }
+
protected static String parent(String path) {
int idx = path.lastIndexOf('/');
if (idx <= 0) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 848d170..bf4a225 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -49,6 +49,7 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M
final byte[] data;
final long createdTimestamp;
final long modifiedTimestamp;
+ final boolean ephemeral;
}
private final NavigableMap<String, Value> map;
@@ -68,7 +69,8 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M
Value v = map.get(path);
if (v != null) {
return FutureUtils.value(
- Optional.of(new GetResult(v.data, new Stat(path, v.version, v.createdTimestamp, v.modifiedTimestamp))));
+ Optional.of(new GetResult(v.data, new Stat(path, v.version, v.createdTimestamp, v.modifiedTimestamp,
+ v.isEphemeral(), true))));
} else {
return FutureUtils.value(Optional.empty());
}
@@ -127,7 +129,7 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M
long now = System.currentTimeMillis();
if (hasVersion && expectedVersion == -1) {
- Value newValue = new Value(0, data, now, now);
+ Value newValue = new Value(0, data, now, now, options.contains(CreateOption.Ephemeral));
Value existingValue = map.putIfAbsent(path, newValue);
if (existingValue != null) {
return FutureUtils.exception(new BadVersionException(""));
@@ -137,7 +139,7 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M
if (parent != null) {
receivedNotification(new Notification(NotificationType.ChildrenChanged, parent));
}
- return FutureUtils.value(new Stat(path, 0, now, now));
+ return FutureUtils.value(new Stat(path, 0, now, now, newValue.isEphemeral(), true));
}
} else {
Value existingValue = map.get(path);
@@ -147,7 +149,8 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M
} else {
long newVersion = existingValue != null ? existingValue.version + 1 : 0;
long createdTimestamp = existingValue != null ? existingValue.createdTimestamp : now;
- Value newValue = new Value(newVersion, data, createdTimestamp, now);
+ Value newValue = new Value(newVersion, data, createdTimestamp, now,
+ options.contains(CreateOption.Ephemeral));
map.put(path, newValue);
NotificationType type = existingValue == null ? NotificationType.Created : NotificationType.Modified;
@@ -159,7 +162,7 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M
}
}
return FutureUtils
- .value(new Stat(path, newValue.version, newValue.createdTimestamp, newValue.modifiedTimestamp));
+ .value(new Stat(path, newValue.version, newValue.createdTimestamp, newValue.modifiedTimestamp, false, true));
}
}
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index d91856f..4c85fe5 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -89,7 +89,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
try {
zkc.getData(path, this, (rc, path1, ctx, data, stat) -> {
- executor.execute(() -> {
+ execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(Optional.of(new GetResult(data, getStat(path1, stat))));
@@ -115,7 +115,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
} else {
future.completeExceptionally(getException(code, path));
}
- });
+ }, future);
}, null);
} catch (Throwable t) {
future.completeExceptionally(new MetadataStoreException(t));
@@ -130,7 +130,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
try {
zkc.getChildren(path, this, (rc, path1, ctx, children) -> {
- executor.execute(() -> {
+ execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
Collections.sort(children);
@@ -158,7 +158,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
} else {
future.completeExceptionally(getException(code, path));
}
- });
+ }, future);
}, null);
} catch (Throwable t) {
future.completeExceptionally(new MetadataStoreException(t));
@@ -173,7 +173,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
try {
zkc.exists(path, this, (rc, path1, ctx, stat) -> {
- executor.execute(() -> {
+ execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(true);
@@ -182,7 +182,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
} else {
future.completeExceptionally(getException(code, path));
}
- });
+ }, future);
}, future);
} catch (Throwable t) {
future.completeExceptionally(new MetadataStoreException(t));
@@ -206,23 +206,24 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
try {
if (hasVersion && expectedVersion == -1) {
+ CreateMode createMode = getCreateMode(options);
ZkUtils.asyncCreateFullPathOptimistic(zkc, path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
- getCreateMode(options), (rc, path1, ctx, name) -> {
- executor.execute(() -> {
+ createMode, (rc, path1, ctx, name) -> {
+ execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
- future.complete(new Stat(name, 0, 0, 0));
+ future.complete(new Stat(name, 0, 0, 0, createMode.isEphemeral(), true));
} else if (code == Code.NODEEXISTS) {
// We're emulating a request to create node, so the version is invalid
future.completeExceptionally(getException(Code.BADVERSION, path));
} else {
future.completeExceptionally(getException(code, path));
}
- });
+ }, future);
}, null);
} else {
zkc.setData(path, value, expectedVersion, (rc, path1, ctx, stat) -> {
- executor.execute(() -> {
+ execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(getStat(path1, stat));
@@ -242,7 +243,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
} else {
future.completeExceptionally(getException(code, path));
}
- });
+ }, future);
}, null);
}
} catch (Throwable t) {
@@ -260,14 +261,14 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
try {
zkc.delete(path, expectedVersion, (rc, path1, ctx) -> {
- executor.execute(() -> {
+ execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(null);
} else {
future.completeExceptionally(getException(code, path));
}
- });
+ }, future);
}, null);
} catch (Throwable t) {
future.completeExceptionally(new MetadataStoreException(t));
@@ -285,8 +286,10 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
super.close();
}
- private static Stat getStat(String path, org.apache.zookeeper.data.Stat zkStat) {
- return new Stat(path, zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime());
+ /**
+ * Convert a ZooKeeper stat into the store-agnostic {@link Stat}.
+ *
+ * <p>ZooKeeper sets {@code ephemeralOwner} to the owning session id for ephemeral znodes and
+ * to {@code 0} for persistent ones, so {@code 0} (not {@code -1}) is the "not ephemeral" marker.
+ * A node counts as created by this session only if it is ephemeral and owned by our session id.
+ */
+ private Stat getStat(String path, org.apache.zookeeper.data.Stat zkStat) {
+ long ephemeralOwner = zkStat.getEphemeralOwner();
+ boolean ephemeral = ephemeralOwner != 0;
+ return new Stat(path, zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime(),
+ ephemeral,
+ ephemeral && ephemeralOwner == zkc.getSessionId());
}
private static MetadataStoreException getException(Code code, String path) {
@@ -353,4 +356,8 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
return CreateMode.PERSISTENT;
}
}
+
+ public long getZkSessionId() {
+ return zkc.getSessionId();
+ }
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
index d62e785..c396842 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
@@ -117,6 +117,7 @@ public class ZKSessionWatcher implements AutoCloseable, Watcher {
@Override
public synchronized void process(WatchedEvent event) {
+ log.info("Got ZK session watch event: {}", event);
checkState(event.getState());
}
@@ -156,7 +157,7 @@ public class ZKSessionWatcher implements AutoCloseable, Watcher {
default:
if (currentStatus != SessionEvent.SessionReestablished) {
// since it reconnected to zoo keeper, we reset the disconnected time
- log.info("ZooKeeper client reconnection with server quorum");
+ log.info("ZooKeeper client reconnection with server quorum. Current status: {}", currentStatus);
disconnectedAt = 0;
sessionListener.accept(SessionEvent.Reconnected);
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
index 8256f70..e256950 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
@@ -19,11 +19,14 @@
package org.apache.pulsar.metadata;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.fail;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
@@ -31,12 +34,15 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.LockBusyException;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
import org.testng.annotations.Test;
@@ -69,13 +75,6 @@ public class LockManagerTest extends BaseMetadataStoreTest {
});
assertEquals(latchLock1.getCount(), 1);
- try {
- lockManager.acquireLock("/my/path/1", "lock-2").join();
- fail("should have failed");
- } catch (CompletionException e) {
- assertException(e, LockBusyException.class);
- }
-
assertEquals(lockManager.listLocks("/my/path").join(), Collections.singletonList("1"));
assertEquals(lockManager.readLock("/my/path/1").join(), Optional.of("lock-1"));
@@ -148,4 +147,85 @@ public class LockManagerTest extends BaseMetadataStoreTest {
assertEquals(lock.getValue(), "value-2");
assertEquals(cache.get("/my/path/1").join().get(), "value-2");
}
+
+ @Test(dataProvider = "impl")
+ public void revalidateLockWithinSameSession(String provider, String url) throws Exception {
+ @Cleanup
+ MetadataStoreExtended store = MetadataStoreExtended.create(url, MetadataStoreConfig.builder().build());
+
+ @Cleanup
+ CoordinationService cs2 = new CoordinationServiceImpl(store);
+ @Cleanup
+ LockManager<String> lm2 = cs2.getLockManager(String.class);
+
+ String path1 = newKey();
+
+ // Simulate existing lock with same content
+ store.put(path1, "\"value-1\"".getBytes(StandardCharsets.UTF_8), Optional.of(-1L),
+ EnumSet.of(CreateOption.Ephemeral)).join();
+ ResourceLock<String> rl2 = lm2.acquireLock(path1, "value-1").join();
+
+ assertEquals(new String(store.get(path1).join().get().getValue()), "\"value-1\"");
+ assertFalse(rl2.getLockExpiredFuture().isDone());
+
+ String path2 = newKey();
+
+ // Simulate existing lock with different content
+ store.put(path2, "\"value-1\"".getBytes(StandardCharsets.UTF_8), Optional.of(-1L),
+ EnumSet.of(CreateOption.Ephemeral)).join();
+ rl2 = lm2.acquireLock(path2, "value-2").join();
+
+ assertEquals(new String(store.get(path2).join().get().getValue()), "\"value-2\"");
+ assertFalse(rl2.getLockExpiredFuture().isDone());
+ }
+
+ @Test(dataProvider = "impl")
+ public void revalidateLockOnDifferentSession(String provider, String url) throws Exception {
+ if (provider.equals("Memory")) {
+ // Local memory provider doesn't really have the concept of multiple sessions
+ return;
+ }
+
+ @Cleanup
+ MetadataStoreExtended store1 = MetadataStoreExtended.create(url, MetadataStoreConfig.builder().build());
+ @Cleanup
+ MetadataStoreExtended store2 = MetadataStoreExtended.create(url, MetadataStoreConfig.builder().build());
+
+ @Cleanup
+ CoordinationService cs1 = new CoordinationServiceImpl(store1);
+ @Cleanup
+ LockManager<String> lm1 = cs1.getLockManager(String.class);
+
+ @Cleanup
+ CoordinationService cs2 = new CoordinationServiceImpl(store2);
+ @Cleanup
+ LockManager<String> lm2 = cs2.getLockManager(String.class);
+
+ String path1 = newKey();
+
+ // Session 1 holds the lock; session 2 then tries to take it with a different value
+ ResourceLock<String> rl1 = lm1.acquireLock(path1, "value-1").join();
+
+ try {
+ lm2.acquireLock(path1, "value-2").join();
+ fail("should have failed");
+ } catch (CompletionException e) {
+ assertEquals(e.getCause().getClass(), LockBusyException.class);
+ }
+
+ // The rejected attempt from session 2 must not have disturbed lock-1
+ assertFalse(rl1.getLockExpiredFuture().isDone());
+
+ assertEquals(new String(store1.get(path1).join().get().getValue(), StandardCharsets.UTF_8), "\"value-1\"");
+
+ // Existing lock with the same content: the 2nd acquirer deletes the node held by session 1
+ // and re-creates it in its own session. Both holders use the same value, so neither lock
+ // is expected to be reported as expired.
+ String path2 = newKey();
+ rl1 = lm1.acquireLock(path2, "value-1").join();
+
+ ResourceLock<String> rl2 = lm2.acquireLock(path2, "value-1").join();
+
+ assertFalse(rl1.getLockExpiredFuture().isDone());
+ assertFalse(rl2.getLockExpiredFuture().isDone());
+
+ assertEquals(new String(store1.get(path2).join().get().getValue(), StandardCharsets.UTF_8), "\"value-1\"");
+ }
}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java
index e49d30e..d69a1f7 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java
@@ -33,6 +33,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.SessionTracker;
+import org.apache.zookeeper.server.SessionTrackerImpl;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.assertj.core.util.Files;
@@ -79,6 +81,24 @@ public class TestZKServer implements AutoCloseable {
log.info("Stopped test ZK server");
}
+ public void expireSession(long sessionId) {
+ zks.expire(new SessionTracker.Session() {
+ @Override
+ public long getSessionId() {
+ return sessionId;
+ }
+
+ @Override
+ public int getTimeout() {
+ return 10_000;
+ }
+
+ @Override
+ public boolean isClosing() {
+ return false;
+ }
+ });
+ }
@Override
public void close() throws Exception {
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
index 577b872..89bd13a 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
@@ -19,7 +19,9 @@
package org.apache.pulsar.metadata;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -28,8 +30,13 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.coordination.CoordinationService;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
+import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.testng.annotations.Test;
public class ZKSessionTest extends BaseMetadataStoreTest {
@@ -86,4 +93,44 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
e = sessionEvents.poll(1, TimeUnit.SECONDS);
assertNull(e);
}
+
+ @Test
+ public void testReacquireLocksAfterSessionLost() throws Exception {
+ @Cleanup
+ MetadataStoreExtended store = MetadataStoreExtended.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder()
+ .sessionTimeoutMillis(2_000)
+ .build());
+
+ BlockingQueue<SessionEvent> sessionEvents = new LinkedBlockingQueue<>();
+ store.registerSessionListener(sessionEvents::add);
+
+ @Cleanup
+ CoordinationService coordinationService = new CoordinationServiceImpl(store);
+ @Cleanup
+ LockManager<String> lm1 = coordinationService.getLockManager(String.class);
+
+ String path = newKey();
+
+ ResourceLock<String> lock = lm1.acquireLock(path, "value-1").join();
+
+ zks.expireSession(((ZKMetadataStore) store).getZkSessionId());
+
+ SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
+ assertEquals(e, SessionEvent.ConnectionLost);
+
+ e = sessionEvents.poll(10, TimeUnit.SECONDS);
+ assertEquals(e, SessionEvent.SessionLost);
+
+ e = sessionEvents.poll(10, TimeUnit.SECONDS);
+ assertEquals(e, SessionEvent.Reconnected);
+ e = sessionEvents.poll(10, TimeUnit.SECONDS);
+ assertEquals(e, SessionEvent.SessionReestablished);
+
+ Thread.sleep(2_000);
+
+ assertFalse(lock.getLockExpiredFuture().isDone());
+
+ assertTrue(store.get(path).join().isPresent());
+ }
}