You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/21 15:09:21 UTC

[pulsar] 02/02: Bugfix: Fix rackaware placement policy init error (#12097)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5d52994c6e6c2d8afe339597c70c0d1197b38eb2
Author: Addison Higham <ad...@gmail.com>
AuthorDate: Mon Sep 20 09:22:21 2021 -0600

    Bugfix: Fix rackaware placement policy init error (#12097)
    
    Since the release of Pulsar 2.8 and upgrade to BK 4.12, the default
    rackAwarePlacementPolicy has been failing with the following exception:
    
    ```
    org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl - Failed to initialize DNS Resolver org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping, used default subnet resolver : java.lang.RuntimeException: java.lang.NullPointerException java.lang.NullPointerException
    ```
    
    This regression was introduced in commit https://github.com/apache/pulsar/commit/4c6026213b743a7f23ae2a5a6d37ee7404b066db
    
    The core of the issue is that `setConf` is called before
    `setBookieAddressResolver` has been called, so the bookie address
    resolver is still null at that point (see
    https://github.com/apache/bookkeeper/blob/034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java#L264-L286)
    
    This results in the NPE.
    
    It is safe to skip the eager cache initialization in `setConf`,
    because the `getRack` call re-checks the cache.
    
    For safety, we also guard against this NPE by null-checking the
    bookie address resolver before using it.
---
 .../zookeeper/ZkBookieRackAffinityMapping.java     | 39 +++++++++++-----------
 1 file changed, 20 insertions(+), 19 deletions(-)

diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
index 19dcb99..0bdffca 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
@@ -76,12 +76,8 @@ public class ZkBookieRackAffinityMapping extends AbstractDNSToSwitchMapping
             conf.setProperty(ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE, bookieMappingCache);
         }
 
-        try {
-            BookiesRackConfiguration racks = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).orElseGet(BookiesRackConfiguration::new);
-            updateRacksWithHost(racks);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+        // A previous version of this code tried to eagerly load the cache. However, this is invalid
+        // in later versions of bookkeeper as when setConf is called, the bookieAddressResolver is not yet set
     }
 
     private void updateRacksWithHost(BookiesRackConfiguration racks) {
@@ -94,20 +90,25 @@ public class ZkBookieRackAffinityMapping extends AbstractDNSToSwitchMapping
                 bookies.forEach((addr, bi) -> {
                     try {
                         BookieId bookieId = BookieId.parse(addr);
-                        BookieSocketAddress bsa = getBookieAddressResolver().resolve(bookieId);
-                        newRacksWithHost.updateBookie(group, bsa.toString(), bi);
-
-                        String hostname = bsa.getSocketAddress().getHostName();
-                        newBookieInfoMap.put(hostname, bi);
-
-                        InetAddress address = bsa.getSocketAddress().getAddress();
-                        if (null != address) {
-                            String hostIp = address.getHostAddress();
-                            if (null != hostIp) {
-                                newBookieInfoMap.put(hostIp, bi);
-                            }
+                        BookieAddressResolver addressResolver = getBookieAddressResolver();
+                        if (addressResolver == null) {
+                            LOG.warn("Bookie address resolver not yet initialized, skipping resolution");
                         } else {
-                            LOG.info("Network address for {} is unresolvable yet.", addr);
+                            BookieSocketAddress bsa = addressResolver.resolve(bookieId);
+                            newRacksWithHost.updateBookie(group, bsa.toString(), bi);
+
+                            String hostname = bsa.getSocketAddress().getHostName();
+                            newBookieInfoMap.put(hostname, bi);
+
+                            InetAddress address = bsa.getSocketAddress().getAddress();
+                            if (null != address) {
+                                String hostIp = address.getHostAddress();
+                                if (null != hostIp) {
+                                    newBookieInfoMap.put(hostIp, bi);
+                                }
+                            } else {
+                                LOG.info("Network address for {} is unresolvable yet.", addr);
+                            }
                         }
                     } catch (BookieAddressResolver.BookieIdNotResolvedException e) {
                         LOG.info("Network address for {} is unresolvable yet. error is {}", addr, e);