You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/26 05:35:37 UTC
[pulsar] branch master updated: Fix NPE: namespaceService need
leaderElection service (#3238)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ea6f366 Fix NPE: namespaceService need leaderElection service (#3238)
ea6f366 is described below
commit ea6f366997c7a693f926df4d340ff87e9177db1d
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Dec 25 21:35:33 2018 -0800
Fix NPE: namespaceService need leaderElection service (#3238)
### Motivation
namespace-service uses leaderElectionService so, leaderElectionService should start before namespace-service.
```
Caused by: java.lang.NullPointerException
at org.apache.pulsar.broker.namespace.NamespaceService.searchForCandidateBroker(NamespaceService.java:376) ~[pulsar-broker-2.2.jar]
... 9 more
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_131]
at org.apache.pulsar.broker.namespace.NamespaceService.searchForCandidateBroker(NamespaceService.java:395) ~[pulsar-broker-2.2.jar]
at org.apache.pulsar.broker.namespace.NamespaceService.lambda$22(NamespaceService.java:335) ~[pulsar-broker-2.2.jar]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_131]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_131]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_131]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [pulsar-functions-metrics-2.2.2-yahoo.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
```
---
.../org/apache/pulsar/broker/PulsarService.java | 70 ++++++++++++----------
1 file changed, 37 insertions(+), 33 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 74e91be..f576f95 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -355,6 +355,9 @@ public class PulsarService implements AutoCloseable {
// Start load management service (even if load balancing is disabled)
this.loadManager.set(LoadManager.create(this));
+ // Start the leader election service
+ startLeaderElectionService();
+
// needs load management service
this.startNamespaceService();
@@ -418,39 +421,6 @@ public class PulsarService implements AutoCloseable {
// Register heartbeat and bootstrap namespaces.
this.nsservice.registerBootstrapNamespaces();
- // Start the leader election service
- this.leaderElectionService = new LeaderElectionService(this, new LeaderListener() {
- @Override
- public synchronized void brokerIsTheLeaderNow() {
- if (getConfiguration().isLoadBalancerEnabled()) {
- long loadSheddingInterval = TimeUnit.MINUTES
- .toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
- long resourceQuotaUpdateInterval = TimeUnit.MINUTES
- .toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
-
- loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(new LoadSheddingTask(loadManager),
- loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
- loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
- new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
- resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
- }
- }
-
- @Override
- public synchronized void brokerIsAFollowerNow() {
- if (loadSheddingTask != null) {
- loadSheddingTask.cancel(false);
- loadSheddingTask = null;
- }
- if (loadResourceQuotaTask != null) {
- loadResourceQuotaTask.cancel(false);
- loadResourceQuotaTask = null;
- }
- }
- });
-
- leaderElectionService.start();
-
schemaRegistryService = SchemaRegistryService.create(this);
webService.start();
@@ -480,6 +450,40 @@ public class PulsarService implements AutoCloseable {
}
}
+ private void startLeaderElectionService() {
+ this.leaderElectionService = new LeaderElectionService(this, new LeaderListener() {
+ @Override
+ public synchronized void brokerIsTheLeaderNow() {
+ if (getConfiguration().isLoadBalancerEnabled()) {
+ long loadSheddingInterval = TimeUnit.MINUTES
+ .toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
+ long resourceQuotaUpdateInterval = TimeUnit.MINUTES
+ .toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
+
+ loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(new LoadSheddingTask(loadManager),
+ loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
+ loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
+ new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
+ resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Override
+ public synchronized void brokerIsAFollowerNow() {
+ if (loadSheddingTask != null) {
+ loadSheddingTask.cancel(false);
+ loadSheddingTask = null;
+ }
+ if (loadResourceQuotaTask != null) {
+ loadResourceQuotaTask.cancel(false);
+ loadResourceQuotaTask = null;
+ }
+ }
+ });
+
+ leaderElectionService.start();
+ }
+
private void acquireSLANamespace() {
try {
// Namespace not created hence no need to unload it