You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2019/01/13 02:04:39 UTC

[bookkeeper] branch master updated: Enhance EnsemblePlacementPolicy and DNSResolverDecorator to log relevant metrics.

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 42fe150  Enhance EnsemblePlacementPolicy and DNSResolverDecorator to log relevant metrics.
42fe150 is described below

commit 42fe1506afe04b8068b897c530c7ca7188da503e
Author: Charan Reddy Guttapalem <re...@gmail.com>
AuthorDate: Sat Jan 12 18:04:35 2019 -0800

    Enhance EnsemblePlacementPolicy and DNSResolverDecorator to log relevant metrics.
    
    
    Descriptions of the changes in this PR:
    
    
    Change EnsemblePlacementPolicy so that newEnsemble and replaceBookie also return a boolean
    value indicating whether the result strictly adheres to the corresponding placement policy
    or whether the policy fell back to random selection.
    
    Similarly, DNSResolverDecorator should log a metric when it is unable to resolve rack info
    and falls back to the default rack.
    
    
    
    Reviewers: Samuel Just <sj...@salesforce.com>, Sijie Guo <si...@apache.org>, Venkateswararao Jujjuri (JV) <None>
    
    This closes #1883 from reddycharan/enhanceplacementpolicy
---
 .../bookkeeper/bookie/BookKeeperServerStats.java   |   2 +
 .../bookie/LocalBookieEnsemblePlacementPolicy.java |  19 ++-
 .../apache/bookkeeper/client/BookKeeperAdmin.java  |  16 +-
 .../bookkeeper/client/BookieWatcherImpl.java       |  71 +++++++--
 .../client/DefaultEnsemblePlacementPolicy.java     |  29 +++-
 .../bookkeeper/client/EnsemblePlacementPolicy.java |  50 ++++--
 .../ITopologyAwareEnsemblePlacementPolicy.java     |   3 +-
 .../client/RackawareEnsemblePlacementPolicy.java   |  15 +-
 .../RackawareEnsemblePlacementPolicyImpl.java      | 100 ++++++++++--
 .../client/RegionAwareEnsemblePlacementPolicy.java |  42 ++++-
 .../apache/bookkeeper/net/ScriptBasedMapping.java  |   8 +-
 .../client/GenericEnsemblePlacementPolicyTest.java |   7 +-
 .../TestRackawareEnsemblePlacementPolicy.java      | 177 +++++++++++++++------
 ...ackawareEnsemblePlacementPolicyUsingScript.java |  45 ++++--
 .../TestRackawarePolicyNotificationUpdates.java    |   9 +-
 .../TestRegionAwareEnsemblePlacementPolicy.java    | 108 ++++++++-----
 16 files changed, 515 insertions(+), 186 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index cdafd15..b58514b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -69,6 +69,8 @@ public interface BookKeeperServerStats {
     String WATCHER_SCOPE = "bookie_watcher";
     String REPLACE_BOOKIE_TIME = "REPLACE_BOOKIE_TIME";
     String NEW_ENSEMBLE_TIME = "NEW_ENSEMBLE_TIME";
+    String FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER = "FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER";
+    String ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER = "ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER";
 
     // Bookie Operations
     String BOOKIE_ADD_ENTRY = "BOOKIE_ADD_ENTRY";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 46978ea..7b7cc46 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,9 +81,9 @@ public class LocalBookieEnsemblePlacementPolicy implements EnsemblePlacementPoli
     }
 
     @Override
-    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-        java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> currentEnsemble,
-        BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies)
+    public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+            java.util.Map<String, byte[]> customMetadata, List<BookieSocketAddress> currentEnsemble,
+            BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
         throw new BKNotEnoughBookiesException();
     }
@@ -109,18 +110,24 @@ public class LocalBookieEnsemblePlacementPolicy implements EnsemblePlacementPoli
     }
 
     @Override
-    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-        java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
+    public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int ensembleSize, int writeQuorumSize,
+            int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
         if (ensembleSize > 1) {
             throw new IllegalArgumentException("Local ensemble policy can only return 1 bookie");
         }
 
-        return Lists.newArrayList(bookieAddress);
+        return Pair.of(Lists.newArrayList(bookieAddress), true);
     }
 
     @Override
     public void updateBookieInfo(Map<BookieSocketAddress, BookieInfo> bookieToFreeSpaceMap) {
         return;
     }
+
+    @Override
+    public boolean isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int writeQuorumSize,
+            int ackQuorumSize) {
+        return true;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 3f837db..7b49292 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -35,7 +35,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Enumeration;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -82,6 +81,7 @@ import org.apache.bookkeeper.replication.ReplicationException.UnavailableExcepti
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -994,15 +994,25 @@ public class BookKeeperAdmin implements AutoCloseable {
         // allocate bookies
         for (Integer bookieIndex : bookieIndexesToRereplicate) {
             BookieSocketAddress oldBookie = ensemble.get(bookieIndex);
-            BookieSocketAddress newBookie =
+            Pair<BookieSocketAddress, Boolean> replaceBookieResponse =
                     bkc.getPlacementPolicy().replaceBookie(
                             lh.getLedgerMetadata().getEnsembleSize(),
                             lh.getLedgerMetadata().getWriteQuorumSize(),
                             lh.getLedgerMetadata().getAckQuorumSize(),
                             lh.getLedgerMetadata().getCustomMetadata(),
-                            new HashSet<>(ensemble),
+                            ensemble,
                             oldBookie,
                             bookiesToExclude);
+            BookieSocketAddress newBookie = replaceBookieResponse.getLeft();
+            boolean isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
+            if (!isEnsembleAdheringToPlacementPolicy) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                            "replaceBookie for bookie: {} in ensemble: {} "
+                                    + "is not adhering to placement policy and chose {}",
+                            oldBookie, ensemble, newBookie);
+                }
+            }
             targetBookieAddresses.put(bookieIndex, newBookie);
             bookiesToExclude.add(newBookie);
         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
index 8b14b77..74c1df9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NEW_ENSEMBLE_TIME;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.REPLACE_BOOKIE_TIME;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WATCHER_SCOPE;
@@ -35,6 +36,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.bookie.BookKeeperServerStats;
 import org.apache.bookkeeper.client.BKException.BKInterruptedException;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BKException.MetaStoreException;
