You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/01 05:53:10 UTC

[pulsar] branch master updated: When the loadmanager leader is not available, fall through to regular least-loaded selection (#3688)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ccfb949  When the loadmanager leader is not available, fall through regular least loaded selection (#3688)
ccfb949 is described below

commit ccfb94970bd26d7e9b4f3c819bba258f148773e4
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Feb 28 21:53:04 2019 -0800

    When the loadmanager leader is not available, fall through regular least loaded selection (#3688)
    
    * When the loadmanager leader is not available, fall through regular least loaded selection
    
    * Handle exceptions coming from mock zk in tests
---
 .../broker/loadbalance/LeaderElectionService.java       | 17 +++++++++++++++++
 .../pulsar/broker/namespace/NamespaceService.java       | 11 ++++++++---
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
index 799c7af..712b234 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.WatchedEvent;
@@ -179,6 +180,22 @@ public class LeaderElectionService {
         if (stopped) {
             return;
         }
+
+        if (isLeader()) {
+            // Make sure to remove the leader election z-node in case the session doesn't
+            // get closed properly. This is to avoid having to wait the session timeout
+            // to elect a new one.
+            // This delete operation is safe to do here (with version=-1) because either:
+            //  1. The ZK session is still valid, in which case this broker is still
+            //     the "leader" and we have to remove the z-node
+            //  2. The session has already expired, in which case this delete operation
+            //     will not go through
+            try {
+                pulsar.getLocalZkCache().getZooKeeper().delete(ELECTION_ROOT, -1);
+            } catch (Throwable t) {
+                log.warn("Failed to cleanup election root znode: {}", t);
+            }
+        }
         stopped = true;
         log.info("LeaderElectionService stopped");
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 9b4fcae..17fa025 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -373,7 +373,12 @@ public class NamespaceService {
             }
 
             if (candidateBroker == null) {
-                if (!this.loadManager.get().isCentralized() || pulsar.getLeaderElectionService().isLeader()) {
+                if (!this.loadManager.get().isCentralized()
+                        || pulsar.getLeaderElectionService().isLeader()
+
+                        // If leader is not active, fallback to pick the least loaded from current broker loadmanager
+                        || !isBrokerActive(pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl())
+                    ) {
                     Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
                     if (!availableBroker.isPresent()) {
                         lookupFuture.complete(Optional.empty());
@@ -977,7 +982,7 @@ public class NamespaceService {
             port = config.getWebServicePort().get();
         } else if (config.getWebServicePortTls().isPresent()) {
             port = config.getWebServicePortTls().get();
-        } 
+        }
         return String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), host, port);
     }
      public static String getSLAMonitorNamespace(String host, ServiceConfiguration config) {
@@ -986,7 +991,7 @@ public class NamespaceService {
             port = config.getWebServicePort().get();
         } else if (config.getWebServicePortTls().isPresent()) {
             port = config.getWebServicePortTls().get();
-        } 
+        }
         return String.format(SLA_NAMESPACE_FMT, config.getClusterName(), host, port);
     }