You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/06/20 01:08:37 UTC

[GitHub] sijie closed pull request #1496: ISSUE #1495: Option to enforce minNumRacksPerWriteQuorum

sijie closed pull request #1496: ISSUE #1495: Option to enforce minNumRacksPerWriteQuorum
URL: https://github.com/apache/bookkeeper/pull/1496
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
index 4a05e18ab..87e6f184d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
@@ -20,6 +20,10 @@
 import java.util.ArrayList;
 import java.util.Set;
 
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
+import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Predicate;
+import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.BookieNode;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -109,15 +113,63 @@
      *          predicate to apply
      * @param ensemble
      *          ensemble
+     * @param fallbackToRandom
+     *          whether to pick a random bookie from the cluster when none is found in networkLoc
      * @return the selected bookie.
      * @throws BKException.BKNotEnoughBookiesException
      */
     T selectFromNetworkLocation(String networkLoc,
                                 Set<Node> excludeBookies,
                                 Predicate<T> predicate,
-                                Ensemble<T> ensemble)
+                                Ensemble<T> ensemble,
+                                boolean fallbackToRandom)
             throws BKException.BKNotEnoughBookiesException;
 
+    /**
+     * Select a node from the cluster, excluding excludeBookies and the bookie
+     * nodes of excludeRacks. If no such node exists and fallbackToRandom is
+     * true, pick a random node from the cluster excluding only
+     * excludeBookies.
+     *
+     * @param excludeRacks
+     * @param excludeBookies
+     * @param predicate
+     * @param ensemble
+     * @param fallbackToRandom
+     * @return
+     * @throws BKException.BKNotEnoughBookiesException
+     */
+    T selectFromNetworkLocation(Set<String> excludeRacks,
+                                Set<Node> excludeBookies,
+                                Predicate<BookieNode> predicate,
+                                Ensemble<BookieNode> ensemble,
+                                boolean fallbackToRandom)
+            throws BKException.BKNotEnoughBookiesException;
+
+    /**
+     * Select a node from the networkLoc rack, excluding excludeBookies. If no
+     * node is available in networkLoc, try to get a node from the cluster,
+     * excluding excludeRacks and excludeBookies. If that also fails and
+     * fallbackToRandom is true, pick a random bookie from the cluster
+     * excluding only excludeBookies.
+     *
+     * @param networkLoc
+     * @param excludeRacks
+     * @param excludeBookies
+     * @param predicate
+     * @param ensemble
+     * @param fallbackToRandom
+     * @return
+     * @throws BKNotEnoughBookiesException
+     */
+    T selectFromNetworkLocation(String networkLoc,
+                                Set<String> excludeRacks,
+                                Set<Node> excludeBookies,
+                                Predicate<BookieNode> predicate,
+                                Ensemble<BookieNode> ensemble,
+                                boolean fallbackToRandom)
+            throws BKNotEnoughBookiesException;
+
     /**
      * Handle bookies that left.
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index e3ce8fff1..d41ca9fba 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -23,6 +23,10 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
+import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Predicate;
+import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.BookieNode;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.Node;
@@ -54,18 +58,19 @@ protected RackawareEnsemblePlacementPolicy initialize(DNSToSwitchMapping dnsReso
                                                           boolean isWeighted,
                                                           int maxWeightMultiple,
                                                           int minNumRacksPerWriteQuorum,
-                                                          StatsLogger statsLogger) {
+                                                          boolean enforceMinNumRacksPerWriteQuorum,
+            StatsLogger statsLogger) {
         if (stabilizePeriodSeconds > 0) {
             super.initialize(dnsResolver, timer, reorderReadsRandom, 0, reorderThresholdPendingRequests, isWeighted,
-                    maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
+                    maxWeightMultiple, minNumRacksPerWriteQuorum, enforceMinNumRacksPerWriteQuorum, statsLogger);
             slave = new RackawareEnsemblePlacementPolicyImpl(enforceDurability);
             slave.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds,
-                    reorderThresholdPendingRequests, isWeighted, maxWeightMultiple,
-                    minNumRacksPerWriteQuorum, statsLogger);
+                    reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum,
+                    enforceMinNumRacksPerWriteQuorum, statsLogger);
         } else {
             super.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds,
-                    reorderThresholdPendingRequests, isWeighted, maxWeightMultiple,
-                    minNumRacksPerWriteQuorum, statsLogger);
+                    reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum,
+                    enforceMinNumRacksPerWriteQuorum, statsLogger);
             slave = null;
         }
         return this;
@@ -171,15 +176,60 @@ public BookieNode selectFromNetworkLocation(
             String networkLoc,
             Set<Node> excludeBookies,
             Predicate<BookieNode> predicate,
-            Ensemble<BookieNode> ensemble)
+            Ensemble<BookieNode> ensemble,
+            boolean fallbackToRandom)
             throws BKException.BKNotEnoughBookiesException {
         try {
-            return super.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble);
+            return super.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble,
+                    fallbackToRandom);
         } catch (BKException.BKNotEnoughBookiesException bnebe) {
             if (slave == null) {
                 throw bnebe;
             } else {
-                return slave.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble);
+                return slave.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble,
+                        fallbackToRandom);
+            }
+        }
+    }
+
+    @Override
+    public BookieNode selectFromNetworkLocation(
+            Set<String> excludeRacks,
+            Set<Node> excludeBookies,
+            Predicate<BookieNode> predicate,
+            Ensemble<BookieNode> ensemble,
+            boolean fallbackToRandom)
+                    throws BKException.BKNotEnoughBookiesException {
+        try {
+            return super.selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble, fallbackToRandom);
+        } catch (BKException.BKNotEnoughBookiesException bnebe) {
+            if (slave == null) {
+                throw bnebe;
+            } else {
+                return slave.selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble,
+                        fallbackToRandom);
+            }
+        }
+    }
+
+    @Override
+    public BookieNode selectFromNetworkLocation(
+            String networkLoc,
+            Set<String> excludeRacks,
+            Set<Node> excludeBookies,
+            Predicate<BookieNode> predicate,
+            Ensemble<BookieNode> ensemble,
+            boolean fallbackToRandom)
+            throws BKNotEnoughBookiesException {
+        try {
+            return super.selectFromNetworkLocation(networkLoc, excludeRacks, excludeBookies, predicate, ensemble,
+                    fallbackToRandom);
+        } catch (BKException.BKNotEnoughBookiesException bnebe) {
+            if (slave == null) {
+                throw bnebe;
+            } else {
+                return slave.selectFromNetworkLocation(networkLoc, excludeRacks, excludeBookies, predicate, ensemble,
+                        fallbackToRandom);
             }
         }
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 23afaa9ae..ae640bbac 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -79,7 +79,9 @@
     int maxWeightMultiple;
     private Map<BookieNode, WeightedObject> bookieInfoMap = new HashMap<BookieNode, WeightedObject>();
     private WeightedRandomSelection<BookieNode> weightedSelection;
+
     protected int minNumRacksPerWriteQuorum;
+    protected boolean enforceMinNumRacksPerWriteQuorum;
 
     public static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass";
     public static final String REPP_RANDOM_READ_REORDERING = "ensembleRandomReadReordering";
@@ -238,6 +240,7 @@ protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dns
                                                               boolean isWeighted,
                                                               int maxWeightMultiple,
                                                               int minNumRacksPerWriteQuorum,
+                                                              boolean enforceMinNumRacksPerWriteQuorum,
                                                               StatsLogger statsLogger) {
         checkNotNull(statsLogger, "statsLogger should not be null, use NullStatsLogger instead.");
         this.statsLogger = statsLogger;
@@ -250,6 +253,7 @@ protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dns
         this.dnsResolver = new DNSResolverDecorator(dnsResolver, () -> this.getDefaultRack());
         this.timer = timer;
         this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
+        this.enforceMinNumRacksPerWriteQuorum = enforceMinNumRacksPerWriteQuorum;
 
         // create the network topology
         if (stabilizePeriodSeconds > 0) {
@@ -339,6 +343,7 @@ public Long load(BookieSocketAddress key) throws Exception {
                 conf.getDiskWeightBasedPlacementEnabled(),
                 conf.getBookieMaxWeightMultipleForWeightBasedPlacement(),
                 conf.getMinNumRacksPerWriteQuorum(),
+                conf.getEnforceMinNumRacksPerWriteQuorum(),
                 statsLogger);
     }
 
@@ -536,6 +541,10 @@ public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
             int numRacks = topology.getNumOfRacks();
             // only one rack, use the random algorithm.
             if (numRacks < 2) {
+                if (enforceMinNumRacksPerWriteQuorum && (minNumRacksPerWriteQuorumForThisEnsemble > 1)) {
+                    LOG.error("Only one rack available and minNumRacksPerWriteQuorum is enforced, so giving up");
+                    throw new BKNotEnoughBookiesException();
+                }
                 List<BookieNode> bns = selectRandom(ensembleSize, excludeNodes, TruePredicate.INSTANCE,
                         ensemble);
                 ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>(ensembleSize);
@@ -610,7 +619,9 @@ public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
                     }
                     curRack = sb.toString();
                 }
-                prevNode = selectFromNetworkLocation(curRack, excludeNodes, ensemble, ensemble);
+                boolean firstBookieInTheEnsemble = (null == prevNode);
+                prevNode = selectFromNetworkLocation(curRack, excludeNodes, ensemble, ensemble,
+                        !enforceMinNumRacksPerWriteQuorum || firstBookieInTheEnsemble);
                 racks[i] = prevNode.getNetworkLocation();
             }
             ArrayList<BookieSocketAddress> bookieList = ensemble.toList();
@@ -657,7 +668,8 @@ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize,
                     networkLocationsToBeExcluded,
                     excludeNodes,
                     TruePredicate.INSTANCE,
-                    EnsembleForReplacementWithNoConstraints.INSTANCE);
+                    EnsembleForReplacementWithNoConstraints.INSTANCE,
+                    !enforceMinNumRacksPerWriteQuorum);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bn);
             }
@@ -698,12 +710,20 @@ public BookieNode selectFromNetworkLocation(
             String networkLoc,
             Set<Node> excludeBookies,
             Predicate<BookieNode> predicate,
-            Ensemble<BookieNode> ensemble)
+            Ensemble<BookieNode> ensemble,
+            boolean fallbackToRandom)
             throws BKNotEnoughBookiesException {
         // select one from local rack
         try {
             return selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble);
         } catch (BKNotEnoughBookiesException e) {
+            if (!fallbackToRandom) {
+                LOG.error(
+                        "Failed to choose a bookie from {} : "
+                                + "excluded {}, enforceMinNumRacksPerWriteQuorum is enabled so giving up.",
+                        networkLoc, excludeBookies);
+                throw e;
+            }
             LOG.warn("Failed to choose a bookie from {} : "
                      + "excluded {}, fallback to choose bookie randomly from the cluster.",
                      networkLoc, excludeBookies);
@@ -712,28 +732,25 @@ public BookieNode selectFromNetworkLocation(
         }
     }
 
-    protected BookieNode selectFromNetworkLocation(String networkLoc,
+    @Override
+    public BookieNode selectFromNetworkLocation(String networkLoc,
                                                    Set<String> excludeRacks,
                                                    Set<Node> excludeBookies,
                                                    Predicate<BookieNode> predicate,
-                                                   Ensemble<BookieNode> ensemble)
+                                                   Ensemble<BookieNode> ensemble,
+                                                   boolean fallbackToRandom)
             throws BKNotEnoughBookiesException {
         // first attempt to select one from local rack
         try {
             return selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble);
         } catch (BKNotEnoughBookiesException e) {
-            if (isWeighted) {
-                // if weight based selection is enabled, randomly select one from the whole cluster
-                // based on weights and ignore the provided <tt>excludeRacks</tt>.
-                // randomly choose one from whole cluster, ignore the provided predicate.
-                return selectRandom(1, excludeBookies, predicate, ensemble).get(0);
-            } else {
-                // if weight based selection is disabled, and there is no enough bookie from local rack,
-                // select bookies from the whole cluster and exclude the racks specified at <tt>excludeRacks</tt>.
-                return selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble);
-            }
+            /*
+             * there are not enough bookies in the local rack, so select a
+             * bookie from the whole cluster, excluding the racks specified in
+             * <tt>excludeRacks</tt>.
+             */
+            return selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble, fallbackToRandom);
         }