@@ -43,9 +46,11 @@ import org.apache.bookkeeper.common.util.MathUtils;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.annotations.StatsDoc;
+import org.apache.commons.lang3.tuple.Pair;
 
 /**
  * This class is responsible for maintaining a consistent view of what bookies
@@ -88,6 +93,12 @@ class BookieWatcherImpl implements BookieWatcher {
         help = "operation stats of replacing bookie in an ensemble"
     )
     private final OpStatsLogger replaceBookieTimer;
+    @StatsDoc(
+            name = ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER,
+            help = "total number of newEnsemble/replaceBookie operations failed to adhere"
+            + " EnsemblePlacementPolicy"
+    )
+    private final Counter ensembleNotAdheringToPlacementPolicy;
 
     // Bookies that will not be preferred to be chosen in a new ensemble
     final Cache<BookieSocketAddress, Boolean> quarantinedBookies;
@@ -117,6 +128,8 @@ class BookieWatcherImpl implements BookieWatcher {
                 }).build();
         this.newEnsembleTimer = statsLogger.getOpStatsLogger(NEW_ENSEMBLE_TIME);
         this.replaceBookieTimer = statsLogger.getOpStatsLogger(REPLACE_BOOKIE_TIME);
+        this.ensembleNotAdheringToPlacementPolicy = statsLogger
+                .getCounter(BookKeeperServerStats.ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER);
     }
 
     @Override
@@ -213,19 +226,34 @@ class BookieWatcherImpl implements BookieWatcher {
         int ackQuorumSize, Map<String, byte[]> customMetadata)
             throws BKNotEnoughBookiesException {
         long startTime = MathUtils.nowInNano();
+        Pair<List<BookieSocketAddress>, Boolean> newEnsembleResponse;
         List<BookieSocketAddress> socketAddresses;
+        boolean isEnsembleAdheringToPlacementPolicy = false;
         try {
-            socketAddresses = placementPolicy.newEnsemble(ensembleSize,
-                    writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<BookieSocketAddress>(
-                            quarantinedBookies.asMap().keySet()));
+            Set<BookieSocketAddress> quarantinedBookiesSet = quarantinedBookies.asMap().keySet();
+            newEnsembleResponse = placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
+                    customMetadata, new HashSet<BookieSocketAddress>(quarantinedBookiesSet));
+            socketAddresses = newEnsembleResponse.getLeft();
+            isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.getRight();
+            if (!isEnsembleAdheringToPlacementPolicy) {
+                ensembleNotAdheringToPlacementPolicy.inc();
+                log.warn("New ensemble: {} is not adhering to Placement Policy. quarantinedBookies: {}",
+                        socketAddresses, quarantinedBookiesSet);
+            }
             // we try to only get from the healthy bookies first
             newEnsembleTimer.registerSuccessfulEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
         } catch (BKNotEnoughBookiesException e) {
             if (log.isDebugEnabled()) {
                 log.debug("Not enough healthy bookies available, using quarantined bookies");
             }
-            socketAddresses = placementPolicy.newEnsemble(
+            newEnsembleResponse = placementPolicy.newEnsemble(
                     ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<>());
+            socketAddresses = newEnsembleResponse.getLeft();
+            isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.getRight();
+            if (!isEnsembleAdheringToPlacementPolicy) {
+                ensembleNotAdheringToPlacementPolicy.inc();
+                log.warn("New ensemble: {} is not adhering to Placement Policy", socketAddresses);
+            }
             newEnsembleTimer.registerFailedEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
         }
         return socketAddresses;
@@ -239,22 +267,43 @@ class BookieWatcherImpl implements BookieWatcher {
             throws BKNotEnoughBookiesException {
         long startTime = MathUtils.nowInNano();
         BookieSocketAddress addr = existingBookies.get(bookieIdx);
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
         BookieSocketAddress socketAddress;
+        boolean isEnsembleAdheringToPlacementPolicy = false;
         try {
             // we exclude the quarantined bookies also first
-            Set<BookieSocketAddress> existingAndQuarantinedBookies = new HashSet<BookieSocketAddress>(existingBookies);
-            existingAndQuarantinedBookies.addAll(quarantinedBookies.asMap().keySet());
-            socketAddress = placementPolicy.replaceBookie(
+            Set<BookieSocketAddress> excludedBookiesAndQuarantinedBookies = new HashSet<BookieSocketAddress>(
+                    excludeBookies);
+            Set<BookieSocketAddress> quarantinedBookiesSet = quarantinedBookies.asMap().keySet();
+            excludedBookiesAndQuarantinedBookies.addAll(quarantinedBookiesSet);
+            replaceBookieResponse = placementPolicy.replaceBookie(
                     ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
-                    existingAndQuarantinedBookies, addr, excludeBookies);
+                    existingBookies, addr, excludedBookiesAndQuarantinedBookies);
+            socketAddress = replaceBookieResponse.getLeft();
+            isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
+            if (!isEnsembleAdheringToPlacementPolicy) {
+                ensembleNotAdheringToPlacementPolicy.inc();
+                log.warn(
+                        "replaceBookie for bookie: {} in ensemble: {} is not adhering to placement policy and"
+                                + " chose {}. excludedBookies {} and quarantinedBookies {}",
+                        addr, existingBookies, socketAddress, excludeBookies, quarantinedBookiesSet);
+            }
             replaceBookieTimer.registerSuccessfulEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
         } catch (BKNotEnoughBookiesException e) {
             if (log.isDebugEnabled()) {
                 log.debug("Not enough healthy bookies available, using quarantined bookies");
             }
-            socketAddress = placementPolicy.replaceBookie(
-                    ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
-                    new HashSet<BookieSocketAddress>(existingBookies), addr, excludeBookies);
+            replaceBookieResponse = placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+                    customMetadata, existingBookies, addr, excludeBookies);
+            socketAddress = replaceBookieResponse.getLeft();
+            isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
+            if (!isEnsembleAdheringToPlacementPolicy) {
+                ensembleNotAdheringToPlacementPolicy.inc();
+                log.warn(
+                        "replaceBookie for bookie: {} in ensemble: {} is not adhering to placement policy and"
+                                + " chose {}. excludedBookies {}",
+                        addr, existingBookies, socketAddress, excludeBookies);
+            }
             replaceBookieTimer.registerFailedEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
         }
         return socketAddress;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index 917d18f..dddbe1c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -39,6 +39,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,12 +65,12 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
     }
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int quorumSize, int ackQuorumSize,
+    public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int ensembleSize, int quorumSize, int ackQuorumSize,
             Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
         ArrayList<BookieSocketAddress> newBookies = new ArrayList<BookieSocketAddress>(ensembleSize);
         if (ensembleSize <= 0) {
-            return newBookies;
+            return Pair.of(newBookies, false);
         }
         List<BookieSocketAddress> allBookies;
         rwLock.readLock().lock();
@@ -95,7 +96,8 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
                     newBookies.add(b);
                     --ensembleSize;
                     if (ensembleSize == 0) {
-                        return newBookies;
+                        return Pair.of(newBookies,
+                                isEnsembleAdheringToPlacementPolicy(newBookies, quorumSize, ackQuorumSize));
                     }
                 }
             } finally {
@@ -110,7 +112,8 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
                 newBookies.add(bookie);
                 --ensembleSize;
                 if (ensembleSize == 0) {
-                    return newBookies;
+                    return Pair.of(newBookies,
+                            isEnsembleAdheringToPlacementPolicy(newBookies, quorumSize, ackQuorumSize));
                 }
             }
         }
@@ -118,13 +121,17 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
     }
 
     @Override
-    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> currentEnsemble,
+    public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+            Map<String, byte[]> customMetadata, List<BookieSocketAddress> currentEnsemble,
             BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
         excludeBookies.addAll(currentEnsemble);
-        ArrayList<BookieSocketAddress> addresses = newEnsemble(1, 1, 1, customMetadata, excludeBookies);
-        return addresses.get(0);
+        List<BookieSocketAddress> addresses = newEnsemble(1, 1, 1, customMetadata, excludeBookies).getLeft();
+
+        BookieSocketAddress candidateAddr = addresses.get(0);
+        List<BookieSocketAddress> newEnsemble = new ArrayList<BookieSocketAddress>(currentEnsemble);
+        newEnsemble.set(currentEnsemble.indexOf(bookieToReplace), candidateAddr);
+        return Pair.of(candidateAddr, isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize));
     }
 
     @Override
@@ -210,4 +217,10 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
     public void uninitalize() {
         // do nothing
     }
+
+    @Override
+    public boolean isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int writeQuorumSize,
+            int ackQuorumSize) {
+        return true;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 23932a3..00bac8e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -35,6 +35,7 @@ import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.lang3.tuple.Pair;
 
 /**
  * {@link EnsemblePlacementPolicy} encapsulates the algorithm that bookkeeper client uses to select a number of bookies
@@ -263,12 +264,12 @@ public interface EnsemblePlacementPolicy {
      * @throws BKNotEnoughBookiesException if not enough bookies available.
      * @return the List&lt;org.apache.bookkeeper.net.BookieSocketAddress&gt;
      */
-    List<BookieSocketAddress> newEnsemble(int ensembleSize,
-                                               int writeQuorumSize,
-                                               int ackQuorumSize,
-                                               Map<String, byte[]> customMetadata,
-                                               Set<BookieSocketAddress> excludeBookies)
-        throws BKNotEnoughBookiesException;
+    Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int ensembleSize,
+                                                         int writeQuorumSize,
+                                                         int ackQuorumSize,
+                                                         Map<String, byte[]> customMetadata,
+                                                         Set<BookieSocketAddress> excludeBookies)
+            throws BKNotEnoughBookiesException;
 
     /**
      * Choose a new bookie to replace <i>bookieToReplace</i>. If no bookie available in the cluster,
@@ -287,14 +288,14 @@ public interface EnsemblePlacementPolicy {
      * @throws BKNotEnoughBookiesException
      * @return the org.apache.bookkeeper.net.BookieSocketAddress
      */
