You are viewing a plain-text version of this content. The canonical version is available at the original archive link, which is not preserved in this plain-text rendering.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/08 09:04:24 UTC

[pulsar] 20/33: Fix rack awareness cache expiration race condition (#16825)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9cb41fd42e5e3f19ba950fde6bcfcbac267cf3c2
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Fri Jul 29 03:24:56 2022 -0500

    Fix rack awareness cache expiration race condition (#16825)
    
    (cherry picked from commit e451806a715661fca81876579e0f078aca36c9d9)
---
 .../rackawareness/BookieRackAffinityMapping.java   | 64 +++++++---------------
 .../BookieRackAffinityMappingTest.java             | 12 ++--
 2 files changed, 26 insertions(+), 50 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index ec4b7da250e..c0c29637114 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -26,9 +26,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RackChangeNotifier;
@@ -66,8 +64,8 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
     private ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy = null;
     private List<BookieId> bookieAddressListLastTime = new ArrayList<>();
 
-    private volatile BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
-    private volatile Map<String, BookieInfo> bookieInfoMap = new HashMap<>();
+    private BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
+    private Map<String, BookieInfo> bookieInfoMap = new HashMap<>();
 
     public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException {
         MetadataStore store;
@@ -110,15 +108,17 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
     }
 
     @Override
-    public void setConf(Configuration conf) {
+    public synchronized void setConf(Configuration conf) {
         super.setConf(conf);
         MetadataStore store;
         try {
             store = createMetadataStore(conf);
             bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
-            bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).join();
-            for (Map<String, BookieInfo> bookieMapping : bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
-                    .map(Map::values).orElse(Collections.emptyList())) {
+            store.registerListener(this::handleUpdates);
+            racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
+                    .orElseGet(BookiesRackConfiguration::new);
+            updateRacksWithHost(racksWithHost);
+            for (Map<String, BookieInfo> bookieMapping : racksWithHost.values()) {
                 for (String address : bookieMapping.keySet()) {
                     bookieAddressListLastTime.add(BookieId.parse(address));
                 }
@@ -130,13 +130,9 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
         } catch (InterruptedException | ExecutionException | MetadataException e) {
             throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
         }
-        store.registerListener(this::handleUpdates);
-
-        // A previous version of this code tried to eagerly load the cache. However, this is invalid
-        // in later versions of bookkeeper as when setConf is called, the bookieAddressResolver is not yet set
     }
 
-    private void updateRacksWithHost(BookiesRackConfiguration racks) {
+    private synchronized void updateRacksWithHost(BookiesRackConfiguration racks) {
         // In config z-node, the bookies are added in the `ip:port` notation, while BK will ask
         // for just the IP/hostname when trying to get the rack for a bookie.
         // To work around this issue, we insert in the map the bookie ip/hostname with same rack-info
@@ -176,7 +172,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
     }
 
     @Override
-    public List<String> resolve(List<String> bookieAddressList) {
+    public synchronized List<String> resolve(List<String> bookieAddressList) {
         List<String> racks = new ArrayList<>(bookieAddressList.size());
         for (String bookieAddress : bookieAddressList) {
             racks.add(getRack(bookieAddress));
@@ -185,32 +181,9 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
     }
 
     private String getRack(String bookieAddress) {
-        try {
-            // Trigger load of z-node in case it didn't exist
-            CompletableFuture<Optional<BookiesRackConfiguration>> future =
-                    bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH);
-
-            Optional<BookiesRackConfiguration> racks = (future.isDone() && !future.isCompletedExceptionally())
-                    ? future.join() : Optional.empty();
-            updateRacksWithHost(racks.orElseGet(BookiesRackConfiguration::new));
-            if (!racks.isPresent()) {
-                // since different placement policy will have different default rack,
-                // don't be smart here and just return null
-                return null;
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
         BookieInfo bi = bookieInfoMap.get(bookieAddress);
         if (bi == null) {
-            Optional<BookieInfo> biOpt = racksWithHost.getBookie(bookieAddress);
-            if (biOpt.isPresent()) {
-                bi = biOpt.get();
-            } else {
-                updateRacksWithHost(racksWithHost);
-                bi = bookieInfoMap.get(bookieAddress);
-            }
+            bi = racksWithHost.getBookie(bookieAddress).orElse(null);
         }
 
         if (bi != null
@@ -243,10 +216,11 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
             return;
         }
 
-        if (rackawarePolicy != null) {
-            bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
-                    .thenAccept(optVal -> {
+        bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
+                .thenAccept(optVal -> {
+                    synchronized (this) {
                         LOG.info("Bookie rack info updated to {}. Notifying rackaware policy.", optVal);
+                        this.updateRacksWithHost(optVal.orElseGet(BookiesRackConfiguration::new));
                         List<BookieId> bookieAddressList = new ArrayList<>();
                         for (Map<String, BookieInfo> bookieMapping : optVal.map(Map::values).orElse(
                                 Collections.emptyList())) {
@@ -261,9 +235,11 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
                         Set<BookieId> bookieIdSet = new HashSet<>(bookieAddressList);
                         bookieIdSet.addAll(bookieAddressListLastTime);
                         bookieAddressListLastTime = bookieAddressList;
-                        rackawarePolicy.onBookieRackChange(new ArrayList<>(bookieIdSet));
-                    });
-        }
+                        if (rackawarePolicy != null) {
+                            rackawarePolicy.onBookieRackChange(new ArrayList<>(bookieIdSet));
+                        }
+                    }
+                });
     }
 
     @Override
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
index 485447868b1..4377916ace2 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
@@ -170,12 +170,12 @@ public class BookieRackAffinityMappingTest {
         bookieMapping.put("group2", secondaryBookieGroup);
         store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
                 Optional.empty()).join();
-
-        racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
-        assertEquals(racks.get(0), "/rack0");
-        assertEquals(racks.get(1), "/rack1");
-        assertEquals(racks.get(2), "/rack0");
-
+        Awaitility.await().untilAsserted(() -> {
+            List<String> r = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
+            assertEquals(r.get(0), "/rack0");
+            assertEquals(r.get(1), "/rack1");
+            assertEquals(r.get(2), "/rack0");
+        });
         store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, "{}".getBytes(),
                 Optional.empty()).join();