You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by kw...@apache.org on 2023/05/22 00:51:17 UTC

[pulsar] branch master updated: [fix][broker] Fix broker load manager class filter NPE (#20350)

This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b7f0004313e [fix][broker] Fix broker load manager class filter NPE (#20350)
b7f0004313e is described below

commit b7f0004313ea4565717cc6d3c0b99aee5c079c6c
Author: Kai Wang <kw...@apache.org>
AuthorDate: Mon May 22 08:51:10 2023 +0800

    [fix][broker] Fix broker load manager class filter NPE (#20350)
    
    PIP: https://github.com/apache/pulsar/issues/16691
    
    ### Motivation
    When upgrading the pulsar version and changing the pulsar load manager to `ExtensibleLoadManagerImpl` it might cause NPE. The root cause is the old version of pulsar does not contain the `loadManagerClassName` field.
    ```
    2023-05-18T05:42:50,557+0000 [pulsar-io-4-1] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:51345] connected with role=[pulsarinstance-v3-0-n@test.dev](mailto:pulsarinstance-v3-0-n@test.dev) using authMethod=token, clientVersion=Pulsar Go 0.9.0, clientProtocolVersion=18, proxyVersion=null
    2023-05-18T05:42:50,558+0000 [pulsar-io-4-1] WARN  org.apache.pulsar.broker.lookup.TopicLookupBase - Failed to lookup [pulsarinstance-v3-0-n@test.dev](mailto:pulsarinstance-v3-0-n@test.dev) for topic persistent://xxx with error java.lang.NullPointerException: Cannot invoke “String.equals(Object)” because the return value of “org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData.getLoadManagerClassName()” is null
    java.util.concurrent.CompletionException: java.lang.NullPointerException: Cannot invoke “String.equals(Object)” because the return value of “org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData.getLoadManagerClassName()” is null
            at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[?:?]
            at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1194) ~[?:?]
            at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?]
            at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.selectAsync(ExtensibleLoadManagerImpl.java:385) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
            at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$assign$6(ExtensibleLoadManagerImpl.java:336) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
            at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187) ~[?:?]
            at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?]
            at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$assign$10(ExtensibleLoadManagerImpl.java:333) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
            at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:409) ~[io.streamnative-pulsar-common-3.0.0.1.jar:3.0.0.1]
            at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:243) ~[io.streamnative-pulsar-common-3.0.0.1.jar:3.0.0.1]
            at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.assign(ExtensibleLoadManagerImpl.java:327) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
            at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper.findBrokerServiceUrl(ExtensibleLoadManagerWrapper.java:66) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
            at org.apache.pulsar.broker.namespace.NamespaceService.lambda$getBrokerServiceUrlAsync$0(NamespaceService.java:191) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
    ```
    
    ### Modifications
    
    * Add null check when using`getLoadManagerClassName`.
    * Add test to cover this case.
    * Add `RedirectManager` unit test.
---
 .../filter/BrokerLoadManagerClassFilter.java       |   5 +-
 .../extensions/manager/RedirectManager.java        |  17 +++-
 .../impl/BrokerLoadManagerClassFilter.java         |   5 +-
 .../filter/BrokerLoadManagerClassFilterTest.java   |   3 +-
 .../extensions/manager/RedirectManagerTest.java    | 111 +++++++++++++++++++++
 .../impl/BrokerLoadManagerClassFilterTest.java     |   6 ++
 6 files changed, 139 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java
index 4ee28a5225a..07109b277ae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.loadbalance.extensions.filter;
 
 import java.util.Map;
+import java.util.Objects;
 import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
@@ -43,7 +44,9 @@ public class BrokerLoadManagerClassFilter implements BrokerFilter {
         }
         brokers.entrySet().removeIf(entry -> {
             BrokerLookupData v = entry.getValue();
-            return !v.getLoadManagerClassName().equals(context.brokerConfiguration().getLoadManagerClassName());
+            // The load manager class name can be null if the cluster has old version of broker.
+            return !Objects.equals(v.getLoadManagerClassName(),
+                    context.brokerConfiguration().getLoadManagerClassName());
         });
         return brokers;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java
index 4aff77937a5..3455b333b0a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java
@@ -19,9 +19,11 @@
 package org.apache.pulsar.broker.loadbalance.extensions.manager;
 
 import static org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT;
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -48,6 +50,12 @@ public class RedirectManager {
         this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
     }
 
+    @VisibleForTesting
+    public RedirectManager(PulsarService pulsar, LockManager<BrokerLookupData> brokerLookupDataLockManager) {
+        this.pulsar = pulsar;
+        this.brokerLookupDataLockManager = brokerLookupDataLockManager;
+    }
+
     public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
         return brokerLookupDataLockManager.listLocks(LOADBALANCE_BROKERS_ROOT).thenCompose(availableBrokers -> {
             Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
@@ -69,7 +77,7 @@ public class RedirectManager {
 
     public CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync() {
         String currentLMClassName = pulsar.getConfiguration().getLoadManagerClassName();
-        boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfig(), log);
+        boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log);
         return getAvailableBrokerLookupDataAsync().thenApply(lookupDataMap -> {
             if (lookupDataMap.isEmpty()) {
                 String errorMsg = "No available broker found.";
@@ -89,9 +97,10 @@ public class RedirectManager {
                 log.warn(errorMsg);
                 throw new IllegalStateException(errorMsg);
             }
-            if (latestServiceLookupData.get().getLoadManagerClassName().equals(currentLMClassName)) {
+
+            if (Objects.equals(latestServiceLookupData.get().getLoadManagerClassName(), currentLMClassName)) {
                 if (debug) {
-                    log.info("We don't need to redirect, current load manager class name: {}",
+                    log.info("No need to redirect, current load manager class name: {}",
                             currentLMClassName);
                 }
                 return Optional.empty();
@@ -99,7 +108,7 @@ public class RedirectManager {
             var serviceLookupDataObj = latestServiceLookupData.get();
             var candidateBrokers = new ArrayList<ServiceLookupData>();
             lookupDataMap.forEach((key, value) -> {
-                if (value.getLoadManagerClassName().equals(serviceLookupDataObj.getLoadManagerClassName())) {
+                if (Objects.equals(value.getLoadManagerClassName(), serviceLookupDataObj.getLoadManagerClassName())) {
                     candidateBrokers.add(value);
                 }
             });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilter.java
index 5d6a56ba869..13e3fdc537e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
+import java.util.Objects;
 import java.util.Set;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.BrokerFilter;
@@ -32,8 +33,8 @@ public class BrokerLoadManagerClassFilter implements BrokerFilter {
                        LoadData loadData,
                        ServiceConfiguration conf) throws BrokerFilterException {
         loadData.getBrokerData().forEach((key, value) -> {
-            if (!value.getLocalData().getLoadManagerClassName()
-                    .equals(conf.getLoadManagerClassName())) {
+            // The load manager class name can be null if the cluster has old version of broker.
+            if (!Objects.equals(value.getLocalData().getLoadManagerClassName(), conf.getLoadManagerClassName())) {
                 brokers.remove(key);
             }
         });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java
index 0169b57fe99..4aef87cf63a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java
@@ -44,7 +44,8 @@ public class BrokerLoadManagerClassFilterTest extends BrokerFilterTestBase {
                 "broker1", getLookupData("3.0.0", ExtensibleLoadManagerImpl.class.getName()),
                 "broker2", getLookupData("3.0.0", ExtensibleLoadManagerImpl.class.getName()),
                 "broker3", getLookupData("3.0.0", ModularLoadManagerImpl.class.getName()),
-                "broker4", getLookupData("3.0.0", ModularLoadManagerImpl.class.getName())
+                "broker4", getLookupData("3.0.0", ModularLoadManagerImpl.class.getName()),
+                "broker5", getLookupData("3.0.0", null)
         );
 
         Map<String, BrokerLookupData> result = filter.filter(new HashMap<>(originalBrokers), null, context);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java
new file mode 100644
index 00000000000..cbf77b59d5a
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+import org.testng.annotations.Test;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+
+/**
+ * Unit test {@link RedirectManager}.
+ */
+public class RedirectManagerTest {
+
+    @Test
+    public void testFindRedirectLookupResultAsync() throws ExecutionException, InterruptedException {
+        PulsarService pulsar = mock(PulsarService.class);
+        ServiceConfiguration configuration = new ServiceConfiguration();
+        when(pulsar.getConfiguration()).thenReturn(configuration);
+        RedirectManager redirectManager = spy(new RedirectManager(pulsar, null));
+
+        configuration.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+        configuration.setLoadBalancerDebugModeEnabled(true);
+
+        // Test 1: No load manager class name found.
+        doReturn(CompletableFuture.completedFuture(
+                new HashMap<>(){{
+                    put("broker-1", getLookupData("broker-1", null, 10));
+                    put("broker-2", getLookupData("broker-2", ModularLoadManagerImpl.class.getName(), 1));
+                }}
+        )).when(redirectManager).getAvailableBrokerLookupDataAsync();
+
+        // Should redirect to broker-1, since broker-1 has the latest load manager, even though the class name is null.
+        Optional<LookupResult> lookupResult = redirectManager.findRedirectLookupResultAsync().get();
+        assertTrue(lookupResult.isPresent());
+        assertTrue(lookupResult.get().getLookupData().getBrokerUrl().contains("broker-1"));
+
+        // Test 2: Should redirect to broker-1, since the latest broker are using ExtensibleLoadManagerImpl
+        doReturn(CompletableFuture.completedFuture(
+                new HashMap<>(){{
+                    put("broker-1", getLookupData("broker-1", ExtensibleLoadManagerImpl.class.getName(), 10));
+                    put("broker-2", getLookupData("broker-2", ModularLoadManagerImpl.class.getName(), 1));
+                }}
+        )).when(redirectManager).getAvailableBrokerLookupDataAsync();
+
+        lookupResult = redirectManager.findRedirectLookupResultAsync().get();
+        assertTrue(lookupResult.isPresent());
+        assertTrue(lookupResult.get().getLookupData().getBrokerUrl().contains("broker-1"));
+
+
+        // Test 3: Should not redirect, since current broker are using ModularLoadManagerImpl
+        doReturn(CompletableFuture.completedFuture(
+                new HashMap<>(){{
+                    put("broker-1", getLookupData("broker-1", ExtensibleLoadManagerImpl.class.getName(), 10));
+                    put("broker-2", getLookupData("broker-2", ModularLoadManagerImpl.class.getName(), 100));
+                }}
+        )).when(redirectManager).getAvailableBrokerLookupDataAsync();
+
+        lookupResult = redirectManager.findRedirectLookupResultAsync().get();
+        assertFalse(lookupResult.isPresent());
+    }
+
+
+    public BrokerLookupData getLookupData(String broker, String loadManagerClassName, long startTimeStamp) {
+        String webServiceUrl = "http://" + broker + ":8080";
+        String webServiceUrlTls = "https://" + broker + ":8081";
+        String pulsarServiceUrl = "pulsar://" + broker + ":6650";
+        String pulsarServiceUrlTls = "pulsar+ssl://" + broker + ":6651";
+        Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
+        Map<String, String> protocols = new HashMap<>(){{
+            put("kafka", "9092");
+        }};
+        return new BrokerLookupData(
+                webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
+                pulsarServiceUrlTls, advertisedListeners, protocols, true, true,
+                loadManagerClassName, startTimeStamp, "3.0.0");
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilterTest.java
index 856bbac0292..56332111f93 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilterTest.java
@@ -46,8 +46,12 @@ public class BrokerLoadManagerClassFilterTest {
 
         LocalBrokerData localBrokerData1 = new LocalBrokerData();
         localBrokerData1.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+
+        LocalBrokerData localBrokerData2 = new LocalBrokerData();
+        localBrokerData2.setLoadManagerClassName(null);
         loadData.getBrokerData().put("broker1", new BrokerData(localBrokerData));
         loadData.getBrokerData().put("broker2", new BrokerData(localBrokerData1));
+        loadData.getBrokerData().put("broker3", new BrokerData(localBrokerData2));
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
@@ -55,6 +59,7 @@ public class BrokerLoadManagerClassFilterTest {
         Set<String> brokers = new HashSet<>(){{
             add("broker1");
             add("broker2");
+            add("broker3");
         }};
         filter.filter(brokers, null, loadData, conf);
 
@@ -64,6 +69,7 @@ public class BrokerLoadManagerClassFilterTest {
         brokers = new HashSet<>(){{
             add("broker1");
             add("broker2");
+            add("broker3");
         }};
         conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
         filter.filter(brokers, null, loadData, conf);