You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/08 09:04:24 UTC
[pulsar] 20/33: Fix rack awareness cache expiration race condition (#16825)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9cb41fd42e5e3f19ba950fde6bcfcbac267cf3c2
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Fri Jul 29 03:24:56 2022 -0500
Fix rack awareness cache expiration race condition (#16825)
(cherry picked from commit e451806a715661fca81876579e0f078aca36c9d9)
---
.../rackawareness/BookieRackAffinityMapping.java | 64 +++++++---------------
.../BookieRackAffinityMappingTest.java | 12 ++--
2 files changed, 26 insertions(+), 50 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index ec4b7da250e..c0c29637114 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -26,9 +26,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackChangeNotifier;
@@ -66,8 +64,8 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
private ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy = null;
private List<BookieId> bookieAddressListLastTime = new ArrayList<>();
- private volatile BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
- private volatile Map<String, BookieInfo> bookieInfoMap = new HashMap<>();
+ private BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
+ private Map<String, BookieInfo> bookieInfoMap = new HashMap<>();
public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException {
MetadataStore store;
@@ -110,15 +108,17 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
}
@Override
- public void setConf(Configuration conf) {
+ public synchronized void setConf(Configuration conf) {
super.setConf(conf);
MetadataStore store;
try {
store = createMetadataStore(conf);
bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
- bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).join();
- for (Map<String, BookieInfo> bookieMapping : bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
- .map(Map::values).orElse(Collections.emptyList())) {
+ store.registerListener(this::handleUpdates);
+ racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
+ .orElseGet(BookiesRackConfiguration::new);
+ updateRacksWithHost(racksWithHost);
+ for (Map<String, BookieInfo> bookieMapping : racksWithHost.values()) {
for (String address : bookieMapping.keySet()) {
bookieAddressListLastTime.add(BookieId.parse(address));
}
@@ -130,13 +130,9 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
} catch (InterruptedException | ExecutionException | MetadataException e) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
}
- store.registerListener(this::handleUpdates);
-
- // A previous version of this code tried to eagerly load the cache. However, this is invalid
- // in later versions of bookkeeper as when setConf is called, the bookieAddressResolver is not yet set
}
- private void updateRacksWithHost(BookiesRackConfiguration racks) {
+ private synchronized void updateRacksWithHost(BookiesRackConfiguration racks) {
// In config z-node, the bookies are added in the `ip:port` notation, while BK will ask
// for just the IP/hostname when trying to get the rack for a bookie.
// To work around this issue, we insert in the map the bookie ip/hostname with same rack-info
@@ -176,7 +172,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
}
@Override
- public List<String> resolve(List<String> bookieAddressList) {
+ public synchronized List<String> resolve(List<String> bookieAddressList) {
List<String> racks = new ArrayList<>(bookieAddressList.size());
for (String bookieAddress : bookieAddressList) {
racks.add(getRack(bookieAddress));
@@ -185,32 +181,9 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
}
private String getRack(String bookieAddress) {
- try {
- // Trigger load of z-node in case it didn't exist
- CompletableFuture<Optional<BookiesRackConfiguration>> future =
- bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH);
-
- Optional<BookiesRackConfiguration> racks = (future.isDone() && !future.isCompletedExceptionally())
- ? future.join() : Optional.empty();
- updateRacksWithHost(racks.orElseGet(BookiesRackConfiguration::new));
- if (!racks.isPresent()) {
- // since different placement policy will have different default rack,
- // don't be smart here and just return null
- return null;
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
BookieInfo bi = bookieInfoMap.get(bookieAddress);
if (bi == null) {
- Optional<BookieInfo> biOpt = racksWithHost.getBookie(bookieAddress);
- if (biOpt.isPresent()) {
- bi = biOpt.get();
- } else {
- updateRacksWithHost(racksWithHost);
- bi = bookieInfoMap.get(bookieAddress);
- }
+ bi = racksWithHost.getBookie(bookieAddress).orElse(null);
}
if (bi != null
@@ -243,10 +216,11 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
return;
}
- if (rackawarePolicy != null) {
- bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
- .thenAccept(optVal -> {
+ bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
+ .thenAccept(optVal -> {
+ synchronized (this) {
LOG.info("Bookie rack info updated to {}. Notifying rackaware policy.", optVal);
+ this.updateRacksWithHost(optVal.orElseGet(BookiesRackConfiguration::new));
List<BookieId> bookieAddressList = new ArrayList<>();
for (Map<String, BookieInfo> bookieMapping : optVal.map(Map::values).orElse(
Collections.emptyList())) {
@@ -261,9 +235,11 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
Set<BookieId> bookieIdSet = new HashSet<>(bookieAddressList);
bookieIdSet.addAll(bookieAddressListLastTime);
bookieAddressListLastTime = bookieAddressList;
- rackawarePolicy.onBookieRackChange(new ArrayList<>(bookieIdSet));
- });
- }
+ if (rackawarePolicy != null) {
+ rackawarePolicy.onBookieRackChange(new ArrayList<>(bookieIdSet));
+ }
+ }
+ });
}
@Override
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
index 485447868b1..4377916ace2 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
@@ -170,12 +170,12 @@ public class BookieRackAffinityMappingTest {
bookieMapping.put("group2", secondaryBookieGroup);
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
Optional.empty()).join();
-
- racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
- assertEquals(racks.get(0), "/rack0");
- assertEquals(racks.get(1), "/rack1");
- assertEquals(racks.get(2), "/rack0");
-
+ Awaitility.await().untilAsserted(() -> {
+ List<String> r = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
+ assertEquals(r.get(0), "/rack0");
+ assertEquals(r.get(1), "/rack1");
+ assertEquals(r.get(2), "/rack0");
+ });
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, "{}".getBytes(),
Optional.empty()).join();