You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ch...@apache.org on 2022/12/08 14:38:25 UTC
[bookkeeper] 08/08: Fix RegionAwareEnsemblePlacementPolicy update rack info problem. (#3666)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit b7f9b83f9e8f1d954044c271f516a939cbee3a22
Author: Yan Zhao <ho...@apache.org>
AuthorDate: Wed Dec 7 15:50:09 2022 +0800
Fix RegionAwareEnsemblePlacementPolicy update rack info problem. (#3666)
(cherry picked from commit 4574ba02333308ad648c9b78963a381ad83ea564)
---
.../client/RegionAwareEnsemblePlacementPolicy.java | 85 ++++++++++--
.../TestRegionAwareEnsemblePlacementPolicy.java | 154 +++++++++++++++++++++
2 files changed, 225 insertions(+), 14 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 383260dc80..b7a7f0e48e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.client;
import io.netty.util.HashedWheelTimer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.net.NetworkTopologyImpl;
import org.apache.bookkeeper.net.Node;
import org.apache.bookkeeper.net.NodeBase;
import org.apache.bookkeeper.proto.BookieAddressResolver;
@@ -85,30 +87,34 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
address2Region = new ConcurrentHashMap<BookieId, String>();
}
+ protected String getLocalRegion(BookieNode node) {
+ if (null == node || null == node.getAddr()) {
+ return UNKNOWN_REGION;
+ }
+ return getRegion(node.getAddr());
+ }
+
protected String getRegion(BookieId addr) {
String region = address2Region.get(addr);
if (null == region) {
- String networkLocation = resolveNetworkLocation(addr);
- if (NetworkTopology.DEFAULT_REGION_AND_RACK.equals(networkLocation)) {
- region = UNKNOWN_REGION;
- } else {
- String[] parts = networkLocation.split(NodeBase.PATH_SEPARATOR_STR);
- if (parts.length <= 1) {
- region = UNKNOWN_REGION;
- } else {
- region = parts[1];
- }
- }
+ region = parseBookieRegion(addr);
address2Region.putIfAbsent(addr, region);
}
return region;
}
- protected String getLocalRegion(BookieNode node) {
- if (null == node || null == node.getAddr()) {
+ protected String parseBookieRegion(BookieId addr) {
+ String networkLocation = resolveNetworkLocation(addr);
+ if (NetworkTopology.DEFAULT_REGION_AND_RACK.equals(networkLocation)) {
return UNKNOWN_REGION;
+ } else {
+ String[] parts = networkLocation.split(NodeBase.PATH_SEPARATOR_STR);
+ if (parts.length <= 1) {
+ return UNKNOWN_REGION;
+ } else {
+ return parts[1];
+ }
}
- return getRegion(node.getAddr());
}
@Override
@@ -163,6 +169,57 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
}
}
+ @Override
+ public void onBookieRackChange(List<BookieId> bookieAddressList) {
+ rwLock.writeLock().lock();
+ try {
+ bookieAddressList.forEach(bookieAddress -> {
+ try {
+ BookieNode node = knownBookies.get(bookieAddress);
+ if (node != null) {
+ // refresh the rack info if it's a known bookie
+ BookieNode newNode = createBookieNode(bookieAddress);
+ if (!newNode.getNetworkLocation().equals(node.getNetworkLocation())) {
+ topology.remove(node);
+ topology.add(newNode);
+ knownBookies.put(bookieAddress, newNode);
+ historyBookies.put(bookieAddress, newNode);
+ }
+ // Handle per-region placement policy.
+ String oldRegion = getRegion(bookieAddress);
+ String newRegion = parseBookieRegion(newNode.getAddr());
+ if (oldRegion.equals(newRegion)) {
+ TopologyAwareEnsemblePlacementPolicy regionPlacement = perRegionPlacement.get(oldRegion);
+ regionPlacement.onBookieRackChange(Collections.singletonList(bookieAddress));
+ } else {
+ address2Region.put(bookieAddress, newRegion);
+ TopologyAwareEnsemblePlacementPolicy oldRegionPlacement = perRegionPlacement.get(oldRegion);
+ oldRegionPlacement.handleBookiesThatLeft(Collections.singleton(bookieAddress));
+ TopologyAwareEnsemblePlacementPolicy newRegionPlacement = perRegionPlacement.get(
+ newRegion);
+ if (newRegionPlacement == null) {
+ newRegionPlacement = new RackawareEnsemblePlacementPolicy()
+ .initialize(dnsResolver, timer, this.reorderReadsRandom,
+ this.stabilizePeriodSeconds, this.reorderThresholdPendingRequests,
+ this.isWeighted, this.maxWeightMultiple,
+ this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum,
+ this.ignoreLocalNodeInPlacementPolicy, statsLogger,
+ bookieAddressResolver)
+ .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+ perRegionPlacement.put(newRegion, newRegionPlacement);
+ }
+ newRegionPlacement.handleBookiesThatJoined(Collections.singleton(bookieAddress));
+ }
+ }
+ } catch (IllegalArgumentException | NetworkTopologyImpl.InvalidTopologyException e) {
+ LOG.error("Failed to update bookie rack info: {} ", bookieAddress, e);
+ }
+ });
+ } finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
@Override
public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf,
Optional<DNSToSwitchMapping> optionalDnsResolver,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index fc4e02cae9..7fe5b83db4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -28,6 +28,7 @@ import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.RE
import static org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues;
import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.util.HashedWheelTimer;
import java.net.InetAddress;
@@ -1528,6 +1529,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
assertEquals(ensemble.get(reoderSet.get(7)), addr4.toBookieId());
}
+ @Test
public void testNewEnsembleSetWithFiveRegions() throws Exception {
repp.uninitalize();
repp = new RegionAwareEnsemblePlacementPolicy();
@@ -1573,6 +1575,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
}
}
+ @Test
public void testRegionsWithDiskWeight() throws Exception {
repp.uninitalize();
repp = new RegionAwareEnsemblePlacementPolicy();
@@ -1607,4 +1610,155 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
assertEquals(3, ensemble.size());
}
+
+ @Test
+ public void testNotifyRackChangeWithOldRegion() throws Exception {
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181);
+
+ // update dns mapping
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/rack-1");
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/rack-1");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region2/rack-1");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region2/rack-1");
+
+ // Update cluster
+ Set<BookieId> addrs = Sets.newHashSet(addr1.toBookieId(),
+ addr2.toBookieId(), addr3.toBookieId(), addr4.toBookieId());
+ repp.onClusterChanged(addrs, new HashSet<>());
+
+ assertEquals(4, repp.knownBookies.size());
+ assertEquals("/region1/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+ assertEquals("/region1/rack-1", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+ assertEquals("/region2/rack-1", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+ assertEquals("/region2/rack-1", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+ assertEquals(2, repp.perRegionPlacement.size());
+ TopologyAwareEnsemblePlacementPolicy region1Placement = repp.perRegionPlacement.get("region1");
+ assertEquals(2, region1Placement.knownBookies.keySet().size());
+ assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+ assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+
+ TopologyAwareEnsemblePlacementPolicy region2Placement = repp.perRegionPlacement.get("region2");
+ assertEquals(2, region2Placement.knownBookies.keySet().size());
+ assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+ assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+ assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
+ assertEquals("region1", repp.address2Region.get(addr2.toBookieId()));
+ assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
+ assertEquals("region2", repp.address2Region.get(addr4.toBookieId()));
+
+ // Update the rack info; both bookies end up in the already-existing region1.
+ // addr2: /region1/rack-1 -> /region1/rack-2 (same region, different rack).
+ // addr4: /region2/rack-1 -> /region1/rack-2 (moves from region2 into region1).
+ List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
+ List<String> rackList = new ArrayList<>();
+ bookieAddressList.add(addr2);
+ rackList.add("/region1/rack-2");
+ bookieAddressList.add(addr4);
+ rackList.add("/region1/rack-2");
+ StaticDNSResolver.changeRack(bookieAddressList, rackList);
+
+ assertEquals(4, repp.knownBookies.size());
+ assertEquals("/region1/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+ assertEquals("/region1/rack-2", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+ assertEquals("/region2/rack-1", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+ assertEquals("/region1/rack-2", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+ assertEquals(2, repp.perRegionPlacement.size());
+ region1Placement = repp.perRegionPlacement.get("region1");
+ assertEquals(3, region1Placement.knownBookies.keySet().size());
+ assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+ assertEquals("/region1/rack-2", region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+ assertEquals("/region1/rack-2", region1Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+ region2Placement = repp.perRegionPlacement.get("region2");
+ assertEquals(1, region2Placement.knownBookies.keySet().size());
+ assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+
+ assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
+ assertEquals("region1", repp.address2Region.get(addr2.toBookieId()));
+ assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
+ assertEquals("region1", repp.address2Region.get(addr4.toBookieId()));
+ }
+
+ @Test
+ public void testNotifyRackChangeWithNewRegion() throws Exception {
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181);
+
+ // update dns mapping
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/rack-1");
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/rack-1");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region2/rack-1");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region2/rack-1");
+
+ // Update cluster
+ Set<BookieId> addrs = Sets.newHashSet(addr1.toBookieId(),
+ addr2.toBookieId(), addr3.toBookieId(), addr4.toBookieId());
+ repp.onClusterChanged(addrs, new HashSet<>());
+
+ assertEquals(4, repp.knownBookies.size());
+ assertEquals("/region1/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+ assertEquals("/region1/rack-1", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+ assertEquals("/region2/rack-1", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+ assertEquals("/region2/rack-1", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+ assertEquals(2, repp.perRegionPlacement.size());
+ TopologyAwareEnsemblePlacementPolicy region1Placement = repp.perRegionPlacement.get("region1");
+ assertEquals(2, region1Placement.knownBookies.keySet().size());
+ assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+ assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+
+ TopologyAwareEnsemblePlacementPolicy region2Placement = repp.perRegionPlacement.get("region2");
+ assertEquals(2, region2Placement.knownBookies.keySet().size());
+ assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+ assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+ assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
+ assertEquals("region1", repp.address2Region.get(addr2.toBookieId()));
+ assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
+ assertEquals("region2", repp.address2Region.get(addr4.toBookieId()));
+
+ // Update the rack info; both bookies move into region3, which had no placement policy before.
+ // addr2: /region1/rack-1 -> /region3/rack-1.
+ // addr4: /region2/rack-1 -> /region3/rack-1.
+ List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
+ List<String> rackList = new ArrayList<>();
+ bookieAddressList.add(addr2);
+ rackList.add("/region3/rack-1");
+ bookieAddressList.add(addr4);
+ rackList.add("/region3/rack-1");
+ StaticDNSResolver.changeRack(bookieAddressList, rackList);
+
+ assertEquals(4, repp.knownBookies.size());
+ assertEquals("/region1/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+ assertEquals("/region3/rack-1", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+ assertEquals("/region2/rack-1", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+ assertEquals("/region3/rack-1", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+ assertEquals(3, repp.perRegionPlacement.size());
+ region1Placement = repp.perRegionPlacement.get("region1");
+ assertEquals(1, region1Placement.knownBookies.keySet().size());
+ assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+
+ region2Placement = repp.perRegionPlacement.get("region2");
+ assertEquals(1, region2Placement.knownBookies.keySet().size());
+ assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+
+ TopologyAwareEnsemblePlacementPolicy region3Placement = repp.perRegionPlacement.get("region3");
+ assertEquals(2, region3Placement.knownBookies.keySet().size());
+ assertEquals("/region3/rack-1", region3Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+ assertEquals("/region3/rack-1", region3Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+ assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
+ assertEquals("region3", repp.address2Region.get(addr2.toBookieId()));
+ assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
+ assertEquals("region3", repp.address2Region.get(addr4.toBookieId()));
+ }
}