You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/25 16:19:54 UTC

[pulsar] branch master updated: PIP-45: Handle session events and invalidations from single thread (#12184)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f512ea  PIP-45: Handle session events and invalidations from single thread (#12184)
3f512ea is described below

commit 3f512ea46ee558f043c1aed862c1266abd75b007
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Sep 25 09:18:16 2021 -0700

    PIP-45: Handle session events and invalidations from single thread (#12184)
    
    * PIP-45: Handle session events and invalidations from single thread
    
    * Use iterator instead of stream
    
    * Removed synchronized
---
 .../coordination/impl/CoordinationServiceImpl.java | 13 +++++--
 .../coordination/impl/LeaderElectionImpl.java      | 30 ++++++++++------
 .../coordination/impl/LockManagerImpl.java         | 42 +++++++++++++++++-----
 .../coordination/impl/ResourceLockImpl.java        |  8 +++--
 4 files changed, 69 insertions(+), 24 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
index b56f4e7..807d058 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.metadata.coordination.impl;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -26,8 +27,9 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Consumer;
-
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.coordination.CoordinationService;
@@ -45,8 +47,12 @@ public class CoordinationServiceImpl implements CoordinationService {
     private final Map<Class<?>, LockManager<?>> lockManagers = new ConcurrentHashMap<>();
     private final Map<String, LeaderElection<?>> leaderElections = new ConcurrentHashMap<>();
 
+    private final ScheduledExecutorService executor;
+
     public CoordinationServiceImpl(MetadataStoreExtended store) {
         this.store = store;
+        this.executor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("metadata-store-coordination-service"));
     }
 
     @Override
@@ -70,7 +76,8 @@ public class CoordinationServiceImpl implements CoordinationService {
 
     @Override
     public <T> LockManager<T> getLockManager(Class<T> clazz) {
-        return (LockManager<T>) lockManagers.computeIfAbsent(clazz, k -> new LockManagerImpl<T>(store, clazz));
+        return (LockManager<T>) lockManagers.computeIfAbsent(clazz,
+                k -> new LockManagerImpl<T>(store, clazz, executor));
     }
 
     @Override
@@ -91,6 +98,6 @@ public class CoordinationServiceImpl implements CoordinationService {
             Consumer<LeaderElectionState> stateChangesListener) {
 
         return (LeaderElection<T>) leaderElections.computeIfAbsent(path,
-                key -> new LeaderElectionImpl<T>(store, clazz, path, stateChangesListener));
+                key -> new LeaderElectionImpl<T>(store, clazz, path, stateChangesListener, executor));
     }
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index 0b12add..f6b5700 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -22,10 +22,13 @@ import com.fasterxml.jackson.databind.type.TypeFactory;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
 
+import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -34,6 +37,8 @@ import java.util.function.Consumer;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.SafeRunnable;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -72,7 +77,8 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
     private static final int LEADER_ELECTION_RETRY_DELAY_SECONDS = 5;
 
     LeaderElectionImpl(MetadataStoreExtended store, Class<T> clazz, String path,
-            Consumer<LeaderElectionState> stateChangesListener) {
+            Consumer<LeaderElectionState> stateChangesListener,
+                       ScheduledExecutorService executor) {
         this.path = path;
         this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null));
         this.store = store;
@@ -80,7 +86,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
         this.leaderElectionState = LeaderElectionState.NoLeader;
         this.internalState = InternalState.Init;
         this.stateChangesListener = stateChangesListener;
-        this.executor = Executors.newScheduledThreadPool(0, new DefaultThreadFactory("leader-election-executor"));
+        this.executor = executor;
 
         store.registerListener(this::handlePathNotification);
         store.registerSessionListener(this::handleSessionNotification);
@@ -268,16 +274,20 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
         return cache.getIfCached(path);
     }
 
