You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/20 01:08:34 UTC

[bookkeeper] branch master updated: ISSUE #1495: Option to enforce minNumRacksPerWriteQuorum

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 10c5103  ISSUE #1495: Option to enforce minNumRacksPerWriteQuorum
10c5103 is described below

commit 10c510333d904debf54eb832524f6e09dcc0685d
Author: cguttapalem <cg...@salesforce.com>
AuthorDate: Tue Jun 19 18:08:26 2018 -0700

    ISSUE #1495: Option to enforce minNumRacksPerWriteQuorum
    
    Descriptions of the changes in this PR:
    
    Provide an option to enforce the guarantee of minNumRacksPerWriteQuorum
    if it is enabled. If it can't find a bookie to enforce the guarantee
    then the API in RackawareEnsemblePlacementPolicy should throw
    BKNotEnoughBookiesException.
    
    Master Issue: #1495
    
    Author: cguttapalem <cg...@salesforce.com>
    
    Reviewers: Sijie Guo <si...@apache.org>
    
    This closes #1496 from reddycharan/enforceminracks, closes #1495
---
 .../ITopologyAwareEnsemblePlacementPolicy.java     |  54 ++-
 .../client/RackawareEnsemblePlacementPolicy.java   |  68 ++-
 .../RackawareEnsemblePlacementPolicyImpl.java      |  87 ++--
 .../client/RegionAwareEnsemblePlacementPolicy.java |   9 +-
 .../bookkeeper/conf/AbstractConfiguration.java     |  17 +
 .../TestRackawareEnsemblePlacementPolicy.java      | 524 ++++++++++++++++++++-
 conf/bk_server.conf                                |   9 +
 site/_data/config/bk_server.yaml                   |   6 +
 8 files changed, 726 insertions(+), 48 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
index 4a05e18..87e6f18 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
@@ -20,6 +20,10 @@ package org.apache.bookkeeper.client;
 import java.util.ArrayList;
 import java.util.Set;
 
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
+import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Predicate;
+import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.BookieNode;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -109,16 +113,64 @@ public interface ITopologyAwareEnsemblePlacementPolicy<T extends Node> extends E
      *          predicate to apply
      * @param ensemble
      *          ensemble