-    BookieSocketAddress replaceBookie(int ensembleSize,
-                                      int writeQuorumSize,
-                                      int ackQuorumSize,
-                                      Map<String, byte[]> customMetadata,
-                                      Set<BookieSocketAddress> currentEnsemble,
-                                      BookieSocketAddress bookieToReplace,
-                                      Set<BookieSocketAddress> excludeBookies)
-        throws BKNotEnoughBookiesException;
+    Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize,
+                                                     int writeQuorumSize,
+                                                     int ackQuorumSize,
+                                                     Map<String, byte[]> customMetadata,
+                                                     List<BookieSocketAddress> currentEnsemble,
+                                                     BookieSocketAddress bookieToReplace,
+                                                     Set<BookieSocketAddress> excludeBookies)
+            throws BKNotEnoughBookiesException;
 
     /**
      * Register a bookie as slow so that it is tried after available and read-only bookies.
@@ -386,4 +387,25 @@ public interface EnsemblePlacementPolicy {
             return (currentStickyBookieIndex.get() + 1) % metadata.getEnsembleSize();
         }
     }
+
+    /**
+     * returns true if the Ensemble is strictly adhering to placement policy,
+     * like in the case of RackawareEnsemblePlacementPolicy, bookies in the
+     * writeset are from 'minNumRacksPerWriteQuorum' number of racks. And in the
+     * case of RegionawareEnsemblePlacementPolicy, check for
+     * minimumRegionsForDurability, reppRegionsToWrite, rack distribution within
+     * a region and other parameters of RegionAwareEnsemblePlacementPolicy.
+     *
+     * @param ensembleList
+     *            list of BookieSocketAddress of bookies in the ensemble
+     * @param writeQuorumSize
+     *            writeQuorumSize of the ensemble
+     * @param ackQuorumSize
+     *            ackQuorumSize of the ensemble
+     * @return
+     */
+    default boolean isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int writeQuorumSize,
+            int ackQuorumSize) {
+        return false;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
index 7c9e07c..254f535 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.Node;
+import org.apache.commons.lang3.tuple.Pair;
 
 /**
  * Interface for topology aware ensemble placement policy.
@@ -93,7 +94,7 @@ public interface ITopologyAwareEnsemblePlacementPolicy<T extends Node> extends E
      * @return list of bookies forming the ensemble
      * @throws BKException.BKNotEnoughBookiesException
      */
-    List<BookieSocketAddress> newEnsemble(
+    Pair<List<BookieSocketAddress>, Boolean> newEnsemble(
             int ensembleSize,
             int writeQuorumSize,
             int ackQuorumSize,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 1fd7580..8054d97 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -18,19 +18,16 @@
 package org.apache.bookkeeper.client;
 
 import io.netty.util.HashedWheelTimer;
-
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
-import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
-import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Predicate;
-import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.BookieNode;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.Node;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.lang3.tuple.Pair;
 
 /**
  * A placement policy implementation use rack information for placing ensembles.
@@ -95,8 +92,8 @@ public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacement
     }
 
     @Override
-    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
+    public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int ensembleSize, int writeQuorumSize,
+            int ackQuorumSize, Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
         try {
             return super.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, excludeBookies);
@@ -110,8 +107,8 @@ public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacement
     }
 
     @Override
-    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> currentEnsemble,
+    public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+            Map<String, byte[]> customMetadata, List<BookieSocketAddress> currentEnsemble,
             BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
        try {
@@ -146,7 +143,7 @@ public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacement
     }
 
     @Override
-    public List<BookieSocketAddress> newEnsemble(int ensembleSize,
+    public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int ensembleSize,
                                                  int writeQuorumSize,
                                                  int ackQuorumSize,
                                                  Set<BookieSocketAddress> excludeBookies,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 7578c41..6db7de8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.client;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_JOINED;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIES_LEFT;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER;
 import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE;
 import static org.apache.bookkeeper.client.BookKeeperClientStats.READ_REQUESTS_REORDERED;
 import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.UNKNOWN_REGION;
@@ -35,6 +36,7 @@ import io.netty.util.HashedWheelTimer;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -64,10 +66,12 @@ import org.apache.bookkeeper.net.Node;
 import org.apache.bookkeeper.net.NodeBase;
 import org.apache.bookkeeper.net.ScriptBasedMapping;
 import org.apache.bookkeeper.net.StabilizeNetworkTopology;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.annotations.StatsDoc;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -146,12 +150,19 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
 
         final Supplier<String> defaultRackSupplier;
         final DNSToSwitchMapping resolver;
-
-        DNSResolverDecorator(DNSToSwitchMapping resolver, Supplier<String> defaultRackSupplier) {
+        @StatsDoc(
+                name = FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER,
+                help = "total number of times Resolver failed to resolve rack information of a node"
+        )
+        final Counter failedToResolveNetworkLocationCounter;
+
+        DNSResolverDecorator(DNSToSwitchMapping resolver, Supplier<String> defaultRackSupplier,
+                Counter failedToResolveNetworkLocationCounter) {
             checkNotNull(resolver, "Resolver cannot be null");
             checkNotNull(defaultRackSupplier, "defaultRackSupplier should not be null");
             this.defaultRackSupplier = defaultRackSupplier;
             this.resolver = resolver;
+            this.failedToResolveNetworkLocationCounter = failedToResolveNetworkLocationCounter;
         }
 
         public List<String> resolve(List<String> names) {
@@ -167,6 +178,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
                     if (rNames.get(i) == null) {
                         LOG.warn("Failed to resolve network location for {}, using default rack for it : {}.",
                                 names.get(i), defaultRack);
+                        failedToResolveNetworkLocationCounter.inc();
                         rNames.set(i, defaultRack);
                     }
                 }
@@ -178,6 +190,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
             rNames = new ArrayList<>(names.size());
 
             for (int i = 0; i < names.size(); ++i) {
+                failedToResolveNetworkLocationCounter.inc();
                 rNames.add(defaultRack);
             }
             return rNames;
@@ -227,6 +240,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
         help = "The distribution of number of bookies reordered on each read request"
     )
     protected OpStatsLogger readReorderedCounter = null;
+    protected Counter failedToResolveNetworkLocationCounter = null;
 
     private String defaultRack = NetworkTopology.DEFAULT_RACK;
 
@@ -267,10 +281,13 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
         this.bookiesJoinedCounter = statsLogger.getOpStatsLogger(BOOKIES_JOINED);
         this.bookiesLeftCounter = statsLogger.getOpStatsLogger(BOOKIES_LEFT);
         this.readReorderedCounter = statsLogger.getOpStatsLogger(READ_REQUESTS_REORDERED);
+        this.failedToResolveNetworkLocationCounter = statsLogger
+                .getCounter(FAILED_TO_RESOLVE_NETWORK_LOCATION_COUNTER);
         this.reorderReadsRandom = reorderReadsRandom;
         this.stabilizePeriodSeconds = stabilizePeriodSeconds;
         this.reorderThresholdPendingRequests = reorderThresholdPendingRequests;
-        this.dnsResolver = new DNSResolverDecorator(dnsResolver, () -> this.getDefaultRack());
+        this.dnsResolver = new DNSResolverDecorator(dnsResolver, () -> this.getDefaultRack(),
+                failedToResolveNetworkLocationCounter);
         this.timer = timer;
         this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
         this.enforceMinNumRacksPerWriteQuorum = enforceMinNumRacksPerWriteQuorum;
@@ -342,9 +359,18 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
                     ((RackChangeNotifier) dnsResolver).registerRackChangeListener(this);
                 }
             } catch (RuntimeException re) {
-                LOG.info("Failed to initialize DNS Resolver {}, used default subnet resolver : {}",
-                    dnsResolverName, re, re.getMessage());
-                dnsResolver = new DefaultResolver(() -> this.getDefaultRack());
+                if (!enforceMinNumRacksPerWriteQuorum) {
+                    LOG.info("Failed to initialize DNS Resolver {}, used default subnet resolver : {}", dnsResolverName,
+                            re, re.getMessage());
+                    dnsResolver = new DefaultResolver(() -> this.getDefaultRack());
+                } else {
+                    /*
+                     * If enforceMinNumRacksPerWriteQuorum is set, do not fall
+                     * back to the default resolver when the configured
+                     * dnsResolver cannot be created; rethrow the failure.
+                     */
+                    throw re;
+                }
             }
         }
         slowBookies = CacheBuilder.newBuilder()
@@ -479,7 +505,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
         }
     }
 