-    private synchronized void handleSessionNotification(SessionEvent event) {
-        if (event == SessionEvent.SessionReestablished) {
-            if (leaderElectionState == LeaderElectionState.Leading) {
+    private void handleSessionNotification(SessionEvent event) {
+        // Ensure we're only processing one session event at a time.
+        executor.execute(SafeRunnable.safeRun(() -> {
+            if (event == SessionEvent.SessionReestablished) {
                 log.info("Revalidating leadership for {}", path);
-            }
 
-            elect().thenAccept(les -> {
-                log.info("Resynced leadership for {} - State: {}", path, les);
-            });
-        }
+                try {
+                    LeaderElectionState les = elect().get();
+                    log.info("Resynced leadership for {} - State: {}", path, les);
+                } catch (ExecutionException | InterruptedException e) {
+                    log.warn("Failure when processing session event", e);
+                }
+            }
+        }));
     }
 
     private void handlePathNotification(Notification notification) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index 0e3754ac..dc10b29 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.metadata.coordination.impl;
 
 import com.fasterxml.jackson.databind.type.TypeFactory;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -26,9 +28,14 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.SafeRunnable;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -49,6 +56,7 @@ class LockManagerImpl<T> implements LockManager<T> {
     private final MetadataStoreExtended store;
     private final MetadataCache<T> cache;
     private final MetadataSerde<T> serde;
+    private final ExecutorService executor;
 
     private enum State {
         Ready, Closed
@@ -56,10 +64,11 @@ class LockManagerImpl<T> implements LockManager<T> {
 
     private State state = State.Ready;
 
-    LockManagerImpl(MetadataStoreExtended store, Class<T> clazz) {
+    LockManagerImpl(MetadataStoreExtended store, Class<T> clazz, ExecutorService executor) {
         this.store = store;
         this.cache = store.getMetadataCache(clazz);
         this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null));
+        this.executor = executor;
         store.registerSessionListener(this::handleSessionEvent);
         store.registerListener(this::handleDataNotification);
     }
@@ -104,13 +113,30 @@ class LockManagerImpl<T> implements LockManager<T> {
     }
 
     private void handleSessionEvent(SessionEvent se) {
-        if (se == SessionEvent.SessionReestablished) {
-            log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
-            locks.values().forEach(ResourceLockImpl::revalidate);
-        } else if (se == SessionEvent.Reconnected) {
-            log.info("Metadata store connection has been re-established. Revalidating locks that were pending.");
-            locks.values().forEach(ResourceLockImpl::revalidateIfNeededAfterReconnection);
-        }
+        // We want to make sure we're processing one event at a time and that we're done with one event before going
+        // for the next one.
+        executor.execute(SafeRunnable.safeRun(() -> {
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+            if (se == SessionEvent.SessionReestablished) {
+                log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
+                for (ResourceLockImpl<T> lock : locks.values()) {
+                    futures.add(lock.revalidate());
+                }
+
+            } else if (se == SessionEvent.Reconnected) {
+                log.info("Metadata store connection has been re-established. Revalidating locks that were pending.");
+                for (ResourceLockImpl<T> lock : locks.values()) {
+                    futures.add(lock.revalidateIfNeededAfterReconnection());
+                }
+            }
+
+            try {
+                FutureUtil.waitForAll(futures).get();
+            } catch (ExecutionException|InterruptedException e) {
+                log.warn("Failure when processing session event", e);
+            }
+        }));
     }
 
     private void handleDataNotification(Notification n) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index 3a86aa3..8487400 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -187,7 +187,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
         return result;
     }
 
-    synchronized  void lockWasInvalidated() {
+    synchronized void lockWasInvalidated() {
         if (state != State.Valid) {
             // Ignore notifications while we're releasing the lock ourselves
             return;
@@ -214,11 +214,13 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
                 });
     }
 
-    synchronized void revalidateIfNeededAfterReconnection() {
+    synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
         if (revalidateAfterReconnection) {
             revalidateAfterReconnection = false;
             log.warn("Revalidate lock at {} after reconnection", path);
-            revalidate();
+            return revalidate();
+        } else {
+            return CompletableFuture.completedFuture(null);
         }
     }