You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/05/22 16:31:55 UTC

[pulsar] branch master updated: In LockManager use concurrent hash map to handle locks notifications (#10680)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e1b2a9a  In LockManager use concurrent hash map to handle locks notifications (#10680)
e1b2a9a is described below

commit e1b2a9acd2360ccc0e8a646eb9de0f0229d566be
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat May 22 09:31:02 2021 -0700

    In LockManager use concurrent hash map to handle locks notifications (#10680)
    
    * In LockManager use sync block to handle locks notifications
    
    * Use ConcurrentHashMap instead of sync block
---
 .../coordination/impl/LockManagerImpl.java         | 36 +++++++++++-----------
 1 file changed, 18 insertions(+), 18 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index 6fd8543..aadaf51 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -19,19 +19,18 @@
 package org.apache.pulsar.metadata.coordination.impl;
 
 import com.fasterxml.jackson.databind.type.TypeFactory;
-
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
-
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.LockBusyException;
@@ -42,17 +41,16 @@ import org.apache.pulsar.metadata.api.coordination.ResourceLock;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType;
-import org.apache.pulsar.metadata.api.MetadataSerde;
 
 @Slf4j
 class LockManagerImpl<T> implements LockManager<T> {
 
-    private final Set<ResourceLockImpl<T>> locks = new HashSet<>();
+    private final Map<String, ResourceLockImpl<T>> locks = new ConcurrentHashMap<>();
     private final MetadataStoreExtended store;
     private final MetadataCache<T> cache;
     private final MetadataSerde<T> serde;
 
-    private static enum State {
+    private enum State {
         Ready, Closed
     }
 
@@ -79,11 +77,11 @@ class LockManagerImpl<T> implements LockManager<T> {
         lock.acquire().thenRun(() -> {
             synchronized (LockManagerImpl.this) {
                 if (state == State.Ready) {
-                    locks.add(lock);
+                    locks.put(path, lock);
                     lock.getLockExpiredFuture().thenRun(() -> {
                         log.info("Released resource lock on {}", path);
                         synchronized (LockManagerImpl.this) {
-                            locks.remove(lock);
+                            locks.remove(path, lock);
                         }
                     });
                 } else {
@@ -91,7 +89,7 @@ class LockManagerImpl<T> implements LockManager<T> {
                     lock.release();
                 }
             }
-      result.complete(lock);
+            result.complete(lock);
         }).exceptionally(ex -> {
             if (ex.getCause() instanceof BadVersionException) {
                 result.completeExceptionally(
@@ -108,15 +106,16 @@ class LockManagerImpl<T> implements LockManager<T> {
     private void handleSessionEvent(SessionEvent se) {
         if (se == SessionEvent.SessionReestablished) {
             log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
-            locks.forEach(ResourceLockImpl::revalidate);
+            locks.values().forEach(ResourceLockImpl::revalidate);
         }
     }
 
     private void handleDataNotification(Notification n) {
         if (n.getType() == NotificationType.Deleted) {
-            locks.stream()
-                    .filter(l -> l.getPath().equals(n.getPath()))
-                    .forEach(l -> l.lockWasInvalidated());
+            ResourceLockImpl<T> lock = locks.get(n.getPath());
+            if (lock != null) {
+                lock.lockWasInvalidated();
+            }
         }
     }
 
@@ -136,18 +135,19 @@ class LockManagerImpl<T> implements LockManager<T> {
 
     @Override
     public CompletableFuture<Void> asyncClose() {
-        Set<ResourceLock<T>> locks;
+        Map<String, ResourceLock<T>> locks;
         synchronized (this) {
             if (state != State.Ready) {
                 return CompletableFuture.completedFuture(null);
             }
 
-            locks = new HashSet<>(this.locks);
+            locks = new HashMap<>(this.locks);
             this.state = State.Closed;
         }
 
         return FutureUtils.collect(
-                locks.stream().map(ResourceLock::release)
+                locks.values().stream()
+                        .map(ResourceLock::release)
                         .collect(Collectors.toList()))
                 .thenApply(x -> null);
     }