-    protected Set<Node> convertBookiesToNodes(Set<BookieSocketAddress> excludeBookies) {
+    protected Set<Node> convertBookiesToNodes(Collection<BookieSocketAddress> excludeBookies) {
         Set<Node> nodes = new HashSet<Node>();
         for (BookieSocketAddress addr : excludeBookies) {
             BookieNode bn = knownBookies.get(addr);
@@ -500,13 +526,13 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
     }
 
     @Override
-    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
+    public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int ensembleSize, int writeQuorumSize,
+            int ackQuorumSize, Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
         return newEnsembleInternal(ensembleSize, writeQuorumSize, excludeBookies, null, null);
     }
 
-    protected List<BookieSocketAddress> newEnsembleInternal(int ensembleSize,
+    protected Pair<List<BookieSocketAddress>, Boolean> newEnsembleInternal(int ensembleSize,
                                                             int writeQuorumSize,
                                                             Set<BookieSocketAddress> excludeBookies,
                                                             Ensemble<BookieNode> parentEnsemble,
@@ -522,7 +548,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
     }
 
     @Override
-    public List<BookieSocketAddress> newEnsemble(int ensembleSize,
+    public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int ensembleSize,
                                                  int writeQuorumSize,
                                                  int ackQuorumSize,
                                                  Set<BookieSocketAddress> excludeBookies,
@@ -538,7 +564,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
                 parentPredicate);
     }
 
-    protected List<BookieSocketAddress> newEnsembleInternal(
+    protected Pair<List<BookieSocketAddress>, Boolean> newEnsembleInternal(
             int ensembleSize,
             int writeQuorumSize,
             int ackQuorumSize,
@@ -572,7 +598,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
                 for (BookieNode bn : bns) {
                     addrs.add(bn.getAddr());
                 }
-                return addrs;
+                return Pair.of(addrs, false);
             }
 
             for (int i = 0; i < ensembleSize; i++) {
@@ -596,15 +622,15 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
                           ensembleSize, bookieList);
                 throw new BKNotEnoughBookiesException();
             }
-            return bookieList;
+            return Pair.of(bookieList, isEnsembleAdheringToPlacementPolicy(bookieList, writeQuorumSize, ackQuorumSize));
         } finally {
             rwLock.readLock().unlock();
         }
     }
 
     @Override
-    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> currentEnsemble,
+    public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+            Map<String, byte[]> customMetadata, List<BookieSocketAddress> currentEnsemble,
             BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
         rwLock.readLock().lock();
@@ -639,7 +665,19 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bn);
             }
-            return candidate.getAddr();
+            BookieSocketAddress candidateAddr = candidate.getAddr();
+            List<BookieSocketAddress> newEnsemble = new ArrayList<BookieSocketAddress>(currentEnsemble);
+            if (currentEnsemble.isEmpty()) {
+                /*
+                 * Some test cases pass an empty currentEnsemble; there is no
+                 * slot to replace in that case, so the candidate is appended.
+                 */
+                newEnsemble.add(candidateAddr);
+            } else {
+                newEnsemble.set(currentEnsemble.indexOf(bookieToReplace), candidateAddr);
+            }
+            return Pair.of(candidateAddr,
+                    isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize));
         } finally {
             rwLock.readLock().unlock();
         }
@@ -1206,4 +1244,32 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
             }
         }
     }
+
+    @Override
+    public boolean isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int writeQuorumSize,
+            int ackQuorumSize) {
+        int ensembleSize = ensembleList.size();
+        int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+        HashSet<String> racksOrRegionsInQuorum = new HashSet<String>();
+        BookieSocketAddress bookie;
+        for (int i = 0; i < ensembleList.size(); i++) {
+            racksOrRegionsInQuorum.clear();
+            for (int j = 0; j < writeQuorumSize; j++) {
+                bookie = ensembleList.get((i + j) % ensembleSize);
+                try {
+                    racksOrRegionsInQuorum.add(knownBookies.get(bookie).getNetworkLocation());
+                } catch (Exception e) {
+                    /*
+                     * Deliberately swallowed: a failure while checking adherence to the
+                     * placement policy must not fail the caller; it is only logged.
+                     */
+                    LOG.warn("Received exception while trying to get network location of bookie: {}", bookie, e);
+                }
+            }
+            if (racksOrRegionsInQuorum.size() < minNumRacksPerWriteQuorumForThisEnsemble) {
+                return false;
+            }
+        }
+        return true;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index c52e0fe..1bd4b75 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -225,8 +225,8 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
 
 
     @Override
-    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
+    public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int ensembleSize, int writeQuorumSize,
+            int ackQuorumSize, Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
 
         int effectiveMinRegionsForDurability = disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;
@@ -279,7 +279,7 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
                 for (BookieNode bn : bns) {
                     addrs.add(bn.getAddr());
                 }
-                return addrs;
+                return Pair.of(addrs, isEnsembleAdheringToPlacementPolicy(addrs, writeQuorumSize, ackQuorumSize));
             }
 
             // Single region, fall back to RackAwareEnsemblePlacement
@@ -347,7 +347,7 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
                             try {
                                 List<BookieSocketAddress> allocated = policyWithinRegion.newEnsemble(newEnsembleSize,
                                         newWriteQuorumSize, newWriteQuorumSize, excludeBookies, tempEnsemble,
-                                        tempEnsemble);
+                                        tempEnsemble).getLeft();
                                 ensemble = tempEnsemble;
                                 remainingEnsemble -= addToEnsembleSize;
                                 remainingWriteQuorum -= addToWriteQuorum;
@@ -407,15 +407,17 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
                 throw new BKException.BKNotEnoughBookiesException();
             }
             LOG.info("Bookies allocated successfully {}", ensemble);
-            return ensemble.toList();
+            List<BookieSocketAddress> ensembleList = ensemble.toList();
+            return Pair.of(ensembleList,
+                    isEnsembleAdheringToPlacementPolicy(ensembleList, writeQuorumSize, ackQuorumSize));
         } finally {
             rwLock.readLock().unlock();
         }
     }
 
     @Override
-    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
-            Map<String, byte[]> customMetadata, Set<BookieSocketAddress> currentEnsemble,
+    public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+            Map<String, byte[]> customMetadata, List<BookieSocketAddress> currentEnsemble,
             BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
         rwLock.readLock().lock();
@@ -469,7 +471,19 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bookieNodeToReplace);
             }
-            return candidate.getAddr();
+            BookieSocketAddress candidateAddr = candidate.getAddr();
+            List<BookieSocketAddress> newEnsemble = new ArrayList<BookieSocketAddress>(currentEnsemble);
+            if (currentEnsemble.isEmpty()) {
+                /*
+                 * Some test cases pass an empty currentEnsemble; there is no
+                 * slot to replace in that case, so the candidate is appended.
+                 */
+                newEnsemble.add(candidateAddr);
+            } else {
+                newEnsemble.set(currentEnsemble.indexOf(bookieToReplace), candidateAddr);
+            }
+            return Pair.of(candidateAddr,
+                    isEnsembleAdheringToPlacementPolicy(newEnsemble, writeQuorumSize, ackQuorumSize));
         } finally {
             rwLock.readLock().unlock();
         }
@@ -550,4 +564,16 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
         finalList.addMissingIndices(ensemble.size());
         return finalList;
     }
