You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/02/26 22:11:52 UTC

[GitHub] merlimat closed pull request #1204: Allow rackaware policy to be notified of any rack change

merlimat closed pull request #1204: Allow rackaware policy to be notified of any rack change
URL: https://github.com/apache/bookkeeper/pull/1204
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackChangeNotifier.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackChangeNotifier.java
new file mode 100644
index 000000000..c66bcaf97
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackChangeNotifier.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.client;
+
+/**
+ * Notifier used by the RackawareEnsemblePlacementPolicy to get notified if a rack changes for a bookie.
+ */
+public interface RackChangeNotifier {
+
+    /**
+     * Register a listener for the rack-aware placement policy.
+     *
+     * @param rackawarePolicy the placement policy to notify when the rack of a bookie changes
+     */
+    void registerRackChangeListener(RackawareEnsemblePlacementPolicyImpl rackawarePolicy);
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 9a587643b..568debf02 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -72,7 +72,7 @@
  *
  * <p>Make most of the class and methods as protected, so it could be extended to implement other algorithms.
  */
-class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacementPolicy {
+public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacementPolicy {
 
     static final Logger LOG = LoggerFactory.getLogger(RackawareEnsemblePlacementPolicyImpl.class);
     boolean isWeighted;
@@ -305,6 +305,10 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
                 if (dnsResolver instanceof Configurable) {
                     ((Configurable) dnsResolver).setConf(conf);
                 }
+
+                if (dnsResolver instanceof RackChangeNotifier) {
+                    ((RackChangeNotifier) dnsResolver).registerRackChangeListener(this);
+                }
             } catch (RuntimeException re) {
                 LOG.info("Failed to initialize DNS Resolver {}, used default subnet resolver.", dnsResolverName, re);
                 dnsResolver = new DefaultResolver(() -> this.getDefaultRack());
@@ -337,6 +341,32 @@ protected String resolveNetworkLocation(BookieSocketAddress addr) {
         return NetUtils.resolveNetworkLocation(dnsResolver, addr.getSocketAddress());
     }
 
+    /**
+     * Refresh the topology entries of bookies whose rack mapping has changed.
+     *
+     * <p>Addresses that are not present in {@code knownBookies} are ignored.
+     *
+     * @param bookieAddressList addresses of the bookies whose rack changed
+     */
+    public void onBookieRackChange(List<BookieSocketAddress> bookieAddressList) {
+        rwLock.writeLock().lock();
+        try {
+            for (BookieSocketAddress bookieAddress : bookieAddressList) {
+                BookieNode node = knownBookies.get(bookieAddress);
+                if (node != null) {
+                    // Known bookie: replace its node with a freshly created one, and record the
+                    // replacement in knownBookies so it does not keep pointing at the removed node.
+                    topology.remove(node);
+                    BookieNode refreshedNode = createBookieNode(bookieAddress);
+                    topology.add(refreshedNode);
+                    knownBookies.put(bookieAddress, refreshedNode);
+                }
+            }
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
     @Override
     public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> writableBookies,
             Set<BookieSocketAddress> readOnlyBookies) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index bb7c6928e..1d32c1376 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -875,7 +875,7 @@ public void testWeightedPlacementAndNewEnsembleWithoutEnoughBookies() throws Exc
         }
     }
 
-    private int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> ensemble, int writeQuorumSize)
+    static int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> ensemble, int writeQuorumSize)
             throws Exception {
         int ensembleSize = ensemble.size();
         int numCoveredWriteQuorums = 0;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
new file mode 100644
index 000000000..a6f28bace
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.REPP_DNS_RESOLVER_CLASS;
+import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.netty.util.HashedWheelTimer;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that the rackaware ensemble placement policy picks up rack change notifications from the DNS resolver.
+ */
+public class TestRackawarePolicyNotificationUpdates extends TestCase {
+
+    static final Logger LOG = LoggerFactory.getLogger(TestRackawarePolicyNotificationUpdates.class);
+
+    RackawareEnsemblePlacementPolicy repp;
+    HashedWheelTimer timer;
+    ClientConfiguration conf = new ClientConfiguration();
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        conf.setProperty(REPP_DNS_RESOLVER_CLASS, StaticDNSResolver.class.getName());
+
+        StaticDNSResolver.reset();
+        StaticDNSResolver.addNodeToRack(InetAddress.getLocalHost().getHostAddress(),
+                NetworkTopology.DEFAULT_REGION_AND_RACK);
+        StaticDNSResolver.addNodeToRack("127.0.0.1", NetworkTopology.DEFAULT_REGION_AND_RACK);
+        StaticDNSResolver.addNodeToRack("localhost", NetworkTopology.DEFAULT_REGION_AND_RACK);
+        LOG.info("Set up static DNS Resolver.");
+
+        timer = new HashedWheelTimer(new ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
+                conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, conf.getTimeoutTimerNumTicks());
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        repp.uninitalize();
+        super.tearDown();
+    }
+
+    @Test
+    public void testNotifyRackChange() throws Exception {
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181);
+        BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181);
+        BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181);
+        BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181);
+
+        // update dns mapping
+        StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/rack-1");
+        StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/rack-2");
+        StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/rack-2");
+        StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/rack-2");
+
+        // Update cluster
+        Set<BookieSocketAddress> addrs = Sets.newHashSet(addr1, addr2, addr3, addr4);
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, Collections.emptyMap(),
+                Collections.emptySet());
+        int numCovered = TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2);
+        assertTrue(numCovered >= 1 && numCovered < 3);
+        assertTrue(ensemble.contains(addr1));
+
+        List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
+        List<String> rackList = new ArrayList<>();
+        bookieAddressList.add(addr2);
+        rackList.add("/default-region/rack-3");
+        StaticDNSResolver.changeRack(bookieAddressList, rackList);
+
+        ensemble = repp.newEnsemble(3, 2, 1, Collections.emptyMap(), Collections.emptySet());
+        assertEquals(3, TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 2));
+        assertTrue(ensemble.contains(addr1));
+        assertTrue(ensemble.contains(addr2));
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java
index b08586c69..d5cb06710 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java
@@ -22,7 +22,10 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.bookkeeper.client.RackChangeNotifier;
+import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
 import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.NetworkTopology;
 import org.apache.bookkeeper.net.NodeBase;
