You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/26 20:20:34 UTC

[pulsar] branch master updated: Resource locks should automatically revalidate after a metadata session is re-established (#10351)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 903494b  Resource locks should automatically revalidate after a metadata session is re-established (#10351)
903494b is described below

commit 903494b932b03b91c1f9e98b5b9c1c14c4e9773b
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Apr 26 13:19:47 2021 -0700

    Resource locks should automatically revalidate after a metadata session is re-established (#10351)
---
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |   2 +-
 .../java/org/apache/pulsar/metadata/api/Stat.java  |  10 ++
 .../coordination/impl/LockManagerImpl.java         |  84 +++++-----
 .../coordination/impl/ResourceLockImpl.java        | 174 ++++++++++++++++++++-
 .../metadata/impl/AbstractMetadataStore.java       |  13 +-
 .../metadata/impl/LocalMemoryMetadataStore.java    |  13 +-
 .../pulsar/metadata/impl/ZKMetadataStore.java      |  39 +++--
 .../pulsar/metadata/impl/ZKSessionWatcher.java     |   3 +-
 .../apache/pulsar/metadata/LockManagerTest.java    |  94 ++++++++++-
 .../org/apache/pulsar/metadata/TestZKServer.java   |  20 +++
 .../org/apache/pulsar/metadata/ZKSessionTest.java  |  47 ++++++
 11 files changed, 423 insertions(+), 76 deletions(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 25a031b..8018982 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2773,7 +2773,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         CountDownLatch l2 = new CountDownLatch(1);
         store.asyncUpdateLedgerIds(mLName, builder.build(),
-                new Stat(mLName, 1, 0, 0),
+                new Stat(mLName, 1, 0, 0, false, true),
                 new MetaStoreCallback<Void>() {
             @Override
             public void operationComplete(Void result, Stat version) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Stat.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Stat.java
index 995bb31..6a2d30e 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Stat.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Stat.java
@@ -45,4 +45,14 @@ public class Stat {
      * When the value was last modified.
      */
     final long modificationTimestamp;
+
+    /**
+     * Whether the key-value pair is ephemeral or persistent
+     */
+    final boolean ephemeral;
+
+    /**
+     * Whether the key-value pair was created within the current "session"
+     */
+    final boolean createdBySelf;
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index c6a3866..6f8941c 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -20,13 +20,13 @@ package org.apache.pulsar.metadata.coordination.impl;
 
 import com.fasterxml.jackson.databind.type.TypeFactory;
 
-import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import lombok.extern.slf4j.Slf4j;
@@ -36,17 +36,19 @@ import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.LockBusyException;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.coordination.LockManager;
 import org.apache.pulsar.metadata.api.coordination.ResourceLock;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType;
 import org.apache.pulsar.metadata.cache.impl.MetadataSerde;
 
 @Slf4j
 class LockManagerImpl<T> implements LockManager<T> {
 
-    private final Set<ResourceLock<T>> locks = new HashSet<>();
+    private final Set<ResourceLockImpl<T>> locks = new HashSet<>();
     private final MetadataStoreExtended store;
     private final MetadataCache<T> cache;
     private final MetadataSerde<T> serde;
@@ -61,6 +63,8 @@ class LockManagerImpl<T> implements LockManager<T> {
         this.store = store;
         this.cache = store.getMetadataCache(clazz);
         this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null));
+        store.registerSessionListener(this::handleSessionEvent);
+        store.registerListener(this::handleDataNotification);
     }
 
     @Override
@@ -70,47 +74,53 @@ class LockManagerImpl<T> implements LockManager<T> {
 
     @Override
     public CompletableFuture<ResourceLock<T>> acquireLock(String path, T value) {
-        byte[] payload;
-        try {
-            payload = serde.serialize(value);
-        } catch (Throwable t) {
-            return FutureUtils.exception(t);
-        }
+        ResourceLockImpl<T> lock = new ResourceLockImpl<>(store, serde, path, value);
 
         CompletableFuture<ResourceLock<T>> result = new CompletableFuture<>();
-        store.put(path, payload, Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral))
-                .thenAccept(stat -> {
-                    ResourceLock<T> lock = new ResourceLockImpl<>(store, serde, path, value,
-                            stat.getVersion());
-                    synchronized (LockManagerImpl.this) {
-                        if (state == State.Ready) {
-                            log.info("Acquired resource lock on {}", path);
-                            locks.add(lock);
-                            lock.getLockExpiredFuture().thenRun(() -> {
-                                log.info("Released resource lock on {}", path);
-                                synchronized (LockManagerImpl.this) {
-                                    locks.remove(lock);
-                                }
-                            });
-                        } else {
-                            // LockManager was closed in between. Release the lock asynchronously
-                            lock.release();
+        lock.acquire().thenRun(() -> {
+            synchronized (LockManagerImpl.this) {
+                if (state == State.Ready) {
+                    locks.add(lock);
+                    lock.getLockExpiredFuture().thenRun(() -> {
+                        log.info("Released resource lock on {}", path);
+                        synchronized (LockManagerImpl.this) {
+                            locks.remove(lock);
                         }
-                    }
-                    result.complete(lock);
-                }).exceptionally(ex -> {
-                    if (ex.getCause() instanceof BadVersionException) {
-                        result.completeExceptionally(
-                                new LockBusyException("Resource at " + path + " is already locked"));
-                    } else {
-                        result.completeExceptionally(ex.getCause());
-                    }
-                    return null;
-                });
+                    });
+                } else {
+                    // LockManager was closed in between. Release the lock asynchronously
+                    lock.release();
+                }
+            }
+      result.complete(lock);
+        }).exceptionally(ex -> {
+            if (ex.getCause() instanceof BadVersionException) {
+                result.completeExceptionally(
+                        new LockBusyException("Resource at " + path + " is already locked"));
+            } else {
+                result.completeExceptionally(ex.getCause());
+            }
+            return null;
+        });
 
         return result;
     }
 
+    private void handleSessionEvent(SessionEvent se) {
+        if (se == SessionEvent.SessionReestablished) {
+            log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
+            locks.forEach(ResourceLockImpl::revalidate);
+        }
+    }
+
+    private void handleDataNotification(Notification n) {
+        if (n.getType() == NotificationType.Deleted) {
+            locks.stream()
+                    .filter(l -> l.getPath().equals(n.getPath()))
+                    .forEach(l -> l.lockWasInvalidated());
+        }
+    }
+
     @Override
     public CompletableFuture<List<String>> listLocks(String path) {
         return cache.getChildren(path);
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index b65fb8f..fe94a2b 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -18,17 +18,25 @@
  */
 package org.apache.pulsar.metadata.coordination.impl;
 
+import java.util.EnumSet;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.LockBusyException;
 import org.apache.pulsar.metadata.api.coordination.ResourceLock;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.cache.impl.MetadataSerde;
 
+@Slf4j
 public class ResourceLockImpl<T> implements ResourceLock<T> {
 
-    private final MetadataStore store;
+    private final MetadataStoreExtended store;
     private final MetadataSerde<T> serde;
     private final String path;
 
@@ -36,20 +44,23 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
     private long version;
     private final CompletableFuture<Void> expiredFuture;
 
-    private static enum State {
-        Valid, Released
+    private enum State {
+        Init,
+        Valid,
+        Releasing,
+        Released,
     }
 
     private State state;
 
-    public ResourceLockImpl(MetadataStore store, MetadataSerde<T> serde, String path, T value, long version) {
+    public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde<T> serde, String path, T value) {
         this.store = store;
         this.serde = serde;
         this.path = path;
         this.value = value;
-        this.version = version;
+        this.version = -1;
         this.expiredFuture = new CompletableFuture<>();
-        this.state = State.Valid;
+        this.state = State.Init;
     }
 
     @Override
@@ -81,13 +92,31 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
             return CompletableFuture.completedFuture(null);
         }
 
-        return store.delete(path, Optional.of(version))
+        state = State.Releasing;
+        CompletableFuture<Void> result = new CompletableFuture<>();
+
+        store.delete(path, Optional.of(version))
                 .thenRun(() -> {
                     synchronized (ResourceLockImpl.this) {
                         state = State.Released;
                     }
                     expiredFuture.complete(null);
+                    result.complete(null);
+                }).exceptionally(ex -> {
+                    if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
+                        // The lock is already gone on release; we can proceed anyway
+                        synchronized (ResourceLockImpl.this) {
+                            state = State.Released;
+                        }
+                        expiredFuture.complete(null);
+                        result.complete(null);
+                    } else {
+                        result.completeExceptionally(ex);
+                    }
+                    return null;
                 });
+
+        return result;
     }
 
     @Override
@@ -104,4 +133,133 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
     public int hashCode() {
         return path.hashCode();
     }
+
+    synchronized CompletableFuture<Void> acquire() {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        acquireWithNoRevalidation()
+                .thenRun(() -> result.complete(null))
+                .exceptionally(ex -> {
+                    if (ex.getCause() instanceof LockBusyException) {
+                        revalidate()
+                                .thenAccept(__ -> result.complete(null))
+                                .exceptionally(ex1 -> {
+                                   result.completeExceptionally(ex1);
+                                   return null;
+                                });
+                    } else {
+                        result.completeExceptionally(ex.getCause());
+                    }
+                    return null;
+                });
+
+        return result;
+    }
+
+    // Simple acquire operation: no retries and no check of the existing lock content
+    private CompletableFuture<Void> acquireWithNoRevalidation() {
+        byte[] payload;
+        try {
+            payload = serde.serialize(value);
+        } catch (Throwable t) {
+            return FutureUtils.exception(t);
+        }
+
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        store.put(path, payload, Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral))
+                .thenAccept(stat -> {
+                    synchronized (ResourceLockImpl.this) {
+                        state = State.Valid;
+                        version = stat.getVersion();
+                    }
+                    log.info("Acquired resource lock on {}", path);
+                    result.complete(null);
+                }).exceptionally(ex -> {
+            if (ex.getCause() instanceof BadVersionException) {
+                result.completeExceptionally(
+                        new LockBusyException("Resource at " + path + " is already locked"));
+            } else {
+                result.completeExceptionally(ex.getCause());
+            }
+            return null;
+        });
+
+        return result;
+    }
+
+    synchronized  void lockWasInvalidated() {
+        if (state != State.Valid) {
+            // Ignore notifications unless the lock is currently valid (e.g. while we're releasing it ourselves)
+            return;
+        }
+
+        log.info("Lock on resource {} was invalidated", path);
+        revalidate()
+                .thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
+                .exceptionally(ex -> {
+                    synchronized (ResourceLockImpl.this) {
+                        log.warn("Failed to revalidate the lock at {}. Marked as expired", path);
+                        state = State.Released;
+                        expiredFuture.complete(null);
+                    }
+                    return null;
+                });
+    }
+
+    synchronized CompletableFuture<Void> revalidate() {
+        return store.get(path)
+                .thenCompose(optGetResult -> {
+                    if (!optGetResult.isPresent()) {
+                        // The lock just disappeared, try to acquire it again
+                        return acquireWithNoRevalidation()
+                                .thenRun(() -> log.info("Successfully re-acquired missing lock at {}", path));
+                    }
+
+                    GetResult res = optGetResult.get();
+                    if (!res.getStat().isEphemeral()) {
+                        return FutureUtils.exception(
+                                new LockBusyException(
+                                        "Path " + path + " is already created as non-ephemeral"));
+                    }
+
+                    T existingValue;
+                    try {
+                        existingValue = serde.deserialize(optGetResult.get().getValue());
+                    } catch (Throwable t) {
+                        return FutureUtils.exception(t);
+                    }
+
+                    synchronized (ResourceLockImpl.this) {
+                        if (value.equals(existingValue)) {
+                            // The lock value is still the same, that means that we're the
+                            // logical "owners" of the lock.
+
+                            if (res.getStat().isCreatedBySelf()) {
+                                // If the existing lock belongs to the current session, there's no
+                                // need to recreate it.
+                                version = res.getStat().getVersion();
+                            } else {
+                                // The lock needs to be recreated since it belongs to an earlier
+                                // session which may be expiring soon
+                                log.info("Deleting stale lock at {}", path);
+                                return store.delete(path, Optional.of(res.getStat().getVersion()))
+                                        .thenCompose(__ -> acquireWithNoRevalidation())
+                                        .thenRun(() -> log.info("Successfully re-acquired stale lock at {}", path));
+                            }
+                        }
+
+                        // At this point we have an existing lock with a value different to what we
+                        // expect. If our session is the owner, we can recreate, otherwise the
+                        // lock has been acquired by someone else and we give up.
+
+                        if (!res.getStat().isCreatedBySelf()) {
+                            return FutureUtils.exception(
+                                    new LockBusyException("Resource at " + path + " is already locked"));
+                        }
+
+                        return store.delete(path, Optional.of(res.getStat().getVersion()))
+                                .thenCompose(__ -> acquireWithNoRevalidation())
+                                .thenRun(() -> log.info("Successfully re-acquired lock at {}", path));
+                    }
+                });
+    }
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index f11ec69..b111825 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -56,7 +56,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
 
     private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new CopyOnWriteArrayList<>();
     private final CopyOnWriteArrayList<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>();
-    protected final ExecutorService executor;
+    private final ExecutorService executor;
     private final AsyncLoadingCache<String, List<String>> childrenCache;
     private final AsyncLoadingCache<String, Boolean> existsCache;
     private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = new CopyOnWriteArrayList<>();
@@ -235,6 +235,17 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
         existsCache.synchronous().invalidateAll();
     }
 
+    /**
+     * Run the task in the executor thread and fail the future if the executor is shutting down
+     */
+    protected void execute(Runnable task, CompletableFuture<?> future) {
+        try {
+            executor.execute(task);
+        } catch (Throwable t) {
+            future.completeExceptionally(t);
+        }
+    }
+
     protected static String parent(String path) {
         int idx = path.lastIndexOf('/');
         if (idx <= 0) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 848d170..bf4a225 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -49,6 +49,7 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M
         final byte[] data;
         final long createdTimestamp;
         final long modifiedTimestamp;
+        final boolean ephemeral;
     }
 
     private final NavigableMap<String, Value> map;
@@ -68,7 +69,8 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M
         Value v = map.get(path);
         if (v != null) {
             return FutureUtils.value(
-                    Optional.of(new GetResult(v.data, new Stat(path, v.version, v.createdTimestamp, v.modifiedTimestamp))));
+                    Optional.of(new GetResult(v.data, new Stat(path, v.version, v.createdTimestamp, v.modifiedTimestamp,
+                            v.isEphemeral(), true))));
         } else {
             return FutureUtils.value(Optional.empty());
         }
@@ -127,7 +129,7 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M
         long now = System.currentTimeMillis();
 
         if (hasVersion && expectedVersion == -1) {
-            Value newValue = new Value(0, data, now, now);
+            Value newValue = new Value(0, data, now, now, options.contains(CreateOption.Ephemeral));
             Value existingValue = map.putIfAbsent(path, newValue);
             if (existingValue != null) {
                 return FutureUtils.exception(new BadVersionException(""));
@@ -137,7 +139,7 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M
                 if (parent != null) {
                     receivedNotification(new Notification(NotificationType.ChildrenChanged, parent));
                 }
-                return FutureUtils.value(new Stat(path, 0, now, now));
+                return FutureUtils.value(new Stat(path, 0, now, now, newValue.isEphemeral(), true));
             }
         } else {
             Value existingValue = map.get(path);
@@ -147,7 +149,8 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M
             } else {
                 long newVersion = existingValue != null ? existingValue.version + 1 : 0;
                 long createdTimestamp = existingValue != null ? existingValue.createdTimestamp : now;
-                Value newValue = new Value(newVersion, data, createdTimestamp, now);
+                Value newValue = new Value(newVersion, data, createdTimestamp, now,
+                        options.contains(CreateOption.Ephemeral));
                 map.put(path, newValue);
 
                 NotificationType type = existingValue == null ? NotificationType.Created : NotificationType.Modified;
@@ -159,7 +162,7 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M
                     }
                 }
                 return FutureUtils
-                        .value(new Stat(path, newValue.version, newValue.createdTimestamp, newValue.modifiedTimestamp));
+                        .value(new Stat(path, newValue.version, newValue.createdTimestamp, newValue.modifiedTimestamp, false, true));
             }
         }
     }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index d91856f..4c85fe5 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -89,7 +89,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
 
         try {
             zkc.getData(path, this, (rc, path1, ctx, data, stat) -> {
-                executor.execute(() -> {
+                execute(() -> {
                     Code code = Code.get(rc);
                     if (code == Code.OK) {
                         future.complete(Optional.of(new GetResult(data, getStat(path1, stat))));
@@ -115,7 +115,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
                     } else {
                         future.completeExceptionally(getException(code, path));
                     }
-                });
+                }, future);
             }, null);
         } catch (Throwable t) {
             future.completeExceptionally(new MetadataStoreException(t));
@@ -130,7 +130,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
 
         try {
             zkc.getChildren(path, this, (rc, path1, ctx, children) -> {
-                executor.execute(() -> {
+                execute(() -> {
                     Code code = Code.get(rc);
                     if (code == Code.OK) {
                         Collections.sort(children);
@@ -158,7 +158,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
                     } else {
                         future.completeExceptionally(getException(code, path));
                     }
-                });
+                }, future);
             }, null);
         } catch (Throwable t) {
             future.completeExceptionally(new MetadataStoreException(t));
@@ -173,7 +173,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
 
         try {
             zkc.exists(path, this, (rc, path1, ctx, stat) -> {
-                executor.execute(() -> {
+                execute(() -> {
                     Code code = Code.get(rc);
                     if (code == Code.OK) {
                         future.complete(true);
@@ -182,7 +182,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
                     } else {
                         future.completeExceptionally(getException(code, path));
                     }
-                });
+                }, future);
             }, future);
         } catch (Throwable t) {
             future.completeExceptionally(new MetadataStoreException(t));
@@ -206,23 +206,24 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
 
         try {
             if (hasVersion && expectedVersion == -1) {
+                CreateMode createMode = getCreateMode(options);
                 ZkUtils.asyncCreateFullPathOptimistic(zkc, path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                        getCreateMode(options), (rc, path1, ctx, name) -> {
-                            executor.execute(() -> {
+                        createMode, (rc, path1, ctx, name) -> {
+                            execute(() -> {
                                 Code code = Code.get(rc);
                                 if (code == Code.OK) {
-                                    future.complete(new Stat(name, 0, 0, 0));
+                                    future.complete(new Stat(name, 0, 0, 0, createMode.isEphemeral(), true));
                                 } else if (code == Code.NODEEXISTS) {
                                     // We're emulating a request to create node, so the version is invalid
                                     future.completeExceptionally(getException(Code.BADVERSION, path));
                                 } else {
                                     future.completeExceptionally(getException(code, path));
                                 }
-                            });
+                            }, future);
                         }, null);
             } else {
                 zkc.setData(path, value, expectedVersion, (rc, path1, ctx, stat) -> {
-                    executor.execute(() -> {
+                    execute(() -> {
                         Code code = Code.get(rc);
                         if (code == Code.OK) {
                             future.complete(getStat(path1, stat));
@@ -242,7 +243,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
                         } else {
                             future.completeExceptionally(getException(code, path));
                         }
-                    });
+                    }, future);
                 }, null);
             }
         } catch (Throwable t) {
@@ -260,14 +261,14 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
 
         try {
             zkc.delete(path, expectedVersion, (rc, path1, ctx) -> {
-                executor.execute(() -> {
+                execute(() -> {
                     Code code = Code.get(rc);
                     if (code == Code.OK) {
                         future.complete(null);
                     } else {
                         future.completeExceptionally(getException(code, path));
                     }
-                });
+                }, future);
             }, null);
         } catch (Throwable t) {
             future.completeExceptionally(new MetadataStoreException(t));
@@ -285,8 +286,10 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
         super.close();
     }
 
-    private static Stat getStat(String path, org.apache.zookeeper.data.Stat zkStat) {
-        return new Stat(path, zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime());
+    private Stat getStat(String path, org.apache.zookeeper.data.Stat zkStat) {
+        return new Stat(path, zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime(),
+                zkStat.getEphemeralOwner() != -1,
+                zkStat.getEphemeralOwner() == zkc.getSessionId());
     }
 
     private static MetadataStoreException getException(Code code, String path) {
@@ -353,4 +356,8 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
             return CreateMode.PERSISTENT;
         }
     }
+
+    public long getZkSessionId() {
+        return zkc.getSessionId();
+    }
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
index d62e785..c396842 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
@@ -117,6 +117,7 @@ public class ZKSessionWatcher implements AutoCloseable, Watcher {
 
     @Override
     public synchronized void process(WatchedEvent event) {
+        log.info("Got ZK session watch event: {}", event); // log the raw event before its state is evaluated
         checkState(event.getState());
     }
 
@@ -156,7 +157,7 @@ public class ZKSessionWatcher implements AutoCloseable, Watcher {
         default:
             if (currentStatus != SessionEvent.SessionReestablished) {
                 // since it reconnected to zoo keeper, we reset the disconnected time
-                log.info("ZooKeeper client reconnection with server quorum");
+                log.info("ZooKeeper client reconnection with server quorum. Current status: {}", currentStatus);
                 disconnectedAt = 0;
 
                 sessionListener.accept(SessionEvent.Reconnected);
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
index 8256f70..e256950 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
@@ -19,11 +19,14 @@
 package org.apache.pulsar.metadata;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.fail;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Optional;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CountDownLatch;
@@ -31,12 +34,15 @@ import java.util.concurrent.TimeUnit;
 
 import lombok.Cleanup;
 
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.LockBusyException;
 import org.apache.pulsar.metadata.api.coordination.CoordinationService;
 import org.apache.pulsar.metadata.api.coordination.LockManager;
 import org.apache.pulsar.metadata.api.coordination.ResourceLock;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
 import org.testng.annotations.Test;
@@ -69,13 +75,6 @@ public class LockManagerTest extends BaseMetadataStoreTest {
         });
         assertEquals(latchLock1.getCount(), 1);
 
-        try {
-            lockManager.acquireLock("/my/path/1", "lock-2").join();
-            fail("should have failed");
-        } catch (CompletionException e) {
-            assertException(e, LockBusyException.class);
-        }
-
         assertEquals(lockManager.listLocks("/my/path").join(), Collections.singletonList("1"));
         assertEquals(lockManager.readLock("/my/path/1").join(), Optional.of("lock-1"));
 
@@ -148,4 +147,85 @@ public class LockManagerTest extends BaseMetadataStoreTest {
         assertEquals(lock.getValue(), "value-2");
         assertEquals(cache.get("/my/path/1").join().get(), "value-2");
     }
+
+    @Test(dataProvider = "impl")
+    public void revalidateLockWithinSameSession(String provider, String url) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(url, MetadataStoreConfig.builder().build());
+
+        @Cleanup
+        CoordinationService cs2 = new CoordinationServiceImpl(store);
+        @Cleanup
+        LockManager<String> lm2 = cs2.getLockManager(String.class);
+
+        String path1 = newKey();
+
+        // Pre-create the ephemeral node through the same store, holding the value about to be locked
+        store.put(path1, "\"value-1\"".getBytes(StandardCharsets.UTF_8), Optional.of(-1L),
+                EnumSet.of(CreateOption.Ephemeral)).join();
+        ResourceLock<String> rl2 = lm2.acquireLock(path1, "value-1").join();
+
+        assertEquals(new String(store.get(path1).join().get().getValue(), StandardCharsets.UTF_8), "\"value-1\"");
+        assertFalse(rl2.getLockExpiredFuture().isDone());
+
+        String path2 = newKey();
+
+        // Pre-create the ephemeral node with a different value: acquiring must replace it with the new one
+        store.put(path2, "\"value-1\"".getBytes(StandardCharsets.UTF_8), Optional.of(-1L),
+                EnumSet.of(CreateOption.Ephemeral)).join();
+        rl2 = lm2.acquireLock(path2, "value-2").join();
+
+        assertEquals(new String(store.get(path2).join().get().getValue(), StandardCharsets.UTF_8), "\"value-2\"");
+        assertFalse(rl2.getLockExpiredFuture().isDone());
+    }
+
+    @Test(dataProvider = "impl")
+    public void revalidateLockOnDifferentSession(String provider, String url) throws Exception {
+        if (provider.equals("Memory")) {
+            // Local memory provider doesn't really have the concept of multiple sessions
+            return;
+        }
+
+        @Cleanup
+        MetadataStoreExtended store1 = MetadataStoreExtended.create(url, MetadataStoreConfig.builder().build());
+        @Cleanup
+        MetadataStoreExtended store2 = MetadataStoreExtended.create(url, MetadataStoreConfig.builder().build());
+
+        @Cleanup
+        CoordinationService cs1 = new CoordinationServiceImpl(store1);
+        @Cleanup
+        LockManager<String> lm1 = cs1.getLockManager(String.class);
+
+        @Cleanup
+        CoordinationService cs2 = new CoordinationServiceImpl(store2);
+        @Cleanup
+        LockManager<String> lm2 = cs2.getLockManager(String.class);
+
+        String path1 = newKey();
+
+        // Store 1 takes the lock; store 2 then asks for the same path with a different value
+        ResourceLock<String> rl1 = lm1.acquireLock(path1, "value-1").join();
+
+        try {
+            lm2.acquireLock(path1, "value-2").join();
+            fail("should have failed");
+        } catch (CompletionException e) {
+            assertEquals(e.getCause().getClass(), LockBusyException.class);
+        }
+
+        // Lock-1 must be untouched by the rejected attempt: not expired and still holding its value
+        assertFalse(rl1.getLockExpiredFuture().isDone());
+        assertEquals(new String(store1.get(path1).join().get().getValue(), StandardCharsets.UTF_8), "\"value-1\"");
+
+        // Simulate existing lock with same content. The 2nd acquirer will steal the lock
+        String path2 = newKey();
+        rl1 = lm1.acquireLock(path2, "value-1").join();
+
+        ResourceLock<String> rl2 = lm2.acquireLock(path2, "value-1").join();
+
+        assertFalse(rl1.getLockExpiredFuture().isDone());
+        assertFalse(rl2.getLockExpiredFuture().isDone());
+
+        assertEquals(new String(store1.get(path2).join().get().getValue(), StandardCharsets.UTF_8), "\"value-1\"");
+    }
 }
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java
index e49d30e..d69a1f7 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java
@@ -33,6 +33,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.SessionTracker;
+import org.apache.zookeeper.server.SessionTrackerImpl;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.assertj.core.util.Files;
 
@@ -79,6 +81,24 @@ public class TestZKServer implements AutoCloseable {
         log.info("Stopped test ZK server");
     }
 
+    public void expireSession(long sessionId) { // asks the test ZooKeeper server to expire the given session
+        zks.expire(new SessionTracker.Session() { // stub session whose only meaningful field is the id
+            @Override
+            public long getSessionId() {
+                return sessionId;
+            }
+
+            @Override
+            public int getTimeout() {
+                return 10_000; // NOTE(review): presumably milliseconds; confirm expire() does not depend on it
+            }
+
+            @Override
+            public boolean isClosing() {
+                return false;
+            }
+        });
+    }
 
     @Override
     public void close() throws Exception {
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
index 577b872..89bd13a 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
@@ -19,7 +19,9 @@
 package org.apache.pulsar.metadata;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -28,8 +30,13 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.coordination.CoordinationService;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.metadata.api.coordination.ResourceLock;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
+import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.testng.annotations.Test;
 
 public class ZKSessionTest extends BaseMetadataStoreTest {
@@ -86,4 +93,44 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
         e = sessionEvents.poll(1, TimeUnit.SECONDS);
         assertNull(e);
     }
+
+    @Test
+    public void testReacquireLocksAfterSessionLost() throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(zks.getConnectionString(),
+                MetadataStoreConfig.builder()
+                        .sessionTimeoutMillis(2_000)
+                        .build());
+
+        BlockingQueue<SessionEvent> sessionEvents = new LinkedBlockingQueue<>(); // session events, in arrival order
+        store.registerSessionListener(sessionEvents::add);
+
+        @Cleanup
+        CoordinationService coordinationService = new CoordinationServiceImpl(store);
+        @Cleanup
+        LockManager<String> lm1 = coordinationService.getLockManager(String.class);
+
+        String path = newKey();
+
+        ResourceLock<String> lock = lm1.acquireLock(path, "value-1").join();
+
+        zks.expireSession(((ZKMetadataStore) store).getZkSessionId()); // expire this store's session on the server
+
+        SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS); // the four events are expected in this exact order
+        assertEquals(e, SessionEvent.ConnectionLost);
+
+        e = sessionEvents.poll(10, TimeUnit.SECONDS);
+        assertEquals(e, SessionEvent.SessionLost);
+
+        e = sessionEvents.poll(10, TimeUnit.SECONDS);
+        assertEquals(e, SessionEvent.Reconnected);
+        e = sessionEvents.poll(10, TimeUnit.SECONDS);
+        assertEquals(e, SessionEvent.SessionReestablished);
+
+        Thread.sleep(2_000); // NOTE(review): presumably gives the lock time to revalidate; confirm
+
+        assertFalse(lock.getLockExpiredFuture().isDone()); // the lock must not have been reported as expired
+
+        assertTrue(store.get(path).join().isPresent()); // and its node must be present in the store
+    }
 }