+
+    @Override
+    public boolean isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleList, int writeQuorumSize,
+            int ackQuorumSize) {
+        /*
+         * TODO: implement the actual adherence check for
+         * RegionAwareEnsemblePlacementPolicy. For now this always returns true.
+         *
+         * - https://github.com/apache/bookkeeper/issues/1898
+         */
+        return true;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
index 8c8350c..230f66d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
@@ -25,8 +25,8 @@ import java.util.StringTokenizer;
 
 import org.apache.bookkeeper.util.Shell.ShellCommandExecutor;
 import org.apache.commons.configuration.Configuration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class implements the {@link DNSToSwitchMapping} interface using a
@@ -129,7 +129,7 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
     private static final class RawScriptBasedMapping extends AbstractDNSToSwitchMapping {
         private String scriptName;
         private int maxArgs; //max hostnames per call of the script
-        private static final Log LOG = LogFactory.getLog(ScriptBasedMapping.class);
+        private static final Logger LOG = LoggerFactory.getLogger(ScriptBasedMapping.class);
 
         /**
          * Set the configuration and extract the configuration parameters of interest.
@@ -233,7 +233,7 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
                     s.execute();
                     allOutput.append(s.getOutput()).append(" ");
                 } catch (Exception e) {
-                    LOG.warn("Exception running " + s, e);
+                    LOG.warn("Exception running: {} Exception message: {}", s, e.getMessage());
                     return null;
                 }
                 loopCount++;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
index bb55d0c..205c5f4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
@@ -33,6 +33,7 @@ import java.util.Set;
 
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.commons.lang3.tuple.Pair;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -69,8 +70,8 @@ public class GenericEnsemblePlacementPolicyTest extends BookKeeperClusterTestCas
     public static final class CustomEnsemblePlacementPolicy extends DefaultEnsemblePlacementPolicy {
 
         @Override
-        public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize,
-            int ackQuorumSize, Map<String, byte[]> customMetadata, Set<BookieSocketAddress> currentEnsemble,
+        public Pair<BookieSocketAddress, Boolean> replaceBookie(int ensembleSize, int writeQuorumSize,
+            int ackQuorumSize, Map<String, byte[]> customMetadata, List<BookieSocketAddress> currentEnsemble,
             BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
             new Exception("replaceBookie " + ensembleSize + "," + customMetadata).printStackTrace();
@@ -81,7 +82,7 @@ public class GenericEnsemblePlacementPolicyTest extends BookKeeperClusterTestCas
         }
 
         @Override
-        public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int quorumSize,
+        public Pair<List<BookieSocketAddress>, Boolean> newEnsemble(int ensembleSize, int quorumSize,
             int ackQuorumSize, Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
             assertNotNull(customMetadata);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index e75e93f..e6cd07b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -52,6 +52,7 @@ import org.apache.bookkeeper.net.NetworkTopology;
 import org.apache.bookkeeper.net.Node;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.apache.commons.lang3.tuple.Pair;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,6 +70,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
     ClientConfiguration conf = new ClientConfiguration();
     BookieSocketAddress addr1, addr2, addr3, addr4;
     io.netty.util.HashedWheelTimer timer;
+    final int minNumRacksPerWriteQuorumConfValue = 2;
 
     @Override
     protected void setUp() throws Exception {
@@ -80,6 +82,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         StaticDNSResolver.addNodeToRack("localhost", NetworkTopology.DEFAULT_REGION_AND_RACK);
         LOG.info("Set up static DNS Resolver.");
         conf.setProperty(REPP_DNS_RESOLVER_CLASS, StaticDNSResolver.class.getName());
+        conf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
         addr1 = new BookieSocketAddress("127.0.0.2", 3181);
         addr2 = new BookieSocketAddress("127.0.0.3", 3181);
         addr3 = new BookieSocketAddress("127.0.0.4", 3181);
@@ -564,8 +567,12 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         // replace node under r2
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<>(), addr2, new HashSet<>());
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+                addr2, new HashSet<>());
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+        boolean isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
         assertEquals(addr3, replacedBookie);
+        assertTrue(isEnsembleAdheringToPlacementPolicy);
     }
 
     @Test
@@ -589,10 +596,13 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         // replace node under r2
         Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
         excludedAddrs.add(addr1);
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<>(), addr2, excludedAddrs);
-
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+                addr2, excludedAddrs);
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+        boolean isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
         assertFalse(addr1.equals(replacedBookie));
         assertTrue(addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
+        assertTrue(isEnsembleAdheringToPlacementPolicy);
     }
 
     @Test
@@ -619,7 +629,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         excludedAddrs.add(addr3);
         excludedAddrs.add(addr4);
         try {
-            repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+            repp.replaceBookie(1, 1, 1, null, new ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
             fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
             // should throw not enou
@@ -628,7 +638,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
 
     @Test
     public void testReplaceBookieWithEnoughBookiesInSameRackAsEnsemble() throws Exception {
-        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.5", 3181);
         BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
         BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
         BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
@@ -645,15 +655,18 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         // replace node under r2
-        Set<BookieSocketAddress> ensembleBookies = new HashSet<BookieSocketAddress>();
+        List<BookieSocketAddress> ensembleBookies = new ArrayList<BookieSocketAddress>();
         ensembleBookies.add(addr2);
         ensembleBookies.add(addr4);
-        BookieSocketAddress replacedBookie = repp.replaceBookie(
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = repp.replaceBookie(
             1, 1, 1 , null,
             ensembleBookies,
             addr4,
             new HashSet<>());
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
+        boolean isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
         assertEquals(addr1, replacedBookie);
+        assertTrue(isEnsembleAdheringToPlacementPolicy);
     }
 
     @Test
@@ -670,10 +683,18 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
+            ensembleResponse = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+            boolean isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
             assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2, conf.getMinNumRacksPerWriteQuorum()));
-            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>());
+            assertFalse(isEnsembleAdheringToPlacementPolicy);
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2;
+            ensembleResponse2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+            boolean isEnsembleAdheringToPlacementPolicy2 = ensembleResponse2.getRight();
             assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2, conf.getMinNumRacksPerWriteQuorum()));
+            assertFalse(isEnsembleAdheringToPlacementPolicy2);
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is only one rack.");
         }
@@ -703,17 +724,19 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr3);
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
-
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
         List<BookieSocketAddress> ensemble;
         try {
-            ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
+            ensembleResponse = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
+            ensemble = ensembleResponse.getLeft();
             fail("Should get not enough bookies exception since there is only one rack.");
         } catch (BKNotEnoughBookiesException bnebe) {
         }
 
         try {
-            ensemble = repp.newEnsemble(3, 2, 2, new HashSet<>(), EnsembleForReplacementWithNoConstraints.INSTANCE,
-                    TruePredicate.INSTANCE);
+            ensembleResponse = repp.newEnsemble(3, 2, 2, new HashSet<>(),
+                    EnsembleForReplacementWithNoConstraints.INSTANCE, TruePredicate.INSTANCE);
+            ensemble = ensembleResponse.getLeft();
             fail("Should get not enough bookies exception since there is only one rack.");
         } catch (BKNotEnoughBookiesException bnebe) {
         }
@@ -766,19 +789,27 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
          * and there are enough bookies in 3 racks, this newEnsemble calls should
          * succeed.
          */
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
         List<BookieSocketAddress> ensemble;
+        boolean isEnsembleAdheringToPlacementPolicy;
         int ensembleSize = numOfRacks * numOfBookiesPerRack;
         int writeQuorumSize = numOfRacks;
         int ackQuorumSize = numOfRacks;
 
-        ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
+        ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
+        ensemble = ensembleResponse.getLeft();
+        isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
         assertEquals("Number of writeQuorum sets covered", ensembleSize,
                 getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
+        assertTrue(isEnsembleAdheringToPlacementPolicy);
 
-        ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, new HashSet<>(),
+        ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, new HashSet<>(),
                 EnsembleForReplacementWithNoConstraints.INSTANCE, TruePredicate.INSTANCE);
+        ensemble = ensembleResponse.getLeft();
+        isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
         assertEquals("Number of writeQuorum sets covered", ensembleSize,
                 getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
+        assertTrue(isEnsembleAdheringToPlacementPolicy);
     }
 
     @Test
@@ -822,16 +853,24 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
          * ensembleSizes (as long as there are enough number of bookies in each
          * rack).
          */
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
         List<BookieSocketAddress> ensemble;
+        boolean isEnsembleAdheringToPlacementPolicy;
         for (int ensembleSize = effectiveMinNumRacksPerWriteQuorum; ensembleSize < 40; ensembleSize++) {
-            ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
+            ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
+            ensemble = ensembleResponse.getLeft();
+            isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
             assertEquals("Number of writeQuorum sets covered", ensembleSize,
                     getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
+            assertTrue(isEnsembleAdheringToPlacementPolicy);
 
-            ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, new HashSet<>(),
+            ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, new HashSet<>(),
                     EnsembleForReplacementWithNoConstraints.INSTANCE, TruePredicate.INSTANCE);
+            ensemble = ensembleResponse.getLeft();
+            isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
             assertEquals("Number of writeQuorum sets covered", ensembleSize,
                     getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
+            assertTrue(isEnsembleAdheringToPlacementPolicy);
         }
     }
 
@@ -873,12 +912,14 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
          * and there are enough bookies in 3 racks, this newEnsemble call should
          * succeed.
          */
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
         List<BookieSocketAddress> ensemble;
         int ensembleSize = numOfRacks * numOfBookiesPerRack;
         int writeQuorumSize = numOfRacks;
         int ackQuorumSize = numOfRacks;
 
-        ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
+        ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
+        ensemble = ensembleResponse.getLeft();
 
         BookieSocketAddress bookieInEnsembleToBeReplaced = ensemble.get(7);
         // get rack of some other bookie
@@ -895,7 +936,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         repp.onClusterChanged(bookieSocketAddresses, new HashSet<BookieSocketAddress>());
         try {
             repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null,
-                    new HashSet<BookieSocketAddress>(ensemble), bookieInEnsembleToBeReplaced, new HashSet<>());
+                    ensemble, bookieInEnsembleToBeReplaced, new HashSet<>());
             fail("Should get not enough bookies exception since there are no more bookies in rack"
                     + "of 'bookieInEnsembleToReplace'"
                     + "and new bookie added belongs to the rack of some other bookie in the ensemble");
@@ -917,16 +958,22 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
          * this replaceBookie should succeed, because a new bookie is added to a
          * new rack.
          */
-        BookieSocketAddress replacedBookieAddress = repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
-                null, new HashSet<BookieSocketAddress>(ensemble), bookieInEnsembleToBeReplaced, new HashSet<>());
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
+        BookieSocketAddress replacedBookieAddress;
+        boolean isEnsembleAdheringToPlacementPolicy;
+        replaceBookieResponse = repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null, ensemble,
+                bookieInEnsembleToBeReplaced, new HashSet<>());
+        replacedBookieAddress = replaceBookieResponse.getLeft();
+        isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
         assertEquals("It should be newBookieAddress2", newBookieAddress2, replacedBookieAddress);