@@ -32,7 +35,7 @@
 /**
  * Implements {@link DNSToSwitchMapping} via static mappings. Used in test cases to simulate racks.
  */
-public class StaticDNSResolver extends AbstractDNSToSwitchMapping {
+public class StaticDNSResolver extends AbstractDNSToSwitchMapping implements RackChangeNotifier {
 
     static final Logger LOG = LoggerFactory.getLogger(StaticDNSResolver.class);
 
@@ -84,4 +87,29 @@ public void reloadCachedMappings() {
         // nop
     }
 
+    // Policy passed to the most recent registerRackChangeListener() call; notified by changeRack().
+    private static RackawareEnsemblePlacementPolicyImpl rackawarePolicy = null;
+
+    @Override
+    public void registerRackChangeListener(RackawareEnsemblePlacementPolicyImpl rackawareEnsemblePolicy) {
+        rackawarePolicy = rackawareEnsemblePolicy;
+    }
+
+    /**
+     * Remap the given bookies to new racks and notify the registered policy, if any.
+     *
+     * @param bookieAddressList bookies whose rack is changed
+     * @param rack new rack for each bookie, at the same index as in {@code bookieAddressList}
+     */
+    public static void changeRack(List<BookieSocketAddress> bookieAddressList, List<String> rack) {
+        for (int i = 0; i < bookieAddressList.size(); i++) {
+            BookieSocketAddress bkAddress = bookieAddressList.get(i);
+            name2Racks.put(bkAddress.getHostName(), rack.get(i));
+        }
+        // No policy may have been registered yet; the mapping update above still applies.
+        if (rackawarePolicy != null) {
+            rackawarePolicy.onBookieRackChange(bookieAddressList);
+        }
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services