-
     }
 
 
@@ -742,27 +759,39 @@ protected BookieNode selectFromNetworkLocation(String networkLoc,
      * <i>excludeBookies</i> set. If it fails to find one, it selects a random {@link BookieNode} from the whole
      * cluster.
      */
-    protected BookieNode selectFromNetworkLocation(Set<String> excludeRacks,
+    @Override
+    public BookieNode selectFromNetworkLocation(Set<String> excludeRacks,
                                                    Set<Node> excludeBookies,
                                                    Predicate<BookieNode> predicate,
-                                                   Ensemble<BookieNode> ensemble)
+                                                   Ensemble<BookieNode> ensemble,
+                                                   boolean fallbackToRandom)
             throws BKNotEnoughBookiesException {
-        List<BookieNode> knownNodes = new ArrayList<>(knownBookies.values());
-        Collections.shuffle(knownNodes);
 
+        List<BookieNode> knownNodes = new ArrayList<>(knownBookies.values());
+        Set<Node> fullExclusionBookiesList = new HashSet<Node>(excludeBookies);
         for (BookieNode knownNode : knownNodes) {
-            if (excludeBookies.contains(knownNode)) {
-                continue;
-            }
             if (excludeRacks.contains(knownNode.getNetworkLocation())) {
-                continue;
+                fullExclusionBookiesList.add(knownNode);
+            }
+        }
+
+        try {
+            return selectRandomInternal(knownNodes, 1, fullExclusionBookiesList, TruePredicate.INSTANCE,
+                    EnsembleForReplacementWithNoConstraints.INSTANCE).get(0);
+        } catch (BKNotEnoughBookiesException e) {
+            if (!fallbackToRandom) {
+                LOG.error(
+                        "Failed to choose a bookie excluding Racks: {} "
+                                + "Nodes: {}, enforceMinNumRacksPerWriteQuorum is enabled so giving up.",
+                        excludeRacks, excludeBookies);
+                throw e;
             }
-            return knownNode;
+
+            LOG.warn("Failed to choose a bookie: excluded {}, fallback to choose bookie randomly from the cluster.",
+                    excludeBookies);
+            // randomly choose one from whole cluster
+            return selectRandom(1, excludeBookies, predicate, ensemble).get(0);
         }
-        LOG.warn("Failed to choose a bookie: excluded {}, fallback to choose bookie randomly from the cluster.",
-                excludeBookies);
-        // randomly choose one from whole cluster
-        return selectRandom(1, excludeBookies, predicate, ensemble).get(0);
     }
 
     private WeightedRandomSelection<BookieNode> prepareForWeightedSelection(List<Node> leaves) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 218781b15..dcd4b17f0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -131,7 +131,7 @@ public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
                 perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy()
                         .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds,
                                 this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple,
-                                this.minNumRacksPerWriteQuorum, statsLogger)
+                                this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum, statsLogger)
                         .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
             }
 