+        assertTrue(isEnsembleAdheringToPlacementPolicy);
 
         Set<BookieSocketAddress> bookiesToExclude = new HashSet<>();
         bookiesToExclude.add(newBookieAddress2);
         repp.onClusterChanged(bookieSocketAddresses, new HashSet<BookieSocketAddress>());
         try {
-            repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null,
-                    new HashSet<BookieSocketAddress>(ensemble), bookieInEnsembleToBeReplaced, bookiesToExclude);
+            repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null, ensemble,
+                    bookieInEnsembleToBeReplaced, bookiesToExclude);
             fail("Should get not enough bookies exception since the only available bookie to replace"
                     + "is added to excludedBookies list");
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -949,9 +996,12 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
          * replaced, so we should be able to replacebookie though
          * newBookieAddress2 is added to excluded bookies list.
          */
-        replacedBookieAddress = repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null,
-                new HashSet<BookieSocketAddress>(ensemble), bookieInEnsembleToBeReplaced, bookiesToExclude);
+        replaceBookieResponse = repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null,
+                ensemble, bookieInEnsembleToBeReplaced, bookiesToExclude);
+        replacedBookieAddress = replaceBookieResponse.getLeft();
+        isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
         assertEquals("It should be newBookieAddress3", newBookieAddress3, replacedBookieAddress);
+        assertTrue(isEnsembleAdheringToPlacementPolicy);
     }
 
     @Test
@@ -1298,15 +1348,21 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
             int ensembleSize = 3;
             int writeQuorumSize = 2;
             int acqQuorumSize = 2;
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
-                    null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
+                    acqQuorumSize, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+            boolean isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
             int numCovered = getNumCoveredWriteQuorums(ensemble, writeQuorumSize, conf.getMinNumRacksPerWriteQuorum());
             assertTrue(numCovered >= 1 && numCovered < 3);
+            assertFalse(isEnsembleAdheringToPlacementPolicy);
             ensembleSize = 4;
-            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
-                    null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 = repp.newEnsemble(ensembleSize, writeQuorumSize,
+                    acqQuorumSize, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+            boolean isEnsembleAdheringToPlacementPolicy2 = ensembleResponse2.getRight();
             numCovered = getNumCoveredWriteQuorums(ensemble2, writeQuorumSize, conf.getMinNumRacksPerWriteQuorum());
             assertTrue(numCovered >= 1 && numCovered < 3);
+            assertFalse(isEnsembleAdheringToPlacementPolicy2);
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is only one rack.");
         }
@@ -1375,11 +1431,13 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         repp.initialize(newConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
         repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
-
-        List<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, writeQuorumSize, null,
-                new HashSet<>());
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
+                writeQuorumSize, null, new HashSet<>());
+        List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+        boolean isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
         int numCovered = getNumCoveredWriteQuorums(ensemble, writeQuorumSize, minNumRacksPerWriteQuorum);
         assertEquals("minimum number of racks covered for writequorum ensemble: " + ensemble, ensembleSize, numCovered);
+        assertTrue(isEnsembleAdheringToPlacementPolicy);
     }
 
     @Test
@@ -1417,16 +1475,22 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
             int ensembleSize = 3;
             int writeQuorumSize = 3;
             int ackQuorumSize = 2;
-            List<BookieSocketAddress> ensemble1 = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
-                    null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
+                    ackQuorumSize, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble1 = ensembleResponse.getLeft();
+            boolean isEnsembleAdheringToPlacementPolicy1 = ensembleResponse.getRight();
             assertEquals(ensembleSize,
                     getNumCoveredWriteQuorums(ensemble1, writeQuorumSize, conf.getMinNumRacksPerWriteQuorum()));
+            assertTrue(isEnsembleAdheringToPlacementPolicy1);
             ensembleSize = 4;
             writeQuorumSize = 4;
-            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(ensembleSize, writeQuorumSize, 2, null,
-                    new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 = repp.newEnsemble(ensembleSize, writeQuorumSize,
+                    2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
+            boolean isEnsembleAdheringToPlacementPolicy2 = ensembleResponse2.getRight();
             assertEquals(ensembleSize,
                     getNumCoveredWriteQuorums(ensemble2, writeQuorumSize, conf.getMinNumRacksPerWriteQuorum()));
+            assertTrue(isEnsembleAdheringToPlacementPolicy2);
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is only one rack.");
         }
@@ -1497,11 +1561,16 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         selectionCounts.put(addr3, 0L);
         selectionCounts.put(addr4, 0L);
         int numTries = 50000;
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
+        boolean isEnsembleAdheringToPlacementPolicy;
         BookieSocketAddress replacedBookie;
         for (int i = 0; i < numTries; i++) {
             // replace node under r2
-            replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<>(), addr2, new HashSet<>());
+            replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new ArrayList<>(), addr2, new HashSet<>());
+            replacedBookie = replaceBookieResponse.getLeft();
+            isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
             assertTrue("replaced : " + replacedBookie, addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
+            assertTrue(isEnsembleAdheringToPlacementPolicy);
             selectionCounts.put(replacedBookie, selectionCounts.get(replacedBookie) + 1);
         }
         double observedMultiple = ((double) selectionCounts.get(addr4) / (double) selectionCounts.get(addr3));
@@ -1557,14 +1626,19 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         selectionCounts.put(addr3, 0L);
         selectionCounts.put(addr4, 0L);
         int numTries = 50000;
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse;
         BookieSocketAddress replacedBookie;
+        boolean isEnsembleAdheringToPlacementPolicy;
         for (int i = 0; i < numTries; i++) {
             // addr2 is on /r2 and this is the only one on this rack. So the replacement
             // will come from other racks. However, the weight should be honored in such
             // selections as well
-            replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<>(), addr2, new HashSet<>());
+            replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new ArrayList<>(), addr2, new HashSet<>());
+            replacedBookie = replaceBookieResponse.getLeft();
+            isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getRight();
             assertTrue(addr0.equals(replacedBookie) || addr1.equals(replacedBookie) || addr3.equals(replacedBookie)
                     || addr4.equals(replacedBookie));
+            assertTrue(isEnsembleAdheringToPlacementPolicy);
             selectionCounts.put(replacedBookie, selectionCounts.get(replacedBookie) + 1);
         }
         /*
@@ -1656,6 +1730,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         int numTries = 10000;
 
         Set<BookieSocketAddress> excludeList = new HashSet<BookieSocketAddress>();
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
         List<BookieSocketAddress> ensemble;
         int ensembleSize = 3;
         int writeQuorumSize = 2;
@@ -1664,7 +1739,8 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
             // addr2 is on /r2 and this is the only one on this rack. So the replacement
             // will come from other racks. However, the weight should be honored in such
             // selections as well
-            ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize, null, excludeList);
+            ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize, null, excludeList);
+            ensemble = ensembleResponse.getLeft();
             assertTrue(
                     "Rackaware selection not happening "
                             + getNumCoveredWriteQuorums(ensemble, writeQuorumSize, conf.getMinNumRacksPerWriteQuorum()),
@@ -1726,21 +1802,23 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         bookieInfoMap.put(addr5, new BookieInfo(1000L, 1000L));
 
         repp.updateBookieInfo(bookieInfoMap);
-
-        List<BookieSocketAddress> ensemble = new ArrayList<BookieSocketAddress>();
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
+        List<BookieSocketAddress> ensemble;
         Set<BookieSocketAddress> excludeList = new HashSet<BookieSocketAddress>();
         try {
             excludeList.add(addr1);
             excludeList.add(addr2);
             excludeList.add(addr3);
             excludeList.add(addr4);
-            ensemble = repp.newEnsemble(3, 2, 2, null, excludeList);
+            ensembleResponse = repp.newEnsemble(3, 2, 2, null, excludeList);
+            ensemble = ensembleResponse.getLeft();
             fail("Should throw BKNotEnoughBookiesException when there is not enough bookies" + ensemble);
         } catch (BKNotEnoughBookiesException e) {
             // this is expected
         }
         try {
-            ensemble = repp.newEnsemble(1, 1, 1, null, excludeList);
+            ensembleResponse = repp.newEnsemble(1, 1, 1, null, excludeList);
+            ensemble = ensembleResponse.getLeft();
         } catch (BKNotEnoughBookiesException e) {
             fail("Should not throw BKNotEnoughBookiesException when there are enough bookies for the ensemble");
         }
@@ -1824,13 +1902,20 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
 
         // we will never use addr4 even it is in the stabilized network topology
         for (int i = 0; i < 5; i++) {
-            List<BookieSocketAddress> ensemble =
-                    repp.newEnsemble(3, 3, 3, null, new HashSet<BookieSocketAddress>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(3, 2, 2, null,
+                    new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+            boolean isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
             assertFalse(ensemble.contains(addr4));
+            assertFalse(isEnsembleAdheringToPlacementPolicy);
         }
 
         // we could still use addr4 for urgent allocation if it is just bookie flapping
-        List<BookieSocketAddress> ensemble = repp.newEnsemble(4, 4, 4, null, new HashSet<BookieSocketAddress>());
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(4, 2, 2, null,
+                new HashSet<BookieSocketAddress>());
+        List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
+        boolean isEnsembleAdheringToPlacementPolicy = ensembleResponse.getRight();
+        assertFalse(isEnsembleAdheringToPlacementPolicy);
         assertTrue(ensemble.contains(addr4));
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
index d9f2535..e0fd2bd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
@@ -30,6 +30,8 @@ import static org.junit.Assert.fail;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import io.netty.util.HashedWheelTimer;
+
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
@@ -44,6 +46,7 @@ import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.ScriptBasedMapping;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.Shell;
+import org.apache.commons.lang3.tuple.Pair;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
@@ -111,7 +114,9 @@ public class TestRackawareEnsemblePlacementPolicyUsingScript {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         // replace node under r2
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<>(), addr2, new HashSet<>());
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+                addr2, new HashSet<>());
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
         assertEquals(addr3, replacedBookie);
     }
 
@@ -133,7 +138,9 @@ public class TestRackawareEnsemblePlacementPolicyUsingScript {
         // replace node under r2
         Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
         excludedAddrs.add(addr1);
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<>(), addr2, excludedAddrs);
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+                addr2, excludedAddrs);
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
 
         assertFalse(addr1.equals(replacedBookie));
         assertTrue(addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
@@ -160,7 +167,7 @@ public class TestRackawareEnsemblePlacementPolicyUsingScript {
         excludedAddrs.add(addr3);
         excludedAddrs.add(addr4);
         try {
-            repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+            repp.replaceBookie(1, 1, 1, null, new ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
             fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
             // should throw not BKNotEnoughBookiesException
@@ -197,7 +204,9 @@ public class TestRackawareEnsemblePlacementPolicyUsingScript {
         // replace node under r2
         Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
         excludedAddrs.add(addr1);
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<>(), addr2, excludedAddrs);
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+                addr2, excludedAddrs);
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
 
         assertFalse(addr1.equals(replacedBookie));
         assertFalse(addr2.equals(replacedBookie));
@@ -235,7 +244,9 @@ public class TestRackawareEnsemblePlacementPolicyUsingScript {
         // replace node under r2
         Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
         excludedAddrs.add(addr1);
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<>(), addr2, excludedAddrs);
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
+                addr2, excludedAddrs);
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
 
         assertFalse(addr1.equals(replacedBookie));
         assertFalse(addr2.equals(replacedBookie));
@@ -257,9 +268,13 @@ public class TestRackawareEnsemblePlacementPolicyUsingScript {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(3, 2, 2, null,
+                    new HashSet<>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2));
-            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 = repp.newEnsemble(4, 2, 2, null,
+                    new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
             assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is only one rack.");
@@ -281,10 +296,14 @@ public class TestRackawareEnsemblePlacementPolicyUsingScript {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(3, 2, 2, null,
+                    new HashSet<>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             int numCovered = getNumCoveredWriteQuorums(ensemble, 2);
             assertTrue(numCovered == 2);
-            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 = repp.newEnsemble(4, 2, 2, null,
+                    new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
             numCovered = getNumCoveredWriteQuorums(ensemble2, 2);
             assertTrue(numCovered == 2);
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -315,9 +334,13 @@ public class TestRackawareEnsemblePlacementPolicyUsingScript {
         addrs.add(addr8);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse1 = repp.newEnsemble(3, 2, 2, null,
+                    new HashSet<>());
+            List<BookieSocketAddress> ensemble1 = ensembleResponse1.getLeft();
             assertEquals(3, getNumCoveredWriteQuorums(ensemble1, 2));
-            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 = repp.newEnsemble(4, 2, 2, null,
+                    new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
             assertEquals(4, getNumCoveredWriteQuorums(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception.");
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
index 7dc1d39..3192d04 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
@@ -42,6 +42,7 @@ import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.NetworkTopology;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.apache.commons.lang3.tuple.Pair;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,8 +105,9 @@ public class TestRackawarePolicyNotificationUpdates extends TestCase {
         int ensembleSize = 3;
         int writeQuorumSize = 2;
         int acqQuorumSize = 2;
-        List<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
-                Collections.emptyMap(), Collections.emptySet());
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize,
+                acqQuorumSize, Collections.emptyMap(), Collections.emptySet());
+        List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
         int numCovered = TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
                 conf.getMinNumRacksPerWriteQuorum());
         assertTrue(numCovered >= 1 && numCovered < 3);
@@ -118,8 +120,9 @@ public class TestRackawarePolicyNotificationUpdates extends TestCase {
         StaticDNSResolver.changeRack(bookieAddressList, rackList);
         numOfAvailableRacks = numOfAvailableRacks + 1;
         acqQuorumSize = 1;
-        ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize, Collections.emptyMap(),
+        ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize, Collections.emptyMap(),
                 Collections.emptySet());
+        ensemble = ensembleResponse.getLeft();
         assertEquals(3, TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
                 conf.getMinNumRacksPerWriteQuorum()));
         assertTrue(ensemble.contains(addr1));
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index e541230..8e4f10d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -51,6 +51,7 @@ import org.apache.bookkeeper.net.NetworkTopology;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.apache.commons.lang3.tuple.Pair;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -423,8 +424,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         // replace node under r2
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(),
-                addr2, new HashSet<BookieSocketAddress>());
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = repp.replaceBookie(1, 1, 1, null,
+                new ArrayList<BookieSocketAddress>(), addr2, new HashSet<BookieSocketAddress>());
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
         assertEquals(addr3, replacedBookie);
     }
 
@@ -449,8 +451,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         // replace node under r2
         Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
         excludedAddrs.add(addr1);
-        BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null,
-                new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+        Pair<BookieSocketAddress, Boolean> replaceBookieResponse = repp.replaceBookie(1, 1, 1, null,
+                new ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
+        BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
 
         assertFalse(addr1.equals(replacedBookie));
         assertTrue(addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
@@ -475,7 +478,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> list = repp.newEnsemble(5, 5, 3, null, new HashSet<BookieSocketAddress>());
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(5, 5, 3, null,
+                    new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> list = ensembleResponse.getLeft();
             LOG.info("Ensemble : {}", list);
             fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -507,7 +512,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         excludedAddrs.add(addr3);
         excludedAddrs.add(addr4);
         try {
-            repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+            repp.replaceBookie(1, 1, 1, null, new ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
             fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
             // should throw not enou
@@ -536,11 +541,13 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(3, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble, 2));
-            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 = repp.newEnsemble(4, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
             assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is only one rack.");
@@ -569,8 +576,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(3, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble, 2);
             assertTrue(numCovered >= 1);
             assertTrue(numCovered < 3);
@@ -578,8 +586,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
             fail("Should not get not enough bookies exception even there is only one rack.");
         }
         try {
-            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 = repp.newEnsemble(4, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
             int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble2, 2);
             assertTrue(numCovered >= 1 && numCovered < 3);
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -618,11 +627,13 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr8);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse1 = repp.newEnsemble(3, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble1 = ensembleResponse1.getLeft();
             assertEquals(3, getNumCoveredRegionsInWriteQuorum(ensemble1, 2));
-            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse2 = repp.newEnsemble(4, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble2 = ensembleResponse2.getLeft();
             assertEquals(4, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is only one rack.");
@@ -669,23 +680,27 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr10);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(6, 6, 4, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             assert(ensemble.contains(addr4));
             assert(ensemble.contains(addr8));
             assert(ensemble.size() == 6);
             assertEquals(3, getNumRegionsInEnsemble(ensemble));
-            ensemble = repp.newEnsemble(7, 7, 4, null, new HashSet<BookieSocketAddress>());
+            ensembleResponse = repp.newEnsemble(7, 7, 4, null, new HashSet<BookieSocketAddress>());
+            ensemble = ensembleResponse.getLeft();
             assert(ensemble.contains(addr4));
             assert(ensemble.contains(addr8));
             assert(ensemble.size() == 7);
             assertEquals(3, getNumRegionsInEnsemble(ensemble));
-            ensemble = repp.newEnsemble(8, 8, 5, null, new HashSet<BookieSocketAddress>());
+            ensembleResponse = repp.newEnsemble(8, 8, 5, null, new HashSet<BookieSocketAddress>());
+            ensemble = ensembleResponse.getLeft();
             assert(ensemble.contains(addr4));
             assert(ensemble.contains(addr8));
             assert(ensemble.size() == 8);
             assertEquals(3, getNumRegionsInEnsemble(ensemble));
-            ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
+            ensembleResponse = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
+            ensemble = ensembleResponse.getLeft();
             assert(ensemble.contains(addr4));
             assert(ensemble.contains(addr8));
             assert(ensemble.size() == 9);
@@ -738,8 +753,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
             ((SettableFeature) featureProvider.scope("region1").getFeature("disallowBookies")).set(true);
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(6, 6, 4, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             assertEquals(2, getNumRegionsInEnsemble(ensemble));
             assert(ensemble.contains(addr1));
             assert(ensemble.contains(addr3));
@@ -753,16 +769,16 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         }
         try {
             ((SettableFeature) featureProvider.scope("region2").getFeature("disallowBookies")).set(true);
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null,
-                    new HashSet<BookieSocketAddress>());
+            repp.newEnsemble(6, 6, 4, null, new HashSet<BookieSocketAddress>());
             fail("Should get not enough bookies exception even there is only one region with insufficient bookies.");
         } catch (BKNotEnoughBookiesException bnebe) {
             // Expected
         }
         try {
             ((SettableFeature) featureProvider.scope("region2").getFeature("disallowBookies")).set(false);
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(6, 6, 4, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             assert(ensemble.contains(addr1));
             assert(ensemble.contains(addr3));
             assert(ensemble.contains(addr4));
@@ -835,8 +851,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
 
         try {
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, null,
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(10, 10, 10, null,
                     new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             assert(ensemble.size() == 10);
             assertEquals(5, getNumRegionsInEnsemble(ensemble));
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -847,7 +864,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         try {
             Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
             excludedAddrs.add(addr10);
-            List<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, null, excludedAddrs);
+            Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(10, 10, 10, null,
+                    excludedAddrs);
+            List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
             assert(ensemble.contains(addr11) && ensemble.contains(addr12));
             assert(ensemble.size() == 10);
             assertEquals(5, getNumRegionsInEnsemble(ensemble));
@@ -937,9 +956,11 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
             ackQuorum = 5;
         }
 
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
         List<BookieSocketAddress> ensemble;
         try {
-            ensemble = repp.newEnsemble(6, 6, ackQuorum, null, new HashSet<BookieSocketAddress>());
+            ensembleResponse = repp.newEnsemble(6, 6, ackQuorum, null, new HashSet<BookieSocketAddress>());
+            ensemble = ensembleResponse.getLeft();
             assert(ensemble.size() == 6);
             assertEquals(3, getNumRegionsInEnsemble(ensemble));
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -960,9 +981,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
             Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
             for (BookieSocketAddress addr: region2Bookies) {
                 if (ensemble.contains(addr)) {
-                    BookieSocketAddress replacedBookie = repp.replaceBookie(
-                        6, 6, ackQuorum, null,
-                        new HashSet<>(ensemble), addr, excludedAddrs);
+                    Pair<BookieSocketAddress, Boolean> replaceBookieResponse = repp.replaceBookie(6, 6, ackQuorum, null,
+                            ensemble, addr, excludedAddrs);
+                    BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
                     ensemble.remove(addr);
                     ensemble.add(replacedBookie);
                 }
@@ -986,9 +1007,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
             Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
 
             try {
-                BookieSocketAddress replacedBookie = repp.replaceBookie(
-                    6, 6, ackQuorum, null,
-                    new HashSet<>(ensemble), bookieToReplace, excludedAddrs);
+                Pair<BookieSocketAddress, Boolean> replaceBookieResponse = repp.replaceBookie(6, 6, ackQuorum, null,
+                        ensemble, bookieToReplace, excludedAddrs);
+                BookieSocketAddress replacedBookie = replaceBookieResponse.getLeft();
                 assert (replacedBookie.equals(replacedBookieExpected));
                 assertEquals(3, getNumRegionsInEnsemble(ensemble));
             } catch (BKNotEnoughBookiesException bnebe) {
@@ -997,9 +1018,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
 
             excludedAddrs.add(replacedBookieExpected);
             try {
-                BookieSocketAddress replacedBookie = repp.replaceBookie(
-                    6, 6, ackQuorum, null,
-                    new HashSet<>(ensemble), bookieToReplace, excludedAddrs);
+                repp.replaceBookie(6, 6, ackQuorum, null, ensemble, bookieToReplace, excludedAddrs);
                 if (minDurability > 1 && !disableDurabilityFeature.isAvailable()) {
                     fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
                 }
@@ -1073,9 +1092,11 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
                     .set(true);
         }
 
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse;
         List<BookieSocketAddress> ensemble;
         try {
-            ensemble = repp.newEnsemble(6, 6, 4, null, new HashSet<BookieSocketAddress>());
+            ensembleResponse = repp.newEnsemble(6, 6, 4, null, new HashSet<BookieSocketAddress>());
+            ensemble = ensembleResponse.getLeft();
             assert(ensemble.size() == 6);
         } catch (BKNotEnoughBookiesException bnebe) {
             LOG.error("BKNotEnoughBookiesException", bnebe);
@@ -1086,9 +1107,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
 
         try {
-            repp.replaceBookie(
-                6, 6, 4, null,
-                new HashSet<>(ensemble), addr4, excludedAddrs);
+            repp.replaceBookie(6, 6, 4, null, ensemble, ensemble.get(2), excludedAddrs);
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is only one rack.");
         }
@@ -1141,8 +1160,8 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         excludedAddrs.add(addr10);
         excludedAddrs.add(addr9);
         try {
-            List<BookieSocketAddress> list = repp.newEnsemble(5, 5, 5, null, excludedAddrs);
-            LOG.info("Ensemble : {}", list);
+            Pair<List<BookieSocketAddress>, Boolean> list = repp.newEnsemble(5, 5, 5, null, excludedAddrs);
+            LOG.info("Ensemble : {}", list.getLeft());
             fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
             // should throw not enou
@@ -1201,8 +1220,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
 
     private void basicReorderReadSequenceWithLocalRegionTest(String myRegion, boolean isReadLAC) throws Exception {
         prepareNetworkTopologyForReorderTests(myRegion);
-
-        List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(9, 9, 5, null,
+                new HashSet<BookieSocketAddress>());
+        List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
         assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
 
         DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1258,7 +1278,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
     private void basicReorderReadSequenceWithRemoteRegionTest(String myRegion, boolean isReadLAC) throws Exception {
         prepareNetworkTopologyForReorderTests(myRegion);
 
-        List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(9, 9, 5, null,
+                new HashSet<BookieSocketAddress>());
+        List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
         assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
 
         DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1328,7 +1350,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
 
         prepareNetworkTopologyForReorderTests(myRegion);
 
-        List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
+        Pair<List<BookieSocketAddress>, Boolean> ensembleResponse = repp.newEnsemble(9, 9, 5, null,
+                new HashSet<BookieSocketAddress>());
+        List<BookieSocketAddress> ensemble = ensembleResponse.getLeft();
         assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
 
         DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);