You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/26 05:35:37 UTC

[pulsar] branch master updated: Fix NPE: namespaceService need leaderElection service (#3238)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ea6f366  Fix NPE: namespaceService need leaderElection service (#3238)
ea6f366 is described below

commit ea6f366997c7a693f926df4d340ff87e9177db1d
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Dec 25 21:35:33 2018 -0800

    Fix NPE: namespaceService need leaderElection service (#3238)
    
    ### Motivation
    
    namespace-service uses leaderElectionService, so leaderElectionService should be started before namespace-service. Otherwise, namespace-service can hit the following NPE:
    
    ```
    Caused by: java.lang.NullPointerException
            at org.apache.pulsar.broker.namespace.NamespaceService.searchForCandidateBroker(NamespaceService.java:376) ~[pulsar-broker-2.2.jar]
            ... 9 more
    
    
                    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_131]
            at org.apache.pulsar.broker.namespace.NamespaceService.searchForCandidateBroker(NamespaceService.java:395) ~[pulsar-broker-2.2.jar]
            at org.apache.pulsar.broker.namespace.NamespaceService.lambda$22(NamespaceService.java:335) ~[pulsar-broker-2.2.jar]
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_131]
            at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_131]
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_131]
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_131]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [pulsar-functions-metrics-2.2.2-yahoo.jar:?]
            at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
    ```
---
 .../org/apache/pulsar/broker/PulsarService.java    | 70 ++++++++++++----------
 1 file changed, 37 insertions(+), 33 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 74e91be..f576f95 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -355,6 +355,9 @@ public class PulsarService implements AutoCloseable {
             // Start load management service (even if load balancing is disabled)
             this.loadManager.set(LoadManager.create(this));
 
+            // Start the leader election service
+            startLeaderElectionService();
+
             // needs load management service
             this.startNamespaceService();
 
@@ -418,39 +421,6 @@ public class PulsarService implements AutoCloseable {
             // Register heartbeat and bootstrap namespaces.
             this.nsservice.registerBootstrapNamespaces();
 
-            // Start the leader election service
-            this.leaderElectionService = new LeaderElectionService(this, new LeaderListener() {
-                @Override
-                public synchronized void brokerIsTheLeaderNow() {
-                    if (getConfiguration().isLoadBalancerEnabled()) {
-                        long loadSheddingInterval = TimeUnit.MINUTES
-                                .toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
-                        long resourceQuotaUpdateInterval = TimeUnit.MINUTES
-                                .toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
-
-                        loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(new LoadSheddingTask(loadManager),
-                                loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
-                        loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
-                                new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
-                                resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
-                    }
-                }
-
-                @Override
-                public synchronized void brokerIsAFollowerNow() {
-                    if (loadSheddingTask != null) {
-                        loadSheddingTask.cancel(false);
-                        loadSheddingTask = null;
-                    }
-                    if (loadResourceQuotaTask != null) {
-                        loadResourceQuotaTask.cancel(false);
-                        loadResourceQuotaTask = null;
-                    }
-                }
-            });
-
-            leaderElectionService.start();
-
             schemaRegistryService = SchemaRegistryService.create(this);
 
             webService.start();
@@ -480,6 +450,40 @@ public class PulsarService implements AutoCloseable {
         }
     }
 
+    private void startLeaderElectionService() {
+        this.leaderElectionService = new LeaderElectionService(this, new LeaderListener() {
+            @Override
+            public synchronized void brokerIsTheLeaderNow() {
+                if (getConfiguration().isLoadBalancerEnabled()) {
+                    long loadSheddingInterval = TimeUnit.MINUTES
+                            .toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
+                    long resourceQuotaUpdateInterval = TimeUnit.MINUTES
+                            .toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
+
+                    loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(new LoadSheddingTask(loadManager),
+                            loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
+                    loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
+                            new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
+                            resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
+                }
+            }
+
+            @Override
+            public synchronized void brokerIsAFollowerNow() {
+                if (loadSheddingTask != null) {
+                    loadSheddingTask.cancel(false);
+                    loadSheddingTask = null;
+                }
+                if (loadResourceQuotaTask != null) {
+                    loadResourceQuotaTask.cancel(false);
+                    loadResourceQuotaTask = null;
+                }
+            }
+        });
+
+        leaderElectionService.start();
+    }
+
     private void acquireSLANamespace() {
         try {
             // Namespace not created hence no need to unload it