@@ -176,11 +176,11 @@ public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf,
             // Regions are specified as
             // R1;R2;...
             String[] regions = regionsString.split(";");
-            for (String region: regions) {
+            for (String region : regions) {
                 perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy(true)
                         .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds,
                                 this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple,
-                                this.minNumRacksPerWriteQuorum, statsLogger)
+                                this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum, statsLogger)
                         .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
             }
             minRegionsForDurability = conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY,
@@ -502,7 +502,8 @@ protected BookieNode replaceFromRack(BookieNode bookieNodeToReplace,
                         bookieNodeToReplace.getNetworkLocation(),
                         excludeBookies,
                         TruePredicate.INSTANCE,
-                        EnsembleForReplacementWithNoConstraints.INSTANCE);
+                        EnsembleForReplacementWithNoConstraints.INSTANCE,
+                        true);
                 } catch (BKException.BKNotEnoughBookiesException e) {
                     LOG.warn("Failed to choose a bookie from {} : "
                             + "excluded {}, fallback to choose bookie randomly from the cluster.",
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index c813d5fdd..2c80f43e1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -144,6 +144,9 @@
     // minimum number of racks per write quorum
     public static final String MIN_NUM_RACKS_PER_WRITE_QUORUM = "minNumRacksPerWriteQuorum";
 
+    // enforce minimum number of racks per write quorum
+    public static final String ENFORCE_MIN_NUM_RACKS_PER_WRITE_QUORUM = "enforceMinNumRacksPerWriteQuorum";
+
     protected AbstractConfiguration() {
         super();
         if (READ_SYSTEM_PROPERTIES) {
@@ -795,6 +798,20 @@ public int getMinNumRacksPerWriteQuorum() {
         return getInteger(MIN_NUM_RACKS_PER_WRITE_QUORUM, 2);
     }
 
+    /**
+     * Set the flag to enforce minimum number of racks per write quorum.
+     */
+    public void setEnforceMinNumRacksPerWriteQuorum(boolean enforceMinNumRacksPerWriteQuorum) {
+        setProperty(ENFORCE_MIN_NUM_RACKS_PER_WRITE_QUORUM, enforceMinNumRacksPerWriteQuorum);
+    }
+
+    /**
+     * Get the flag which enforces the minimum number of racks per write quorum.
+     */
+    public boolean getEnforceMinNumRacksPerWriteQuorum() {
+        return getBoolean(ENFORCE_MIN_NUM_RACKS_PER_WRITE_QUORUM, false);
+    }
+
     /**
      * Trickery to allow inheritance with fluent style.
      */
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index cdff67902..e08500671 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -17,8 +17,8 @@
  */
 package org.apache.bookkeeper.client;
 
-import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy.REPP_DNS_RESOLVER_CLASS;
-import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy.shuffleWithMask;
+import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.REPP_DNS_RESOLVER_CLASS;
+import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.shuffleWithMask;
 import static org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues;
 import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
 
@@ -28,8 +28,10 @@
 
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -39,10 +41,14 @@
 
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.BookieNode;
+import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.EnsembleForReplacementWithNoConstraints;
+import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.TruePredicate;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.net.Node;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.StaticDNSResolver;
 import org.junit.Test;
@@ -672,6 +678,504 @@ public void testNewEnsembleWithSingleRack() throws Exception {
         }
     }
 
+    @Test
+    public void testSingleRackWithEnforceMinNumRacks() throws Exception {
+        repp.uninitalize();
+        updateMyRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        StaticDNSResolver.addNodeToRack(addr1.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
+        StaticDNSResolver.addNodeToRack(addr2.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
+        StaticDNSResolver.addNodeToRack(addr3.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
+        StaticDNSResolver.addNodeToRack(addr4.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(2);
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+
+        ArrayList<BookieSocketAddress> ensemble;
+        try {
+            ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
+            fail("Should get not enough bookies exception since there is only one rack.");
+        } catch (BKNotEnoughBookiesException bnebe) {
+        }
+
+        try {
+            ensemble = repp.newEnsemble(3, 2, 2, new HashSet<>(), EnsembleForReplacementWithNoConstraints.INSTANCE,
+                    TruePredicate.INSTANCE);
+            fail("Should get not enough bookies exception since there is only one rack.");
+        } catch (BKNotEnoughBookiesException bnebe) {
+        }
+    }
+
+    @Test
+    public void testNewEnsembleWithEnforceMinNumRacks() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 4;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 5;
+        BookieSocketAddress[] bookieSocketAddresses = new BookieSocketAddress[numOfRacks * numOfBookiesPerRack];
+
+        for (int i = 0; i < numOfRacks; i++) {
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieSocketAddresses[index] = new BookieSocketAddress("128.0.0." + index, 3181);
+                StaticDNSResolver.addNodeToRack(bookieSocketAddresses[index].getHostName(), "/default-region/r" + i);
+            }
+        }
+
+        repp.onClusterChanged(new HashSet<BookieSocketAddress>(Arrays.asList(bookieSocketAddresses)),
+                new HashSet<BookieSocketAddress>());
+
+        try {
+            repp.newEnsemble(8, 4, 4, null, new HashSet<>());
+            fail("Should get not enough bookies exception since there are only 3 racks");
+        } catch (BKNotEnoughBookiesException bnebe) {
+        }
+
+        try {
+            repp.newEnsemble(8, 4, 4, new HashSet<>(),
+                    EnsembleForReplacementWithNoConstraints.INSTANCE, TruePredicate.INSTANCE);
+            fail("Should get not enough bookies exception since there are only 3 racks");
+        } catch (BKNotEnoughBookiesException bnebe) {
+        }
+
+        /*
+         * Though minNumRacksPerWriteQuorum is set to 4, since writeQuorum is 3
+         * and there are enough bookies in 3 racks, these newEnsemble calls should
+         * succeed.
+         */
+        ArrayList<BookieSocketAddress> ensemble;
+        int ensembleSize = numOfRacks * numOfBookiesPerRack;
+        int writeQuorumSize = numOfRacks;
+        int ackQuorumSize = numOfRacks;
+
+        ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
+        assertEquals("Number of writeQuorum sets covered", ensembleSize,
+                getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
+
+        ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, new HashSet<>(),
+                EnsembleForReplacementWithNoConstraints.INSTANCE, TruePredicate.INSTANCE);
+        assertEquals("Number of writeQuorum sets covered", ensembleSize,
+                getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
+    }
+
+    @Test
+    public void testNewEnsembleWithSufficientRacksAndEnforceMinNumRacks() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 4;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int writeQuorumSize = 3;
+        int ackQuorumSize = 3;
+        int effectiveMinNumRacksPerWriteQuorum = Math.min(minNumRacksPerWriteQuorum, writeQuorumSize);
+
+        int numOfRacks = 2 * effectiveMinNumRacksPerWriteQuorum - 1;
+        int numOfBookiesPerRack = 20;
+        BookieSocketAddress[] bookieSocketAddresses = new BookieSocketAddress[numOfRacks * numOfBookiesPerRack];
+
+        for (int i = 0; i < numOfRacks; i++) {
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieSocketAddresses[index] = new BookieSocketAddress("128.0.0." + index, 3181);
+                StaticDNSResolver.addNodeToRack(bookieSocketAddresses[index].getHostName(), "/default-region/r" + i);
+            }
+        }
+
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        repp.onClusterChanged(new HashSet<BookieSocketAddress>(Arrays.asList(bookieSocketAddresses)),
+                new HashSet<BookieSocketAddress>());
+
+        /*
+         * In this scenario we have enough racks (2 *
+         * effectiveMinNumRacksPerWriteQuorum - 1) and plenty of bookies in
+         * each rack, so we should be able to create an ensemble for all
+         * ensembleSizes (as long as there are enough bookies in each
+         * rack).
+         */
+        ArrayList<BookieSocketAddress> ensemble;
+        for (int ensembleSize = effectiveMinNumRacksPerWriteQuorum; ensembleSize < 40; ensembleSize++) {
+            ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
+            assertEquals("Number of writeQuorum sets covered", ensembleSize,
+                    getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
+
+            ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, new HashSet<>(),
+                    EnsembleForReplacementWithNoConstraints.INSTANCE, TruePredicate.INSTANCE);
+            assertEquals("Number of writeQuorum sets covered", ensembleSize,
+                    getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
+        }
+    }
+
+    @Test
+    public void testReplaceBookieWithEnforceMinNumRacks() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 4;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 5;
+        Set<BookieSocketAddress> bookieSocketAddresses = new HashSet<BookieSocketAddress>();
+        Map<BookieSocketAddress, String> bookieRackMap = new HashMap<BookieSocketAddress, String>();
+        BookieSocketAddress bookieAddress;
+        String rack;
+        for (int i = 0; i < numOfRacks; i++) {
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181);
+                rack = "/default-region/r" + i;
+                StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), rack);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rack);
+            }
+        }
+
+        repp.onClusterChanged(bookieSocketAddresses, new HashSet<BookieSocketAddress>());
+
+        /*
+         * Though minNumRacksPerWriteQuorum is set to 4, since writeQuorum is 3
+         * and there are enough bookies in 3 racks, this newEnsemble call should
+         * succeed.
+         */
+        ArrayList<BookieSocketAddress> ensemble;
+        int ensembleSize = numOfRacks * numOfBookiesPerRack;
+        int writeQuorumSize = numOfRacks;
+        int ackQuorumSize = numOfRacks;
+
+        ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
+
+        BookieSocketAddress bookieInEnsembleToBeReplaced = ensemble.get(7);
+        // get rack of some other bookie
+        String rackOfOtherBookieInEnsemble = bookieRackMap.get(ensemble.get(8));
+        BookieSocketAddress newBookieAddress1 = new BookieSocketAddress("128.0.0.100", 3181);
+        /*
+         * add the newBookie to the rack of some other bookie in the current
+         * ensemble
+         */
+        StaticDNSResolver.addNodeToRack(newBookieAddress1.getHostName(), rackOfOtherBookieInEnsemble);
+        bookieSocketAddresses.add(newBookieAddress1);
+        bookieRackMap.put(newBookieAddress1, rackOfOtherBookieInEnsemble);
+
+        repp.onClusterChanged(bookieSocketAddresses, new HashSet<BookieSocketAddress>());
+        try {
+            repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null,
+                    new HashSet<BookieSocketAddress>(ensemble), bookieInEnsembleToBeReplaced, new HashSet<>());
+            fail("Should get not enough bookies exception since there are no more bookies in rack"
+                    + "of 'bookieInEnsembleToReplace'"
+                    + "and new bookie added belongs to the rack of some other bookie in the ensemble");
+        } catch (BKNotEnoughBookiesException bnebe) {
+            // this is expected
+        }
+
+        String newRack = "/default-region/r100";
+        BookieSocketAddress newBookieAddress2 = new BookieSocketAddress("128.0.0.101", 3181);
+        /*
+         * add the newBookie to a new rack.
+         */
+        StaticDNSResolver.addNodeToRack(newBookieAddress2.getHostName(), newRack);
+        bookieSocketAddresses.add(newBookieAddress2);
+        bookieRackMap.put(newBookieAddress2, newRack);
+
+        repp.onClusterChanged(bookieSocketAddresses, new HashSet<BookieSocketAddress>());
+        /*
+         * this replaceBookie should succeed, because a new bookie is added to a
+         * new rack.
+         */
+        BookieSocketAddress replacedBookieAddress = repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+                null, new HashSet<BookieSocketAddress>(ensemble), bookieInEnsembleToBeReplaced, new HashSet<>());
+        assertEquals("It should be newBookieAddress2", newBookieAddress2, replacedBookieAddress);
+
+        Set<BookieSocketAddress> bookiesToExclude = new HashSet<>();
+        bookiesToExclude.add(newBookieAddress2);
+        repp.onClusterChanged(bookieSocketAddresses, new HashSet<BookieSocketAddress>());
+        try {
+            repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null,
+                    new HashSet<BookieSocketAddress>(ensemble), bookieInEnsembleToBeReplaced, bookiesToExclude);
+            fail("Should get not enough bookies exception since the only available bookie to replace"
+                    + "is added to excludedBookies list");
+        } catch (BKNotEnoughBookiesException bnebe) {
+            // this is expected
+        }
+
+        // get rack of the bookie to be replaced
+        String rackOfBookieToBeReplaced = bookieRackMap.get(bookieInEnsembleToBeReplaced);
+        BookieSocketAddress newBookieAddress3 = new BookieSocketAddress("128.0.0.102", 3181);
+        /*
+         * add the newBookie to rack of the bookie to be replaced.
+         */
+        StaticDNSResolver.addNodeToRack(newBookieAddress3.getHostName(), rackOfBookieToBeReplaced);
+        bookieSocketAddresses.add(newBookieAddress3);
+        bookieRackMap.put(newBookieAddress3, rackOfBookieToBeReplaced);
+
+        repp.onClusterChanged(bookieSocketAddresses, new HashSet<BookieSocketAddress>());
+        /*
+         * Here we have added a new bookie to the rack of the bookie to be
+         * replaced, so replaceBookie should succeed even though
+         * newBookieAddress2 is in the excluded bookies list.
+         */
+        replacedBookieAddress = repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null,
+                new HashSet<BookieSocketAddress>(ensemble), bookieInEnsembleToBeReplaced, bookiesToExclude);
+        assertEquals("It should be newBookieAddress3", newBookieAddress3, replacedBookieAddress);
+    }
+
+    @Test
+    public void testSelectBookieFromNetworkLoc() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 4;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 5;
+        String[] rackLocationNames = new String[numOfRacks];
+        List<BookieSocketAddress> bookieSocketAddresses = new ArrayList<BookieSocketAddress>();
+        Map<BookieSocketAddress, String> bookieRackMap = new HashMap<BookieSocketAddress, String>();
+        BookieSocketAddress bookieAddress;
+
+        for (int i = 0; i < numOfRacks; i++) {
+            rackLocationNames[i] = "/default-region/r" + i;
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181);
+                StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), rackLocationNames[i]);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+            }
+        }
+        String nonExistingRackLocation = "/default-region/r25";
+
+        repp.onClusterChanged(new HashSet<BookieSocketAddress>(bookieSocketAddresses),
+                new HashSet<BookieSocketAddress>());
+
+        String rack = bookieRackMap.get(bookieSocketAddresses.get(0));
+        BookieNode bookieNode = repp.selectFromNetworkLocation(rack, new HashSet<Node>(), TruePredicate.INSTANCE,
+                EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+        String recRack = bookieNode.getNetworkLocation();
+        assertEquals("Rack of node", rack, recRack);
+
+        try {
+            repp.selectFromNetworkLocation(nonExistingRackLocation, new HashSet<Node>(), TruePredicate.INSTANCE,
+                    EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+            fail("Should get not enough bookies exception since there are no bookies in this rack");
+        } catch (BKNotEnoughBookiesException bnebe) {
+            // this is expected
+        }
+
+        // it should not fail, since fallback is set to true and it should pick
+        // some random one
+        repp.selectFromNetworkLocation(nonExistingRackLocation, new HashSet<Node>(), TruePredicate.INSTANCE,
+                EnsembleForReplacementWithNoConstraints.INSTANCE, true);
+
+        Set<BookieSocketAddress> excludeBookiesOfRackR0 = new HashSet<BookieSocketAddress>();
+        for (int i = 0; i < numOfBookiesPerRack; i++) {
+            excludeBookiesOfRackR0.add(bookieSocketAddresses.get(i));
+        }
+
+        Set<Node> excludeBookieNodesOfRackR0 = repp.convertBookiesToNodes(excludeBookiesOfRackR0);
+        try {
+            repp.selectFromNetworkLocation(bookieRackMap.get(bookieSocketAddresses.get(0)), excludeBookieNodesOfRackR0,
+                    TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+            fail("Should get not enough bookies exception since all the bookies in r0 are added to the exclusion list");
+        } catch (BKNotEnoughBookiesException bnebe) {
+            // this is expected
+        }
+
+        // not expected to get exception since fallback is set to true
+        bookieNode = repp.selectFromNetworkLocation(bookieRackMap.get(bookieSocketAddresses.get(0)),
+                excludeBookieNodesOfRackR0, TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE,
+                true);
+        assertTrue("BookieNode should not be from Rack /r0" + bookieNode.getNetworkLocation(),
+                rackLocationNames[1].equals(bookieNode.getNetworkLocation())
+                        || rackLocationNames[2].equals(bookieNode.getNetworkLocation()));
+    }
+
+    @Test
+    public void testSelectBookieFromExcludingRacks() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 4;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 5;
+        String[] rackLocationNames = new String[numOfRacks];
+        List<BookieSocketAddress> bookieSocketAddresses = new ArrayList<BookieSocketAddress>();
+        Map<BookieSocketAddress, String> bookieRackMap = new HashMap<BookieSocketAddress, String>();
+        BookieSocketAddress bookieAddress;
+
+        for (int i = 0; i < numOfRacks; i++) {
+            rackLocationNames[i] = "/default-region/r" + i;
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181);
+                StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), rackLocationNames[i]);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+            }
+        }
+
+        repp.onClusterChanged(new HashSet<BookieSocketAddress>(bookieSocketAddresses),
+                new HashSet<BookieSocketAddress>());
+
+        Set<BookieSocketAddress> excludeBookiesOfRackR0 = new HashSet<BookieSocketAddress>();
+        for (int i = 0; i < numOfBookiesPerRack; i++) {
+            excludeBookiesOfRackR0.add(bookieSocketAddresses.get(i));
+        }
+
+        Set<Node> excludeBookieNodesOfRackR0 = repp.convertBookiesToNodes(excludeBookiesOfRackR0);
+
+        Set<String> excludeRacksRackR1AndR2 = new HashSet<String>();
+        excludeRacksRackR1AndR2.add(rackLocationNames[1]);
+        excludeRacksRackR1AndR2.add(rackLocationNames[2]);
+
+        try {
+            repp.selectFromNetworkLocation(excludeRacksRackR1AndR2, excludeBookieNodesOfRackR0, TruePredicate.INSTANCE,
+                    EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+            fail("Should get not enough bookies exception racks R1 and R2 are"
+                    + "excluded and all the bookies in r0 are added to the exclusion list");
+        } catch (BKNotEnoughBookiesException bnebe) {
+            // this is expected
+        }
+
+        BookieNode bookieNode = repp.selectFromNetworkLocation(excludeRacksRackR1AndR2, new HashSet<Node>(),
+                TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+        assertTrue("BookieNode should be from Rack /r0" + bookieNode.getNetworkLocation(),
+                rackLocationNames[0].equals(bookieNode.getNetworkLocation()));
+
+        // not expected to get exception since fallback is set to true
+        bookieNode = repp.selectFromNetworkLocation(excludeRacksRackR1AndR2, excludeBookieNodesOfRackR0,
+                TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, true);
+        assertTrue("BookieNode should not be from Rack /r0" + bookieNode.getNetworkLocation(),
+                rackLocationNames[1].equals(bookieNode.getNetworkLocation())
+                        || rackLocationNames[2].equals(bookieNode.getNetworkLocation()));
+    }
+
+    @Test
+    public void testSelectBookieFromNetworkLocAndExcludingRacks() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 4;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 5;
+        String[] rackLocationNames = new String[numOfRacks];
+        List<BookieSocketAddress> bookieSocketAddresses = new ArrayList<BookieSocketAddress>();
+        Map<BookieSocketAddress, String> bookieRackMap = new HashMap<BookieSocketAddress, String>();
+        BookieSocketAddress bookieAddress;
+
+        for (int i = 0; i < numOfRacks; i++) {
+            rackLocationNames[i] = "/default-region/r" + i;
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181);
+                StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), rackLocationNames[i]);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+            }
+        }
+        String nonExistingRackLocation = "/default-region/r25";
+
+        repp.onClusterChanged(new HashSet<BookieSocketAddress>(bookieSocketAddresses),
+                new HashSet<BookieSocketAddress>());
+
+        Set<BookieSocketAddress> excludeBookiesOfRackR0 = new HashSet<BookieSocketAddress>();
+        for (int i = 0; i < numOfBookiesPerRack; i++) {
+            excludeBookiesOfRackR0.add(bookieSocketAddresses.get(i));
+        }
+
+        Set<Node> excludeBookieNodesOfRackR0 = repp.convertBookiesToNodes(excludeBookiesOfRackR0);
+
+        Set<String> excludeRacksRackR1AndR2 = new HashSet<String>();
+        excludeRacksRackR1AndR2.add(rackLocationNames[1]);
+        excludeRacksRackR1AndR2.add(rackLocationNames[2]);
+
+        try {
+            repp.selectFromNetworkLocation(nonExistingRackLocation, excludeRacksRackR1AndR2,
+                    excludeBookieNodesOfRackR0,
+                    TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+            fail("Should get not enough bookies exception racks R1 and R2 are excluded and all the bookies in"
+                    + "r0 are added to the exclusion list");
+        } catch (BKNotEnoughBookiesException bnebe) {
+            // this is expected
+        }
+
+        BookieNode bookieNode = repp.selectFromNetworkLocation(rackLocationNames[0], excludeRacksRackR1AndR2,
+                new HashSet<Node>(), TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+        assertTrue("BookieNode should be from Rack /r0" + bookieNode.getNetworkLocation(),
+                rackLocationNames[0].equals(bookieNode.getNetworkLocation()));
+
+        bookieNode = repp.selectFromNetworkLocation(rackLocationNames[0], new HashSet<String>(),
+                excludeBookieNodesOfRackR0, TruePredicate.INSTANCE,
+                EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+        assertTrue("BookieNode should not be from Rack /r0" + bookieNode.getNetworkLocation(),
+                rackLocationNames[1].equals(bookieNode.getNetworkLocation())
+                        || rackLocationNames[2].equals(bookieNode.getNetworkLocation()));
+
+        bookieNode = repp.selectFromNetworkLocation(nonExistingRackLocation, excludeRacksRackR1AndR2,
+                excludeBookieNodesOfRackR0, TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE,
+                true);
+        assertTrue("BookieNode should not be from Rack /r0" + bookieNode.getNetworkLocation(),
+                rackLocationNames[1].equals(bookieNode.getNetworkLocation())
+                        || rackLocationNames[2].equals(bookieNode.getNetworkLocation()));
+    }
+
     @Test
     public void testNewEnsembleWithMultipleRacks() throws Exception {
         BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
@@ -904,12 +1408,15 @@ public void testWeightedPlacementAndReplaceBookieWithEnoughBookiesInSameRack() t
 
     @Test
     public void testWeightedPlacementAndReplaceBookieWithoutEnoughBookiesInSameRack() throws Exception {
+        BookieSocketAddress addr0 = new BookieSocketAddress("126.0.0.1", 3181);
         BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
         BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
         BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
         BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
         // update dns mapping
         StaticDNSResolver.reset();
+        StaticDNSResolver.addNodeToRack(addr0.getSocketAddress().getAddress().getHostAddress(),
+                NetworkTopology.DEFAULT_REGION + "/r0");
         StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(),
                 NetworkTopology.DEFAULT_REGION_AND_RACK);
         StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(),
@@ -920,6 +1427,7 @@ public void testWeightedPlacementAndReplaceBookieWithoutEnoughBookiesInSameRack(
                 NetworkTopology.DEFAULT_REGION + "/r4");
         // Update cluster
         Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr0);
         addrs.add(addr1);
         addrs.add(addr2);
         addrs.add(addr3);
@@ -933,13 +1441,15 @@ public void testWeightedPlacementAndReplaceBookieWithoutEnoughBookiesInSameRack(
 
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         Map<BookieSocketAddress, BookieInfo> bookieInfoMap = new HashMap<BookieSocketAddress, BookieInfo>();
+        bookieInfoMap.put(addr0, new BookieInfo(50L, 50L));
         bookieInfoMap.put(addr1, new BookieInfo(100L, 100L));
         bookieInfoMap.put(addr2, new BookieInfo(100L, 100L));
         bookieInfoMap.put(addr3, new BookieInfo(200L, 200L));
-        bookieInfoMap.put(addr4, new BookieInfo(multiple * 100L, multiple * 100L));
+        bookieInfoMap.put(addr4, new BookieInfo(multiple * 50L, multiple * 50L));
         repp.updateBookieInfo(bookieInfoMap);
 
         Map<BookieSocketAddress, Long> selectionCounts = new HashMap<BookieSocketAddress, Long>();
+        selectionCounts.put(addr0, 0L);
         selectionCounts.put(addr1, 0L);
         selectionCounts.put(addr2, 0L);
         selectionCounts.put(addr3, 0L);
@@ -951,10 +1461,14 @@ public void testWeightedPlacementAndReplaceBookieWithoutEnoughBookiesInSameRack(
             // will come from other racks. However, the weight should be honored in such
             // selections as well
             replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<>(), addr2, new HashSet<>());
-            assertTrue(addr1.equals(replacedBookie) || addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
+            assertTrue(addr0.equals(replacedBookie) || addr1.equals(replacedBookie) || addr3.equals(replacedBookie)
+                    || addr4.equals(replacedBookie));
             selectionCounts.put(replacedBookie, selectionCounts.get(replacedBookie) + 1);
         }
-
+        /*
+         * Since addr2 is the bookie being replaced, the weights of the remaining bookies are 50, 100, 200, 500 (10*50).
+         * So the median calculated by WeightedRandomSelection is (100 + 200) / 2 = 150.
+         */
         double medianWeight = 150;
         double medianSelectionCounts = (double) (medianWeight / bookieInfoMap.get(addr1).getWeight())
             * selectionCounts.get(addr1);
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 1450def3a..7a0f361ea 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -860,6 +860,15 @@ zkEnableSecurity=false
 # The max number of args used in the script provided at `networkTopologyScriptFileName`
 # networkTopologyScriptNumberArgs=100
 
+# Minimum number of racks per write quorum. RackawareEnsemblePlacementPolicy will try to
+# get bookies from at least 'minNumRacksPerWriteQuorum' racks for a writeQuorum.
+# minNumRacksPerWriteQuorum=2
+
+# 'enforceMinNumRacksPerWriteQuorum' forces RackawareEnsemblePlacementPolicy to pick
+# bookies from 'minNumRacksPerWriteQuorum' racks for a writeQuorum. If it can't find such a
+# bookie, it throws BKNotEnoughBookiesException instead of picking a random one.
+# enforceMinNumRacksPerWriteQuorum=false
+
 #############################################################################
 ## Auditor settings
 #############################################################################
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index 39f2719d8..ac24fce9b 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -610,6 +610,12 @@ groups:
   - param: networkTopologyScriptNumberArgs
     description: |
       The max number of args used in the script provided at `networkTopologyScriptFileName`.
+  - param: minNumRacksPerWriteQuorum
+    description: |
+      Minimum number of racks per write quorum. RackawareEnsemblePlacementPolicy will try to get bookies from at least 'minNumRacksPerWriteQuorum' racks for a writeQuorum.
+  - param: enforceMinNumRacksPerWriteQuorum
+    description: |
+      'enforceMinNumRacksPerWriteQuorum' forces RackawareEnsemblePlacementPolicy to pick bookies from 'minNumRacksPerWriteQuorum' racks for a writeQuorum. If it can't find such a bookie, it throws BKNotEnoughBookiesException instead of picking a random one.
 
 - name: AutoRecovery auditor settings
   params:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services