You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/01 05:53:10 UTC
[pulsar] branch master updated: When the loadmanager leader is not
available, fall through regular least loaded selection (#3688)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ccfb949 When the loadmanager leader is not available, fall through regular least loaded selection (#3688)
ccfb949 is described below
commit ccfb94970bd26d7e9b4f3c819bba258f148773e4
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Feb 28 21:53:04 2019 -0800
When the loadmanager leader is not available, fall through regular least loaded selection (#3688)
* When the loadmanager leader is not available, fall through regular least loaded selection
* Handle exceptions coming from mock zk in tests
---
.../broker/loadbalance/LeaderElectionService.java | 17 +++++++++++++++++
.../pulsar/broker/namespace/NamespaceService.java | 11 ++++++++---
2 files changed, 25 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
index 799c7af..712b234 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
@@ -179,6 +180,22 @@ public class LeaderElectionService {
if (stopped) {
return;
}
+
+ if (isLeader()) {
+ // Make sure to remove the leader election z-node in case the session doesn't
+ // get closed properly. This avoids having to wait for the session timeout
+ // before a new leader can be elected.
+ // This delete operation is safe to do here (with version=-1) because either:
+ // 1. The ZK session is still valid, in which case this broker is still
+ // the "leader" and we have to remove the z-node
+ // 2. The session has already expired, in which case this delete operation
+ // will not go through
+ try {
+ pulsar.getLocalZkCache().getZooKeeper().delete(ELECTION_ROOT, -1);
+ } catch (Throwable t) {
+ log.warn("Failed to cleanup election root znode: {}", t);
+ }
+ }
stopped = true;
log.info("LeaderElectionService stopped");
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 9b4fcae..17fa025 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -373,7 +373,12 @@ public class NamespaceService {
}
if (candidateBroker == null) {
- if (!this.loadManager.get().isCentralized() || pulsar.getLeaderElectionService().isLeader()) {
+ if (!this.loadManager.get().isCentralized()
+ || pulsar.getLeaderElectionService().isLeader()
+
+ // If the leader is not active, fall back to picking the least loaded broker from the current broker's load manager
+ || !isBrokerActive(pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl())
+ ) {
Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
if (!availableBroker.isPresent()) {
lookupFuture.complete(Optional.empty());
@@ -977,7 +982,7 @@ public class NamespaceService {
port = config.getWebServicePort().get();
} else if (config.getWebServicePortTls().isPresent()) {
port = config.getWebServicePortTls().get();
- }
+ }
return String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), host, port);
}
public static String getSLAMonitorNamespace(String host, ServiceConfiguration config) {
@@ -986,7 +991,7 @@ public class NamespaceService {
port = config.getWebServicePort().get();
} else if (config.getWebServicePortTls().isPresent()) {
port = config.getWebServicePortTls().get();
- }
+ }
return String.format(SLA_NAMESPACE_FMT, config.getClusterName(), host, port);
}