You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2022/12/07 07:50:17 UTC

[bookkeeper] branch master updated: Fix RegionAwareEnsemblePlacementPolicy update rack info problem. (#3666)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 4574ba0233 Fix RegionAwareEnsemblePlacementPolicy update rack info problem. (#3666)
4574ba0233 is described below

commit 4574ba02333308ad648c9b78963a381ad83ea564
Author: Yan Zhao <ho...@apache.org>
AuthorDate: Wed Dec 7 15:50:09 2022 +0800

    Fix RegionAwareEnsemblePlacementPolicy update rack info problem. (#3666)
---
 .../client/RegionAwareEnsemblePlacementPolicy.java |  85 ++++++++++--
 .../TestRegionAwareEnsemblePlacementPolicy.java    | 154 +++++++++++++++++++++
 2 files changed, 225 insertions(+), 14 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 3f8a17f59f..43969b8fde 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -19,6 +19,7 @@ package org.apache.bookkeeper.client;
 
 import io.netty.util.HashedWheelTimer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -34,6 +35,7 @@ import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieNode;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.net.NetworkTopologyImpl;
 import org.apache.bookkeeper.net.Node;
 import org.apache.bookkeeper.net.NodeBase;
 import org.apache.bookkeeper.proto.BookieAddressResolver;
@@ -83,30 +85,34 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
         address2Region = new ConcurrentHashMap<BookieId, String>();
     }
 
+    protected String getLocalRegion(BookieNode node) {
+        if (null == node || null == node.getAddr()) {
+            return UNKNOWN_REGION;
+        }
+        return getRegion(node.getAddr());
+    }
+
     protected String getRegion(BookieId addr) {
         String region = address2Region.get(addr);
         if (null == region) {
-            String networkLocation = resolveNetworkLocation(addr);
-            if (NetworkTopology.DEFAULT_REGION_AND_RACK.equals(networkLocation)) {
-                region = UNKNOWN_REGION;
-            } else {
-                String[] parts = networkLocation.split(NodeBase.PATH_SEPARATOR_STR);
-                if (parts.length <= 1) {
-                    region = UNKNOWN_REGION;
-                } else {
-                    region = parts[1];
-                }
-            }
+            region = parseBookieRegion(addr);
             address2Region.putIfAbsent(addr, region);
         }
         return region;
     }
 
-    protected String getLocalRegion(BookieNode node) {
-        if (null == node || null == node.getAddr()) {
+    protected String parseBookieRegion(BookieId addr) {
+        String networkLocation = resolveNetworkLocation(addr);
+        if (NetworkTopology.DEFAULT_REGION_AND_RACK.equals(networkLocation)) {
             return UNKNOWN_REGION;
+        } else {
+            String[] parts = networkLocation.split(NodeBase.PATH_SEPARATOR_STR);
+            if (parts.length <= 1) {
+                return UNKNOWN_REGION;
+            } else {
+                return parts[1];
+            }
         }
-        return getRegion(node.getAddr());
     }
 
     @Override
@@ -161,6 +167,57 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
         }
     }
 
+    @Override
+    public void onBookieRackChange(List<BookieId> bookieAddressList) {
+        rwLock.writeLock().lock();
+        try {
+            bookieAddressList.forEach(bookieAddress -> {
+                try {
+                    BookieNode node = knownBookies.get(bookieAddress);
+                    if (node != null) {
+                        // refresh the rack info if it's a known bookie
+                        BookieNode newNode = createBookieNode(bookieAddress);
+                        if (!newNode.getNetworkLocation().equals(node.getNetworkLocation())) {
+                            topology.remove(node);
+                            topology.add(newNode);
+                            knownBookies.put(bookieAddress, newNode);
+                            historyBookies.put(bookieAddress, newNode);
+                        }
+                        // Handle the per-region placement policy.
+                        String oldRegion = getRegion(bookieAddress);
+                        String newRegion = parseBookieRegion(newNode.getAddr());
+                        if (oldRegion.equals(newRegion)) {
+                            TopologyAwareEnsemblePlacementPolicy regionPlacement = perRegionPlacement.get(oldRegion);
+                            regionPlacement.onBookieRackChange(Collections.singletonList(bookieAddress));
+                        } else {
+                            address2Region.put(bookieAddress, newRegion);
+                            TopologyAwareEnsemblePlacementPolicy oldRegionPlacement = perRegionPlacement.get(oldRegion);
+                            oldRegionPlacement.handleBookiesThatLeft(Collections.singleton(bookieAddress));
+                            TopologyAwareEnsemblePlacementPolicy newRegionPlacement = perRegionPlacement.get(
+                                    newRegion);
+                            if (newRegionPlacement == null) {
+                                newRegionPlacement = new RackawareEnsemblePlacementPolicy()
+                                        .initialize(dnsResolver, timer, this.reorderReadsRandom,
+                                                this.stabilizePeriodSeconds, this.reorderThresholdPendingRequests,
+                                                this.isWeighted, this.maxWeightMultiple,
+                                                this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum,
+                                                this.ignoreLocalNodeInPlacementPolicy, statsLogger,
+                                                bookieAddressResolver)
+                                        .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+                                perRegionPlacement.put(newRegion, newRegionPlacement);
+                            }
+                            newRegionPlacement.handleBookiesThatJoined(Collections.singleton(bookieAddress));
+                        }
+                    }
+                } catch (IllegalArgumentException | NetworkTopologyImpl.InvalidTopologyException e) {
+                    LOG.error("Failed to update bookie rack info: {} ", bookieAddress, e);
+                }
+            });
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
     @Override
     public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf,
                                                          Optional<DNSToSwitchMapping> optionalDnsResolver,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index fc4e02cae9..7fe5b83db4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -28,6 +28,7 @@ import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.RE
 import static org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues;
 import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
 
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.util.HashedWheelTimer;
 import java.net.InetAddress;
@@ -1528,6 +1529,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         assertEquals(ensemble.get(reoderSet.get(7)), addr4.toBookieId());
     }
 
+    @Test
     public void testNewEnsembleSetWithFiveRegions() throws Exception {
         repp.uninitalize();
         repp = new RegionAwareEnsemblePlacementPolicy();
@@ -1573,6 +1575,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         }
     }
 
+    @Test
     public void testRegionsWithDiskWeight() throws Exception {
         repp.uninitalize();
         repp = new RegionAwareEnsemblePlacementPolicy();
@@ -1607,4 +1610,155 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
 
         assertEquals(3, ensemble.size());
     }
+
+    @Test
+    public void testNotifyRackChangeWithOldRegion() throws Exception {
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181);
+        BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181);
+        BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181);
+        BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181);
+
+        // update dns mapping
+        StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/rack-1");
+        StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/rack-1");
+        StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region2/rack-1");
+        StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region2/rack-1");
+
+        // Update cluster
+        Set<BookieId> addrs = Sets.newHashSet(addr1.toBookieId(),
+                addr2.toBookieId(), addr3.toBookieId(), addr4.toBookieId());
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        assertEquals(4, repp.knownBookies.size());
+        assertEquals("/region1/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+        assertEquals("/region1/rack-1", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+        assertEquals("/region2/rack-1", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+        assertEquals("/region2/rack-1", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+        assertEquals(2, repp.perRegionPlacement.size());
+        TopologyAwareEnsemblePlacementPolicy region1Placement = repp.perRegionPlacement.get("region1");
+        assertEquals(2, region1Placement.knownBookies.keySet().size());
+        assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+        assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+
+        TopologyAwareEnsemblePlacementPolicy region2Placement = repp.perRegionPlacement.get("region2");
+        assertEquals(2, region2Placement.knownBookies.keySet().size());
+        assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+        assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+        assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
+        assertEquals("region1", repp.address2Region.get(addr2.toBookieId()));
+        assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
+        assertEquals("region2", repp.address2Region.get(addr4.toBookieId()));
+
+        // Update the rack.
+        // change addr2 rack info. /region1/rack-1 -> /region1/rack-2.
+        // change addr4 rack info. /region2/rack-1 -> /region1/rack-2
+        List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
+        List<String> rackList = new ArrayList<>();
+        bookieAddressList.add(addr2);
+        rackList.add("/region1/rack-2");
+        bookieAddressList.add(addr4);
+        rackList.add("/region1/rack-2");
+        StaticDNSResolver.changeRack(bookieAddressList, rackList);
+
+        assertEquals(4, repp.knownBookies.size());
+        assertEquals("/region1/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+        assertEquals("/region1/rack-2", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+        assertEquals("/region2/rack-1", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+        assertEquals("/region1/rack-2", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+        assertEquals(2, repp.perRegionPlacement.size());
+        region1Placement = repp.perRegionPlacement.get("region1");
+        assertEquals(3, region1Placement.knownBookies.keySet().size());
+        assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+        assertEquals("/region1/rack-2", region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+        assertEquals("/region1/rack-2", region1Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+        region2Placement = repp.perRegionPlacement.get("region2");
+        assertEquals(1, region2Placement.knownBookies.keySet().size());
+        assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+
+        assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
+        assertEquals("region1", repp.address2Region.get(addr2.toBookieId()));
+        assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
+        assertEquals("region1", repp.address2Region.get(addr4.toBookieId()));
+    }
+
+    @Test
+    public void testNotifyRackChangeWithNewRegion() throws Exception {
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181);
+        BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181);
+        BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181);
+        BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181);
+
+        // update dns mapping
+        StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/rack-1");
+        StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/rack-1");
+        StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region2/rack-1");
+        StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region2/rack-1");
+
+        // Update cluster
+        Set<BookieId> addrs = Sets.newHashSet(addr1.toBookieId(),
+                addr2.toBookieId(), addr3.toBookieId(), addr4.toBookieId());
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        assertEquals(4, repp.knownBookies.size());
+        assertEquals("/region1/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+        assertEquals("/region1/rack-1", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+        assertEquals("/region2/rack-1", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+        assertEquals("/region2/rack-1", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+        assertEquals(2, repp.perRegionPlacement.size());
+        TopologyAwareEnsemblePlacementPolicy region1Placement = repp.perRegionPlacement.get("region1");
+        assertEquals(2, region1Placement.knownBookies.keySet().size());
+        assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+        assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+
+        TopologyAwareEnsemblePlacementPolicy region2Placement = repp.perRegionPlacement.get("region2");
+        assertEquals(2, region2Placement.knownBookies.keySet().size());
+        assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+        assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+        assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
+        assertEquals("region1", repp.address2Region.get(addr2.toBookieId()));
+        assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
+        assertEquals("region2", repp.address2Region.get(addr4.toBookieId()));
+
+        // Update the rack.
+        // change addr2 rack info. /region1/rack-1 -> /region3/rack-1.
+        // change addr4 rack info. /region2/rack-1 -> /region3/rack-1
+        List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
+        List<String> rackList = new ArrayList<>();
+        bookieAddressList.add(addr2);
+        rackList.add("/region3/rack-1");
+        bookieAddressList.add(addr4);
+        rackList.add("/region3/rack-1");
+        StaticDNSResolver.changeRack(bookieAddressList, rackList);
+
+        assertEquals(4, repp.knownBookies.size());
+        assertEquals("/region1/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+        assertEquals("/region3/rack-1", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+        assertEquals("/region2/rack-1", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+        assertEquals("/region3/rack-1", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+        assertEquals(3, repp.perRegionPlacement.size());
+        region1Placement = repp.perRegionPlacement.get("region1");
+        assertEquals(1, region1Placement.knownBookies.keySet().size());
+        assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+
+        region2Placement = repp.perRegionPlacement.get("region2");
+        assertEquals(1, region2Placement.knownBookies.keySet().size());
+        assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+
+        TopologyAwareEnsemblePlacementPolicy region3Placement = repp.perRegionPlacement.get("region3");
+        assertEquals(2, region3Placement.knownBookies.keySet().size());
+        assertEquals("/region3/rack-1", region3Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+        assertEquals("/region3/rack-1", region3Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+        assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
+        assertEquals("region3", repp.address2Region.get(addr2.toBookieId()));
+        assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
+        assertEquals("region3", repp.address2Region.get(addr4.toBookieId()));
+    }
 }