You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/05/22 16:31:55 UTC
[pulsar] branch master updated: In LockManager use concurrent hash
map to handle locks notifiications (#10680)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e1b2a9a In LockManager use concurrent hash map to handle locks notifiications (#10680)
e1b2a9a is described below
commit e1b2a9acd2360ccc0e8a646eb9de0f0229d566be
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat May 22 09:31:02 2021 -0700
In LockManager use concurrent hash map to handle locks notifiications (#10680)
* In LockManager use sync block to handle locks notifiications
* Use ConcurrentHashMap instead of sync block
---
.../coordination/impl/LockManagerImpl.java | 36 +++++++++++-----------
1 file changed, 18 insertions(+), 18 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index 6fd8543..aadaf51 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -19,19 +19,18 @@
package org.apache.pulsar.metadata.coordination.impl;
import com.fasterxml.jackson.databind.type.TypeFactory;
-
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
-
import lombok.extern.slf4j.Slf4j;
-
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.LockBusyException;
@@ -42,17 +41,16 @@ import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType;
-import org.apache.pulsar.metadata.api.MetadataSerde;
@Slf4j
class LockManagerImpl<T> implements LockManager<T> {
- private final Set<ResourceLockImpl<T>> locks = new HashSet<>();
+ private final Map<String, ResourceLockImpl<T>> locks = new ConcurrentHashMap<>();
private final MetadataStoreExtended store;
private final MetadataCache<T> cache;
private final MetadataSerde<T> serde;
- private static enum State {
+ private enum State {
Ready, Closed
}
@@ -79,11 +77,11 @@ class LockManagerImpl<T> implements LockManager<T> {
lock.acquire().thenRun(() -> {
synchronized (LockManagerImpl.this) {
if (state == State.Ready) {
- locks.add(lock);
+ locks.put(path, lock);
lock.getLockExpiredFuture().thenRun(() -> {
log.info("Released resource lock on {}", path);
synchronized (LockManagerImpl.this) {
- locks.remove(lock);
+ locks.remove(path, lock);
}
});
} else {
@@ -91,7 +89,7 @@ class LockManagerImpl<T> implements LockManager<T> {
lock.release();
}
}
- result.complete(lock);
+ result.complete(lock);
}).exceptionally(ex -> {
if (ex.getCause() instanceof BadVersionException) {
result.completeExceptionally(
@@ -108,15 +106,16 @@ class LockManagerImpl<T> implements LockManager<T> {
private void handleSessionEvent(SessionEvent se) {
if (se == SessionEvent.SessionReestablished) {
log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
- locks.forEach(ResourceLockImpl::revalidate);
+ locks.values().forEach(ResourceLockImpl::revalidate);
}
}
private void handleDataNotification(Notification n) {
if (n.getType() == NotificationType.Deleted) {
- locks.stream()
- .filter(l -> l.getPath().equals(n.getPath()))
- .forEach(l -> l.lockWasInvalidated());
+ ResourceLockImpl<T> lock = locks.get(n.getPath());
+ if (lock != null) {
+ lock.lockWasInvalidated();
+ }
}
}
@@ -136,18 +135,19 @@ class LockManagerImpl<T> implements LockManager<T> {
@Override
public CompletableFuture<Void> asyncClose() {
- Set<ResourceLock<T>> locks;
+ Map<String, ResourceLock<T>> locks;
synchronized (this) {
if (state != State.Ready) {
return CompletableFuture.completedFuture(null);
}
- locks = new HashSet<>(this.locks);
+ locks = new HashMap<>(this.locks);
this.state = State.Closed;
}
return FutureUtils.collect(
- locks.stream().map(ResourceLock::release)
+ locks.values().stream()
+ .map(ResourceLock::release)
.collect(Collectors.toList()))
.thenApply(x -> null);
}