You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/25 16:19:54 UTC
[pulsar] branch master updated: PIP-45: Handle session events and invalidations from single thread (#12184)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3f512ea PIP-45: Handle session events and invalidations from single thread (#12184)
3f512ea is described below
commit 3f512ea46ee558f043c1aed862c1266abd75b007
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Sep 25 09:18:16 2021 -0700
PIP-45: Handle session events and invalidations from single thread (#12184)
* PIP-45: Handle session events and invalidations from single thread
* Use iterator instead of stream
* Removed synchronized
---
.../coordination/impl/CoordinationServiceImpl.java | 13 +++++--
.../coordination/impl/LeaderElectionImpl.java | 30 ++++++++++------
.../coordination/impl/LockManagerImpl.java | 42 +++++++++++++++++-----
.../coordination/impl/ResourceLockImpl.java | 8 +++--
4 files changed, 69 insertions(+), 24 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
index b56f4e7..807d058 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.metadata.coordination.impl;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -26,8 +27,9 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
-
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
@@ -45,8 +47,12 @@ public class CoordinationServiceImpl implements CoordinationService {
private final Map<Class<?>, LockManager<?>> lockManagers = new ConcurrentHashMap<>();
private final Map<String, LeaderElection<?>> leaderElections = new ConcurrentHashMap<>();
+ private final ScheduledExecutorService executor;
+
public CoordinationServiceImpl(MetadataStoreExtended store) {
this.store = store;
+ this.executor = Executors.newSingleThreadScheduledExecutor(
+ new DefaultThreadFactory("metadata-store-coordination-service"));
}
@Override
@@ -70,7 +76,8 @@ public class CoordinationServiceImpl implements CoordinationService {
@Override
public <T> LockManager<T> getLockManager(Class<T> clazz) {
- return (LockManager<T>) lockManagers.computeIfAbsent(clazz, k -> new LockManagerImpl<T>(store, clazz));
+ return (LockManager<T>) lockManagers.computeIfAbsent(clazz,
+ k -> new LockManagerImpl<T>(store, clazz, executor));
}
@Override
@@ -91,6 +98,6 @@ public class CoordinationServiceImpl implements CoordinationService {
Consumer<LeaderElectionState> stateChangesListener) {
return (LeaderElection<T>) leaderElections.computeIfAbsent(path,
- key -> new LeaderElectionImpl<T>(store, clazz, path, stateChangesListener));
+ key -> new LeaderElectionImpl<T>(store, clazz, path, stateChangesListener, executor));
}
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index 0b12add..f6b5700 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -22,10 +22,13 @@ import com.fasterxml.jackson.databind.type.TypeFactory;
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -34,6 +37,8 @@ import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.SafeRunnable;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -72,7 +77,8 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
private static final int LEADER_ELECTION_RETRY_DELAY_SECONDS = 5;
LeaderElectionImpl(MetadataStoreExtended store, Class<T> clazz, String path,
- Consumer<LeaderElectionState> stateChangesListener) {
+ Consumer<LeaderElectionState> stateChangesListener,
+ ScheduledExecutorService executor) {
this.path = path;
this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null));
this.store = store;
@@ -80,7 +86,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
this.leaderElectionState = LeaderElectionState.NoLeader;
this.internalState = InternalState.Init;
this.stateChangesListener = stateChangesListener;
- this.executor = Executors.newScheduledThreadPool(0, new DefaultThreadFactory("leader-election-executor"));
+ this.executor = executor;
store.registerListener(this::handlePathNotification);
store.registerSessionListener(this::handleSessionNotification);
@@ -268,16 +274,20 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
return cache.getIfCached(path);
}
- private synchronized void handleSessionNotification(SessionEvent event) {
- if (event == SessionEvent.SessionReestablished) {
- if (leaderElectionState == LeaderElectionState.Leading) {
+ private void handleSessionNotification(SessionEvent event) {
+ // Ensure we're only processing one session event at a time.
+ executor.execute(SafeRunnable.safeRun(() -> {
+ if (event == SessionEvent.SessionReestablished) {
log.info("Revalidating leadership for {}", path);
- }
- elect().thenAccept(les -> {
- log.info("Resynced leadership for {} - State: {}", path, les);
- });
- }
+ try {
+ LeaderElectionState les = elect().get();
+ log.info("Resynced leadership for {} - State: {}", path, les);
+ } catch (ExecutionException | InterruptedException e) {
+ log.warn("Failure when processing session event", e);
+ }
+ }
+ }));
}
private void handlePathNotification(Notification notification) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index 0e3754ac..dc10b29 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.metadata.coordination.impl;
import com.fasterxml.jackson.databind.type.TypeFactory;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -26,9 +28,14 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.SafeRunnable;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -49,6 +56,7 @@ class LockManagerImpl<T> implements LockManager<T> {
private final MetadataStoreExtended store;
private final MetadataCache<T> cache;
private final MetadataSerde<T> serde;
+ private final ExecutorService executor;
private enum State {
Ready, Closed
@@ -56,10 +64,11 @@ class LockManagerImpl<T> implements LockManager<T> {
private State state = State.Ready;
- LockManagerImpl(MetadataStoreExtended store, Class<T> clazz) {
+ LockManagerImpl(MetadataStoreExtended store, Class<T> clazz, ExecutorService executor) {
this.store = store;
this.cache = store.getMetadataCache(clazz);
this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null));
+ this.executor = executor;
store.registerSessionListener(this::handleSessionEvent);
store.registerListener(this::handleDataNotification);
}
@@ -104,13 +113,30 @@ class LockManagerImpl<T> implements LockManager<T> {
}
private void handleSessionEvent(SessionEvent se) {
- if (se == SessionEvent.SessionReestablished) {
- log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
- locks.values().forEach(ResourceLockImpl::revalidate);
- } else if (se == SessionEvent.Reconnected) {
- log.info("Metadata store connection has been re-established. Revalidating locks that were pending.");
- locks.values().forEach(ResourceLockImpl::revalidateIfNeededAfterReconnection);
- }
+ // We want to make sure we're processing one event at a time and that we're done with one event before going
+ // for the next one.
+ executor.execute(SafeRunnable.safeRun(() -> {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ if (se == SessionEvent.SessionReestablished) {
+ log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
+ for (ResourceLockImpl<T> lock : locks.values()) {
+ futures.add(lock.revalidate());
+ }
+
+ } else if (se == SessionEvent.Reconnected) {
+ log.info("Metadata store connection has been re-established. Revalidating locks that were pending.");
+ for (ResourceLockImpl<T> lock : locks.values()) {
+ futures.add(lock.revalidateIfNeededAfterReconnection());
+ }
+ }
+
+ try {
+ FutureUtil.waitForAll(futures).get();
+ } catch (ExecutionException|InterruptedException e) {
+ log.warn("Failure when processing session event", e);
+ }
+ }));
}
private void handleDataNotification(Notification n) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index 3a86aa3..8487400 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -187,7 +187,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
return result;
}
- synchronized void lockWasInvalidated() {
+ synchronized void lockWasInvalidated() {
if (state != State.Valid) {
// Ignore notifications while we're releasing the lock ourselves
return;
@@ -214,11 +214,13 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
});
}
- synchronized void revalidateIfNeededAfterReconnection() {
+ synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
if (revalidateAfterReconnection) {
revalidateAfterReconnection = false;
log.warn("Revalidate lock at {} after reconnection", path);
- revalidate();
+ return revalidate();
+ } else {
+ return CompletableFuture.completedFuture(null);
}
}