+     * @param fallbackToRandom
+     *          fallbackToRandom
      * @return the selected bookie.
      * @throws BKException.BKNotEnoughBookiesException
      */
     T selectFromNetworkLocation(String networkLoc,
                                 Set<Node> excludeBookies,
                                 Predicate<T> predicate,
-                                Ensemble<T> ensemble)
+                                Ensemble<T> ensemble,
+                                boolean fallbackToRandom)
             throws BKException.BKNotEnoughBookiesException;
 
     /**
+     * Select a node from cluster excluding excludeBookies and bookie nodes of
+     * excludeRacks. If there isn't a BookieNode excluding those racks and
+     * nodes, then if fallbackToRandom is set to true then pick a random node
+     * from cluster just excluding excludeBookies.
+     *
+     * @param excludeRacks
+     * @param excludeBookies
+     * @param predicate
+     * @param ensemble
+     * @param fallbackToRandom
+     * @return
+     * @throws BKException.BKNotEnoughBookiesException
+     */
+    T selectFromNetworkLocation(Set<String> excludeRacks,
+                                Set<Node> excludeBookies,
+                                Predicate<BookieNode> predicate,
+                                Ensemble<BookieNode> ensemble,
+                                boolean fallbackToRandom)
+            throws BKException.BKNotEnoughBookiesException;
+
+    /**
+     * Select a node from networkLoc rack excluding excludeBookies. If there
+     * isn't any node in 'networkLoc', then it will try to get a node from
+     * cluster excluding excludeRacks and excludeBookies. If fallbackToRandom is
+     * set to true then it will get a random bookie from cluster excluding
+     * excludeBookies if it couldn't find a bookie
+     *
+     * @param networkLoc
+     * @param excludeRacks
+     * @param excludeBookies
+     * @param predicate
+     * @param ensemble
+     * @param fallbackToRandom
+     * @return
+     * @throws BKNotEnoughBookiesException
+     */
+    T selectFromNetworkLocation(String networkLoc,
+                                Set<String> excludeRacks,
+                                Set<Node> excludeBookies,
+                                Predicate<BookieNode> predicate,
+                                Ensemble<BookieNode> ensemble,
+                                boolean fallbackToRandom)
+            throws BKNotEnoughBookiesException;
+
+    /**
      * Handle bookies that left.
      *
      * @param leftBookies
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index e3ce8ff..d41ca9f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -23,6 +23,10 @@ import java.util.ArrayList;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
+import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Predicate;
+import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.BookieNode;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.Node;
@@ -54,18 +58,19 @@ public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacement
                                                           boolean isWeighted,
                                                           int maxWeightMultiple,
                                                           int minNumRacksPerWriteQuorum,
-                                                          StatsLogger statsLogger) {
+                                                          boolean enforceMinNumRacksPerWriteQuorum,
+            StatsLogger statsLogger) {
         if (stabilizePeriodSeconds > 0) {
             super.initialize(dnsResolver, timer, reorderReadsRandom, 0, reorderThresholdPendingRequests, isWeighted,
-                    maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
+                    maxWeightMultiple, minNumRacksPerWriteQuorum, enforceMinNumRacksPerWriteQuorum, statsLogger);
             slave = new RackawareEnsemblePlacementPolicyImpl(enforceDurability);
             slave.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds,
-                    reorderThresholdPendingRequests, isWeighted, maxWeightMultiple,
-                    minNumRacksPerWriteQuorum, statsLogger);
+                    reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum,
+                    enforceMinNumRacksPerWriteQuorum, statsLogger);
         } else {
             super.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds,
-                    reorderThresholdPendingRequests, isWeighted, maxWeightMultiple,
-                    minNumRacksPerWriteQuorum, statsLogger);
+                    reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum,
+                    enforceMinNumRacksPerWriteQuorum, statsLogger);
             slave = null;
         }
         return this;
@@ -171,15 +176,60 @@ public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacement
             String networkLoc,
             Set<Node> excludeBookies,
             Predicate<BookieNode> predicate,
-            Ensemble<BookieNode> ensemble)
+            Ensemble<BookieNode> ensemble,
+            boolean fallbackToRandom)
             throws BKException.BKNotEnoughBookiesException {
         try {
-            return super.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble);
+            return super.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble,
+                    fallbackToRandom);
         } catch (BKException.BKNotEnoughBookiesException bnebe) {
             if (slave == null) {
                 throw bnebe;
             } else {
-                return slave.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble);
+                return slave.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble,
+                        fallbackToRandom);
+            }
+        }
+    }
+
+    @Override
+    public BookieNode selectFromNetworkLocation(
+            Set<String> excludeRacks,
+            Set<Node> excludeBookies,
+            Predicate<BookieNode> predicate,
+            Ensemble<BookieNode> ensemble,
+            boolean fallbackToRandom)
+                    throws BKException.BKNotEnoughBookiesException {
+        try {
+            return super.selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble, fallbackToRandom);
+        } catch (BKException.BKNotEnoughBookiesException bnebe) {
+            if (slave == null) {
+                throw bnebe;
+            } else {
+                return slave.selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble,
+                        fallbackToRandom);
+            }
+        }
+    }
+
+    @Override
+    public BookieNode selectFromNetworkLocation(
+            String networkLoc,
+            Set<String> excludeRacks,
+            Set<Node> excludeBookies,
+            Predicate<BookieNode> predicate,
+            Ensemble<BookieNode> ensemble,
+            boolean fallbackToRandom)
+            throws BKNotEnoughBookiesException {
+        try {
+            return super.selectFromNetworkLocation(networkLoc, excludeRacks, excludeBookies, predicate, ensemble,
+                    fallbackToRandom);
+        } catch (BKException.BKNotEnoughBookiesException bnebe) {
+            if (slave == null) {
+                throw bnebe;
+            } else {
+                return slave.selectFromNetworkLocation(networkLoc, excludeRacks, excludeBookies, predicate, ensemble,
+                        fallbackToRandom);
             }
         }
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 23afaa9..ae640bb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -79,7 +79,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
     int maxWeightMultiple;
     private Map<BookieNode, WeightedObject> bookieInfoMap = new HashMap<BookieNode, WeightedObject>();
     private WeightedRandomSelection<BookieNode> weightedSelection;
+
     protected int minNumRacksPerWriteQuorum;
+    protected boolean enforceMinNumRacksPerWriteQuorum;
 
     public static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass";
     public static final String REPP_RANDOM_READ_REORDERING = "ensembleRandomReadReordering";
@@ -238,6 +240,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
                                                               boolean isWeighted,
                                                               int maxWeightMultiple,
                                                               int minNumRacksPerWriteQuorum,
+                                                              boolean enforceMinNumRacksPerWriteQuorum,
                                                               StatsLogger statsLogger) {
         checkNotNull(statsLogger, "statsLogger should not be null, use NullStatsLogger instead.");
         this.statsLogger = statsLogger;
@@ -250,6 +253,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
         this.dnsResolver = new DNSResolverDecorator(dnsResolver, () -> this.getDefaultRack());
         this.timer = timer;
         this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
+        this.enforceMinNumRacksPerWriteQuorum = enforceMinNumRacksPerWriteQuorum;
 
         // create the network topology
         if (stabilizePeriodSeconds > 0) {
@@ -339,6 +343,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
                 conf.getDiskWeightBasedPlacementEnabled(),
                 conf.getBookieMaxWeightMultipleForWeightBasedPlacement(),
                 conf.getMinNumRacksPerWriteQuorum(),
+                conf.getEnforceMinNumRacksPerWriteQuorum(),
                 statsLogger);
     }
 
@@ -536,6 +541,10 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
             int numRacks = topology.getNumOfRacks();
             // only one rack, use the random algorithm.
             if (numRacks < 2) {
+                if (enforceMinNumRacksPerWriteQuorum && (minNumRacksPerWriteQuorumForThisEnsemble > 1)) {
+                    LOG.error("Only one rack available and minNumRacksPerWriteQuorum is enforced, so giving up");
+                    throw new BKNotEnoughBookiesException();
+                }
                 List<BookieNode> bns = selectRandom(ensembleSize, excludeNodes, TruePredicate.INSTANCE,
                         ensemble);
                 ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>(ensembleSize);
@@ -610,7 +619,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
                     }
                     curRack = sb.toString();
                 }
-                prevNode = selectFromNetworkLocation(curRack, excludeNodes, ensemble, ensemble);
+                boolean firstBookieInTheEnsemble = (null == prevNode);
+                prevNode = selectFromNetworkLocation(curRack, excludeNodes, ensemble, ensemble,
+                        !enforceMinNumRacksPerWriteQuorum || firstBookieInTheEnsemble);
                 racks[i] = prevNode.getNetworkLocation();
             }
             ArrayList<BookieSocketAddress> bookieList = ensemble.toList();
@@ -657,7 +668,8 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
                     networkLocationsToBeExcluded,
                     excludeNodes,
                     TruePredicate.INSTANCE,
-                    EnsembleForReplacementWithNoConstraints.INSTANCE);
+                    EnsembleForReplacementWithNoConstraints.INSTANCE,
+                    !enforceMinNumRacksPerWriteQuorum);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bn);
             }
@@ -698,12 +710,20 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
             String networkLoc,
             Set<Node> excludeBookies,
             Predicate<BookieNode> predicate,
-            Ensemble<BookieNode> ensemble)
+            Ensemble<BookieNode> ensemble,
+            boolean fallbackToRandom)
             throws BKNotEnoughBookiesException {
         // select one from local rack
         try {
             return selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble);
         } catch (BKNotEnoughBookiesException e) {
+            if (!fallbackToRandom) {
+                LOG.error(
+                        "Failed to choose a bookie from {} : "
+                                + "excluded {}, enforceMinNumRacksPerWriteQuorum is enabled so giving up.",
+                        networkLoc, excludeBookies);
+                throw e;
+            }
             LOG.warn("Failed to choose a bookie from {} : "
                      + "excluded {}, fallback to choose bookie randomly from the cluster.",
                      networkLoc, excludeBookies);
@@ -712,28 +732,25 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
         }
     }
 
-    protected BookieNode selectFromNetworkLocation(String networkLoc,
+    @Override
+    public BookieNode selectFromNetworkLocation(String networkLoc,
                                                    Set<String> excludeRacks,
                                                    Set<Node> excludeBookies,
                                                    Predicate<BookieNode> predicate,
-                                                   Ensemble<BookieNode> ensemble)
+                                                   Ensemble<BookieNode> ensemble,
+                                                   boolean fallbackToRandom)
             throws BKNotEnoughBookiesException {
         // first attempt to select one from local rack
         try {
             return selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble);
         } catch (BKNotEnoughBookiesException e) {
-            if (isWeighted) {
-                // if weight based selection is enabled, randomly select one from the whole cluster
-                // based on weights and ignore the provided <tt>excludeRacks</tt>.
-                // randomly choose one from whole cluster, ignore the provided predicate.
-                return selectRandom(1, excludeBookies, predicate, ensemble).get(0);
-            } else {
-                // if weight based selection is disabled, and there is no enough bookie from local rack,
-                // select bookies from the whole cluster and exclude the racks specified at <tt>excludeRacks</tt>.
-                return selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble);
-            }
+            /*
+             * there is no enough bookie from local rack, select bookies from
+             * the whole cluster and exclude the racks specified at
+             * <tt>excludeRacks</tt>.
+             */
+            return selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble, fallbackToRandom);
         }
-
     }
 
 
@@ -742,27 +759,39 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
      * <i>excludeBookies</i> set. If it fails to find one, it selects a random {@link BookieNode} from the whole
      * cluster.
      */
-    protected BookieNode selectFromNetworkLocation(Set<String> excludeRacks,
+    @Override
+    public BookieNode selectFromNetworkLocation(Set<String> excludeRacks,
                                                    Set<Node> excludeBookies,
                                                    Predicate<BookieNode> predicate,
-                                                   Ensemble<BookieNode> ensemble)
+                                                   Ensemble<BookieNode> ensemble,
+                                                   boolean fallbackToRandom)
             throws BKNotEnoughBookiesException {
-        List<BookieNode> knownNodes = new ArrayList<>(knownBookies.values());
-        Collections.shuffle(knownNodes);
 
+        List<BookieNode> knownNodes = new ArrayList<>(knownBookies.values());
+        Set<Node> fullExclusionBookiesList = new HashSet<Node>(excludeBookies);
         for (BookieNode knownNode : knownNodes) {
-            if (excludeBookies.contains(knownNode)) {
-                continue;
-            }
             if (excludeRacks.contains(knownNode.getNetworkLocation())) {
-                continue;
+                fullExclusionBookiesList.add(knownNode);
+            }
+        }
+
+        try {
+            return selectRandomInternal(knownNodes, 1, fullExclusionBookiesList, TruePredicate.INSTANCE,
+                    EnsembleForReplacementWithNoConstraints.INSTANCE).get(0);
+        } catch (BKNotEnoughBookiesException e) {
+            if (!fallbackToRandom) {
+                LOG.error(
+                        "Failed to choose a bookie excluding Racks: {} "
+                                + "Nodes: {}, enforceMinNumRacksPerWriteQuorum is enabled so giving up.",
+                        excludeRacks, excludeBookies);
+                throw e;
             }
-            return knownNode;
+
+            LOG.warn("Failed to choose a bookie: excluded {}, fallback to choose bookie randomly from the cluster.",
+                    excludeBookies);
+            // randomly choose one from whole cluster
+            return selectRandom(1, excludeBookies, predicate, ensemble).get(0);
         }
-        LOG.warn("Failed to choose a bookie: excluded {}, fallback to choose bookie randomly from the cluster.",
-                excludeBookies);
-        // randomly choose one from whole cluster
-        return selectRandom(1, excludeBookies, predicate, ensemble).get(0);
     }
 
     private WeightedRandomSelection<BookieNode> prepareForWeightedSelection(List<Node> leaves) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 218781b..dcd4b17 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -131,7 +131,7 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
                 perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy()
                         .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds,
                                 this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple,
-                                this.minNumRacksPerWriteQuorum, statsLogger)
+                                this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum, statsLogger)
                         .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
             }
 
@@ -176,11 +176,11 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
             // Regions are specified as
             // R1;R2;...
             String[] regions = regionsString.split(";");
-            for (String region: regions) {
+            for (String region : regions) {
                 perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy(true)
                         .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds,
                                 this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple,
-                                this.minNumRacksPerWriteQuorum, statsLogger)
+                                this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum, statsLogger)
                         .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
             }
             minRegionsForDurability = conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY,
@@ -502,7 +502,8 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
                         bookieNodeToReplace.getNetworkLocation(),
                         excludeBookies,
                         TruePredicate.INSTANCE,
-                        EnsembleForReplacementWithNoConstraints.INSTANCE);
+                        EnsembleForReplacementWithNoConstraints.INSTANCE,
+                        true);
                 } catch (BKException.BKNotEnoughBookiesException e) {
                     LOG.warn("Failed to choose a bookie from {} : "
                             + "excluded {}, fallback to choose bookie randomly from the cluster.",
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index c813d5f..2c80f43 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -144,6 +144,9 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
     // minimum number of racks per write quorum
     public static final String MIN_NUM_RACKS_PER_WRITE_QUORUM = "minNumRacksPerWriteQuorum";
 
+    // enforce minimum number of racks per write quorum
+    public static final String ENFORCE_MIN_NUM_RACKS_PER_WRITE_QUORUM = "enforceMinNumRacksPerWriteQuorum";
+
     protected AbstractConfiguration() {
         super();
         if (READ_SYSTEM_PROPERTIES) {
@@ -796,6 +799,20 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
     }
 
     /**
+     * Set the flag to enforce minimum number of racks per write quorum.
+     */
+    public void setEnforceMinNumRacksPerWriteQuorum(boolean enforceMinNumRacksPerWriteQuorum) {
+        setProperty(ENFORCE_MIN_NUM_RACKS_PER_WRITE_QUORUM, enforceMinNumRacksPerWriteQuorum);
+    }
+
+    /**
+     * Get the flag which enforces the minimum number of racks per write quorum.
+     */
+    public boolean getEnforceMinNumRacksPerWriteQuorum() {
+        return getBoolean(ENFORCE_MIN_NUM_RACKS_PER_WRITE_QUORUM, false);
+    }
+
+    /**
      * Trickery to allow inheritance with fluent style.
      */
     protected abstract T getThis();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index cdff679..e085006 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -17,8 +17,8 @@
  */
 package org.apache.bookkeeper.client;
 
-import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy.REPP_DNS_RESOLVER_CLASS;
-import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy.shuffleWithMask;
+import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.REPP_DNS_RESOLVER_CLASS;
+import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.shuffleWithMask;
 import static org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues;
 import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
 
@@ -28,8 +28,10 @@ import io.netty.util.HashedWheelTimer;
 
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -39,10 +41,14 @@ import junit.framework.TestCase;
 
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.BookieNode;
+import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.EnsembleForReplacementWithNoConstraints;
+import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.TruePredicate;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.net.Node;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.StaticDNSResolver;
 import org.junit.Test;
@@ -673,6 +679,504 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
     }
 
     @Test
+    public void testSingleRackWithEnforceMinNumRacks() throws Exception {
+        repp.uninitalize();
+        updateMyRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        StaticDNSResolver.addNodeToRack(addr1.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
+        StaticDNSResolver.addNodeToRack(addr2.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
+        StaticDNSResolver.addNodeToRack(addr3.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
+        StaticDNSResolver.addNodeToRack(addr4.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(2);
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr1);
+        addrs.add(addr2);
+        addrs.add(addr3);
+        addrs.add(addr4);
+        repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+
+        ArrayList<BookieSocketAddress> ensemble;
+        try {
+            ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
+            fail("Should get not enough bookies exception since there is only one rack.");
+        } catch (BKNotEnoughBookiesException bnebe) {
+        }
+
+        try {
+            ensemble = repp.newEnsemble(3, 2, 2, new HashSet<>(), EnsembleForReplacementWithNoConstraints.INSTANCE,
+                    TruePredicate.INSTANCE);
+            fail("Should get not enough bookies exception since there is only one rack.");
+        } catch (BKNotEnoughBookiesException bnebe) {
+        }
+    }
+
+    @Test
+    public void testNewEnsembleWithEnforceMinNumRacks() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 4;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 5;
+        BookieSocketAddress[] bookieSocketAddresses = new BookieSocketAddress[numOfRacks * numOfBookiesPerRack];
+
+        for (int i = 0; i < numOfRacks; i++) {
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieSocketAddresses[index] = new BookieSocketAddress("128.0.0." + index, 3181);
+                StaticDNSResolver.addNodeToRack(bookieSocketAddresses[index].getHostName(), "/default-region/r" + i);
+            }
+        }
+
+        repp.onClusterChanged(new HashSet<BookieSocketAddress>(Arrays.asList(bookieSocketAddresses)),
+                new HashSet<BookieSocketAddress>());
+
+        try {
+            repp.newEnsemble(8, 4, 4, null, new HashSet<>());
+            fail("Should get not enough bookies exception since there are only 3 racks");
+        } catch (BKNotEnoughBookiesException bnebe) {
+        }
+
+        try {
+            repp.newEnsemble(8, 4, 4, new HashSet<>(),
+                    EnsembleForReplacementWithNoConstraints.INSTANCE, TruePredicate.INSTANCE);
+            fail("Should get not enough bookies exception since there are only 3 racks");
+        } catch (BKNotEnoughBookiesException bnebe) {
+        }
+
+        /*
+         * Though minNumRacksPerWriteQuorum is set to 4, since writeQuorum is 3
+         * and there are enough bookies in 3 racks, these newEnsemble calls should
+         * succeed.
+         */
+        ArrayList<BookieSocketAddress> ensemble;
+        int ensembleSize = numOfRacks * numOfBookiesPerRack;
+        int writeQuorumSize = numOfRacks;
+        int ackQuorumSize = numOfRacks;
+
+        ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
+        assertEquals("Number of writeQuorum sets covered", ensembleSize,
+                getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
+
+        ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, new HashSet<>(),
+                EnsembleForReplacementWithNoConstraints.INSTANCE, TruePredicate.INSTANCE);
+        assertEquals("Number of writeQuorum sets covered", ensembleSize,
+                getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
+    }
+
+    @Test
+    public void testNewEnsembleWithSufficientRacksAndEnforceMinNumRacks() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 4;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int writeQuorumSize = 3;
+        int ackQuorumSize = 3;
+        int effectiveMinNumRacksPerWriteQuorum = Math.min(minNumRacksPerWriteQuorum, writeQuorumSize);
+
+        int numOfRacks = 2 * effectiveMinNumRacksPerWriteQuorum - 1;
+        int numOfBookiesPerRack = 20;
+        BookieSocketAddress[] bookieSocketAddresses = new BookieSocketAddress[numOfRacks * numOfBookiesPerRack];
+
+        for (int i = 0; i < numOfRacks; i++) {
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieSocketAddresses[index] = new BookieSocketAddress("128.0.0." + index, 3181);
+                StaticDNSResolver.addNodeToRack(bookieSocketAddresses[index].getHostName(), "/default-region/r" + i);
+            }
+        }
+
+        Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        repp.onClusterChanged(new HashSet<BookieSocketAddress>(Arrays.asList(bookieSocketAddresses)),
+                new HashSet<BookieSocketAddress>());
+
+        /*
+         * In this scenario we have enough racks (2 *
+         * effectiveMinNumRacksPerWriteQuorum - 1) and plenty of bookies in
+         * each rack, so we should be able to create an ensemble for every
+         * ensembleSize (as long as there are enough bookies in each
+         * rack).
+         */
+        ArrayList<BookieSocketAddress> ensemble;
+        for (int ensembleSize = effectiveMinNumRacksPerWriteQuorum; ensembleSize < 40; ensembleSize++) {
+            ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
+            assertEquals("Number of writeQuorum sets covered", ensembleSize,
+                    getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
+
+            ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, new HashSet<>(),
+                    EnsembleForReplacementWithNoConstraints.INSTANCE, TruePredicate.INSTANCE);
+            assertEquals("Number of writeQuorum sets covered", ensembleSize,
+                    getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
+        }
+    }
+
+    @Test
+    public void testReplaceBookieWithEnforceMinNumRacks() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 4;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 5;
+        Set<BookieSocketAddress> bookieSocketAddresses = new HashSet<BookieSocketAddress>();
+        Map<BookieSocketAddress, String> bookieRackMap = new HashMap<BookieSocketAddress, String>();
+        BookieSocketAddress bookieAddress;
+        String rack;
+        for (int i = 0; i < numOfRacks; i++) {
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181);
+                rack = "/default-region/r" + i;
+                StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), rack);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rack);
+            }
+        }
+
+        repp.onClusterChanged(bookieSocketAddresses, new HashSet<BookieSocketAddress>());
+
+        /*
+         * Though minNumRacksPerWriteQuorum is set to 4, since writeQuorum is 3
+         * and there are enough bookies in 3 racks, this newEnsemble call should
+         * succeed.
+         */
+        ArrayList<BookieSocketAddress> ensemble;
+        int ensembleSize = numOfRacks * numOfBookiesPerRack;
+        int writeQuorumSize = numOfRacks;
+        int ackQuorumSize = numOfRacks;
+
+        ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
+
+        BookieSocketAddress bookieInEnsembleToBeReplaced = ensemble.get(7);
+        // get rack of some other bookie
+        String rackOfOtherBookieInEnsemble = bookieRackMap.get(ensemble.get(8));
+        BookieSocketAddress newBookieAddress1 = new BookieSocketAddress("128.0.0.100", 3181);
+        /*
+         * add the newBookie to the rack of some other bookie in the current
+         * ensemble
+         */
+        StaticDNSResolver.addNodeToRack(newBookieAddress1.getHostName(), rackOfOtherBookieInEnsemble);
+        bookieSocketAddresses.add(newBookieAddress1);
+        bookieRackMap.put(newBookieAddress1, rackOfOtherBookieInEnsemble);
+
+        repp.onClusterChanged(bookieSocketAddresses, new HashSet<BookieSocketAddress>());
+        try {
+            repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null,
+                    new HashSet<BookieSocketAddress>(ensemble), bookieInEnsembleToBeReplaced, new HashSet<>());
+            fail("Should get not enough bookies exception since there are no more bookies in rack"
+                    + "of 'bookieInEnsembleToReplace'"
+                    + "and new bookie added belongs to the rack of some other bookie in the ensemble");
+        } catch (BKNotEnoughBookiesException bnebe) {
+            // this is expected
+        }
+
+        String newRack = "/default-region/r100";
+        BookieSocketAddress newBookieAddress2 = new BookieSocketAddress("128.0.0.101", 3181);
+        /*
+         * add the newBookie to a new rack.
+         */
+        StaticDNSResolver.addNodeToRack(newBookieAddress2.getHostName(), newRack);
+        bookieSocketAddresses.add(newBookieAddress2);
+        bookieRackMap.put(newBookieAddress2, newRack);
+
+        repp.onClusterChanged(bookieSocketAddresses, new HashSet<BookieSocketAddress>());
+        /*
+         * this replaceBookie should succeed, because a new bookie is added to a
+         * new rack.
+         */
+        BookieSocketAddress replacedBookieAddress = repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+                null, new HashSet<BookieSocketAddress>(ensemble), bookieInEnsembleToBeReplaced, new HashSet<>());
+        assertEquals("It should be newBookieAddress2", newBookieAddress2, replacedBookieAddress);
+
+        Set<BookieSocketAddress> bookiesToExclude = new HashSet<>();
+        bookiesToExclude.add(newBookieAddress2);
+        repp.onClusterChanged(bookieSocketAddresses, new HashSet<BookieSocketAddress>());
+        try {
+            repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null,
+                    new HashSet<BookieSocketAddress>(ensemble), bookieInEnsembleToBeReplaced, bookiesToExclude);
+            fail("Should get not enough bookies exception since the only available bookie to replace"
+                    + "is added to excludedBookies list");
+        } catch (BKNotEnoughBookiesException bnebe) {
+            // this is expected
+        }
+
+        // get rack of the bookie to be replaced
+        String rackOfBookieToBeReplaced = bookieRackMap.get(bookieInEnsembleToBeReplaced);
+        BookieSocketAddress newBookieAddress3 = new BookieSocketAddress("128.0.0.102", 3181);
+        /*
+         * add the newBookie to rack of the bookie to be replaced.
+         */
+        StaticDNSResolver.addNodeToRack(newBookieAddress3.getHostName(), rackOfBookieToBeReplaced);
+        bookieSocketAddresses.add(newBookieAddress3);
+        bookieRackMap.put(newBookieAddress3, rackOfBookieToBeReplaced);
+
+        repp.onClusterChanged(bookieSocketAddresses, new HashSet<BookieSocketAddress>());
+        /*
+         * Here we have added a new bookie to the rack of the bookie to be
+         * replaced, so replaceBookie should succeed even though
+         * newBookieAddress2 is in the excluded bookies list.
+         */
+        replacedBookieAddress = repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null,
+                new HashSet<BookieSocketAddress>(ensemble), bookieInEnsembleToBeReplaced, bookiesToExclude);
+        assertEquals("It should be newBookieAddress3", newBookieAddress3, replacedBookieAddress);
+    }
+
+    @Test
+    public void testSelectBookieFromNetworkLoc() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 4;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 5;
+        String[] rackLocationNames = new String[numOfRacks];
+        List<BookieSocketAddress> bookieSocketAddresses = new ArrayList<BookieSocketAddress>();
+        Map<BookieSocketAddress, String> bookieRackMap = new HashMap<BookieSocketAddress, String>();
+        BookieSocketAddress bookieAddress;
+
+        for (int i = 0; i < numOfRacks; i++) {
+            rackLocationNames[i] = "/default-region/r" + i;
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181);
+                StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), rackLocationNames[i]);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+            }
+        }
+        String nonExistingRackLocation = "/default-region/r25";
+
+        repp.onClusterChanged(new HashSet<BookieSocketAddress>(bookieSocketAddresses),
+                new HashSet<BookieSocketAddress>());
+
+        String rack = bookieRackMap.get(bookieSocketAddresses.get(0));
+        BookieNode bookieNode = repp.selectFromNetworkLocation(rack, new HashSet<Node>(), TruePredicate.INSTANCE,
+                EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+        String recRack = bookieNode.getNetworkLocation();
+        assertEquals("Rack of node", rack, recRack);
+
+        try {
+            repp.selectFromNetworkLocation(nonExistingRackLocation, new HashSet<Node>(), TruePredicate.INSTANCE,
+                    EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+            fail("Should get not enough bookies exception since there are no bookies in this rack");
+        } catch (BKNotEnoughBookiesException bnebe) {
+            // this is expected
+        }
+
+        // it should not fail, since fallback is set to true and it should pick
+        // some random one
+        repp.selectFromNetworkLocation(nonExistingRackLocation, new HashSet<Node>(), TruePredicate.INSTANCE,
+                EnsembleForReplacementWithNoConstraints.INSTANCE, true);
+
+        Set<BookieSocketAddress> excludeBookiesOfRackR0 = new HashSet<BookieSocketAddress>();
+        for (int i = 0; i < numOfBookiesPerRack; i++) {
+            excludeBookiesOfRackR0.add(bookieSocketAddresses.get(i));
+        }
+
+        Set<Node> excludeBookieNodesOfRackR0 = repp.convertBookiesToNodes(excludeBookiesOfRackR0);
+        try {
+            repp.selectFromNetworkLocation(bookieRackMap.get(bookieSocketAddresses.get(0)), excludeBookieNodesOfRackR0,
+                    TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+            fail("Should get not enough bookies exception since all the bookies in r0 are added to the exclusion list");
+        } catch (BKNotEnoughBookiesException bnebe) {
+            // this is expected
+        }
+
+        // not expected to get exception since fallback is set to true
+        bookieNode = repp.selectFromNetworkLocation(bookieRackMap.get(bookieSocketAddresses.get(0)),
+                excludeBookieNodesOfRackR0, TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE,
+                true);
+        assertTrue("BookieNode should not be from Rack /r0" + bookieNode.getNetworkLocation(),
+                rackLocationNames[1].equals(bookieNode.getNetworkLocation())
+                        || rackLocationNames[2].equals(bookieNode.getNetworkLocation()));
+    }
+
+    @Test
+    public void testSelectBookieFromExcludingRacks() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 4;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 5;
+        String[] rackLocationNames = new String[numOfRacks];
+        List<BookieSocketAddress> bookieSocketAddresses = new ArrayList<BookieSocketAddress>();
+        Map<BookieSocketAddress, String> bookieRackMap = new HashMap<BookieSocketAddress, String>();
+        BookieSocketAddress bookieAddress;
+
+        for (int i = 0; i < numOfRacks; i++) {
+            rackLocationNames[i] = "/default-region/r" + i;
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181);
+                StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), rackLocationNames[i]);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+            }
+        }
+
+        repp.onClusterChanged(new HashSet<BookieSocketAddress>(bookieSocketAddresses),
+                new HashSet<BookieSocketAddress>());
+
+        Set<BookieSocketAddress> excludeBookiesOfRackR0 = new HashSet<BookieSocketAddress>();
+        for (int i = 0; i < numOfBookiesPerRack; i++) {
+            excludeBookiesOfRackR0.add(bookieSocketAddresses.get(i));
+        }
+
+        Set<Node> excludeBookieNodesOfRackR0 = repp.convertBookiesToNodes(excludeBookiesOfRackR0);
+
+        Set<String> excludeRacksRackR1AndR2 = new HashSet<String>();
+        excludeRacksRackR1AndR2.add(rackLocationNames[1]);
+        excludeRacksRackR1AndR2.add(rackLocationNames[2]);
+
+        try {
+            repp.selectFromNetworkLocation(excludeRacksRackR1AndR2, excludeBookieNodesOfRackR0, TruePredicate.INSTANCE,
+                    EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+            fail("Should get not enough bookies exception racks R1 and R2 are"
+                    + "excluded and all the bookies in r0 are added to the exclusion list");
+        } catch (BKNotEnoughBookiesException bnebe) {
+            // this is expected
+        }
+
+        BookieNode bookieNode = repp.selectFromNetworkLocation(excludeRacksRackR1AndR2, new HashSet<Node>(),
+                TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+        assertTrue("BookieNode should be from Rack /r0" + bookieNode.getNetworkLocation(),
+                rackLocationNames[0].equals(bookieNode.getNetworkLocation()));
+
+        // not expected to get exception since fallback is set to true
+        bookieNode = repp.selectFromNetworkLocation(excludeRacksRackR1AndR2, excludeBookieNodesOfRackR0,
+                TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, true);
+        assertTrue("BookieNode should not be from Rack /r0" + bookieNode.getNetworkLocation(),
+                rackLocationNames[1].equals(bookieNode.getNetworkLocation())
+                        || rackLocationNames[2].equals(bookieNode.getNetworkLocation()));
+    }
+
+    @Test
+    public void testSelectBookieFromNetworkLocAndExcludingRacks() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 4;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 5;
+        String[] rackLocationNames = new String[numOfRacks];
+        List<BookieSocketAddress> bookieSocketAddresses = new ArrayList<BookieSocketAddress>();
+        Map<BookieSocketAddress, String> bookieRackMap = new HashMap<BookieSocketAddress, String>();
+        BookieSocketAddress bookieAddress;
+
+        for (int i = 0; i < numOfRacks; i++) {
+            rackLocationNames[i] = "/default-region/r" + i;
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181);
+                StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), rackLocationNames[i]);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+            }
+        }
+        String nonExistingRackLocation = "/default-region/r25";
+
+        repp.onClusterChanged(new HashSet<BookieSocketAddress>(bookieSocketAddresses),
+                new HashSet<BookieSocketAddress>());
+
+        Set<BookieSocketAddress> excludeBookiesOfRackR0 = new HashSet<BookieSocketAddress>();
+        for (int i = 0; i < numOfBookiesPerRack; i++) {
+            excludeBookiesOfRackR0.add(bookieSocketAddresses.get(i));
+        }
+
+        Set<Node> excludeBookieNodesOfRackR0 = repp.convertBookiesToNodes(excludeBookiesOfRackR0);
+
+        Set<String> excludeRacksRackR1AndR2 = new HashSet<String>();
+        excludeRacksRackR1AndR2.add(rackLocationNames[1]);
+        excludeRacksRackR1AndR2.add(rackLocationNames[2]);
+
+        try {
+            repp.selectFromNetworkLocation(nonExistingRackLocation, excludeRacksRackR1AndR2,
+                    excludeBookieNodesOfRackR0,
+                    TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+            fail("Should get not enough bookies exception racks R1 and R2 are excluded and all the bookies in"
+                    + "r0 are added to the exclusion list");
+        } catch (BKNotEnoughBookiesException bnebe) {
+            // this is expected
+        }
+
+        BookieNode bookieNode = repp.selectFromNetworkLocation(rackLocationNames[0], excludeRacksRackR1AndR2,
+                new HashSet<Node>(), TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+        assertTrue("BookieNode should be from Rack /r0" + bookieNode.getNetworkLocation(),
+                rackLocationNames[0].equals(bookieNode.getNetworkLocation()));
+
+        bookieNode = repp.selectFromNetworkLocation(rackLocationNames[0], new HashSet<String>(),
+                excludeBookieNodesOfRackR0, TruePredicate.INSTANCE,
+                EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+        assertTrue("BookieNode should not be from Rack /r0" + bookieNode.getNetworkLocation(),
+                rackLocationNames[1].equals(bookieNode.getNetworkLocation())
+                        || rackLocationNames[2].equals(bookieNode.getNetworkLocation()));
+
+        bookieNode = repp.selectFromNetworkLocation(nonExistingRackLocation, excludeRacksRackR1AndR2,
+                excludeBookieNodesOfRackR0, TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE,
+                true);
+        assertTrue("BookieNode should not be from Rack /r0" + bookieNode.getNetworkLocation(),
+                rackLocationNames[1].equals(bookieNode.getNetworkLocation())
+                        || rackLocationNames[2].equals(bookieNode.getNetworkLocation()));
+    }
+
+    @Test
     public void testNewEnsembleWithMultipleRacks() throws Exception {
         BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
         BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
@@ -904,12 +1408,15 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
 
     @Test
     public void testWeightedPlacementAndReplaceBookieWithoutEnoughBookiesInSameRack() throws Exception {
+        BookieSocketAddress addr0 = new BookieSocketAddress("126.0.0.1", 3181);
         BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
         BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
         BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
         BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
         // update dns mapping
         StaticDNSResolver.reset();
+        StaticDNSResolver.addNodeToRack(addr0.getSocketAddress().getAddress().getHostAddress(),
+                NetworkTopology.DEFAULT_REGION + "/r0");
         StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(),
                 NetworkTopology.DEFAULT_REGION_AND_RACK);
         StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(),
@@ -920,6 +1427,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
                 NetworkTopology.DEFAULT_REGION + "/r4");
         // Update cluster
         Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+        addrs.add(addr0);
         addrs.add(addr1);
         addrs.add(addr2);
         addrs.add(addr3);
@@ -933,13 +1441,15 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
 
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         Map<BookieSocketAddress, BookieInfo> bookieInfoMap = new HashMap<BookieSocketAddress, BookieInfo>();
+        bookieInfoMap.put(addr0, new BookieInfo(50L, 50L));
         bookieInfoMap.put(addr1, new BookieInfo(100L, 100L));
         bookieInfoMap.put(addr2, new BookieInfo(100L, 100L));
         bookieInfoMap.put(addr3, new BookieInfo(200L, 200L));
-        bookieInfoMap.put(addr4, new BookieInfo(multiple * 100L, multiple * 100L));
+        bookieInfoMap.put(addr4, new BookieInfo(multiple * 50L, multiple * 50L));
         repp.updateBookieInfo(bookieInfoMap);
 
         Map<BookieSocketAddress, Long> selectionCounts = new HashMap<BookieSocketAddress, Long>();
+        selectionCounts.put(addr0, 0L);
         selectionCounts.put(addr1, 0L);
         selectionCounts.put(addr2, 0L);
         selectionCounts.put(addr3, 0L);
@@ -951,10 +1461,14 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
             // will come from other racks. However, the weight should be honored in such
             // selections as well
             replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<>(), addr2, new HashSet<>());
-            assertTrue(addr1.equals(replacedBookie) || addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
+            assertTrue(addr0.equals(replacedBookie) || addr1.equals(replacedBookie) || addr3.equals(replacedBookie)
+                    || addr4.equals(replacedBookie));
             selectionCounts.put(replacedBookie, selectionCounts.get(replacedBookie) + 1);
         }
-
+        /*
+         * Since addr2 is being replaced, the remaining bookies' weights are 50, 100, 200 and 500 (10 * 50),
+         * so the median calculated by WeightedRandomSelection is (100 + 200) / 2 = 150.
+         */
         double medianWeight = 150;
         double medianSelectionCounts = (double) (medianWeight / bookieInfoMap.get(addr1).getWeight())
             * selectionCounts.get(addr1);
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 1450def..7a0f361 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -860,6 +860,15 @@ zkEnableSecurity=false
 # The max number of args used in the script provided at `networkTopologyScriptFileName`
 # networkTopologyScriptNumberArgs=100
 
+# Minimum number of racks per write quorum. RackawareEnsemblePlacementPolicy will try to
+# get bookies from at least 'minNumRacksPerWriteQuorum' racks for a writeQuorum.
+# minNumRacksPerWriteQuorum=2
+
+# 'enforceMinNumRacksPerWriteQuorum' forces RackawareEnsemblePlacementPolicy to pick
+# bookies from 'minNumRacksPerWriteQuorum' racks for a writeQuorum. If it cannot find
+# such a bookie, it throws BKNotEnoughBookiesException instead of picking a random one.
+# enforceMinNumRacksPerWriteQuorum=false
+
 #############################################################################
 ## Auditor settings
 #############################################################################
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index 39f2719..ac24fce 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -610,6 +610,12 @@ groups:
   - param: networkTopologyScriptNumberArgs
     description: |
       The max number of args used in the script provided at `networkTopologyScriptFileName`.
+  - param: minNumRacksPerWriteQuorum
+    description: |
+      Minimum number of racks per write quorum. RackawareEnsemblePlacementPolicy will try to get bookies from at least 'minNumRacksPerWriteQuorum' racks for a writeQuorum.
+  - param: enforceMinNumRacksPerWriteQuorum
+    description: |
+      'enforceMinNumRacksPerWriteQuorum' forces RackawareEnsemblePlacementPolicy to pick bookies from 'minNumRacksPerWriteQuorum' racks for a writeQuorum. If it cannot find such a bookie, it throws BKNotEnoughBookiesException instead of picking a random one.
 
 - name: AutoRecovery auditor settings
   params: