You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2022/09/05 08:13:44 UTC

[bookkeeper] branch master updated: Feature: auto recover support repaired not adhering placement ledger (#3359)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new fc981ba04b Feature: auto recover support repaired not adhering placement ledger (#3359)
fc981ba04b is described below

commit fc981ba04bae126afe3452b76006e80487cc9d84
Author: Yan Zhao <ho...@apache.org>
AuthorDate: Mon Sep 5 16:13:36 2022 +0800

    Feature: auto recover support repaired not adhering placement ledger (#3359)
---
 .../org/apache/bookkeeper/client/BookKeeper.java   |   2 +-
 .../apache/bookkeeper/client/BookKeeperAdmin.java  |  55 ++-
 .../bookkeeper/client/BookieWatcherImpl.java       |   8 +-
 .../bookkeeper/client/EnsemblePlacementPolicy.java |  33 ++
 .../apache/bookkeeper/client/LedgerFragment.java   |  23 +-
 .../client/RackawareEnsemblePlacementPolicy.java   |  22 ++
 .../RackawareEnsemblePlacementPolicyImpl.java      | 221 +++++++++++
 .../TopologyAwareEnsemblePlacementPolicy.java      |  35 +-
 .../bookkeeper/conf/ServerConfiguration.java       |  25 +-
 .../org/apache/bookkeeper/replication/Auditor.java |  16 +
 .../bookkeeper/replication/ReplicationWorker.java  | 106 ++++-
 .../TestRackawareEnsemblePlacementPolicy.java      | 432 +++++++++++++++++++--
 .../TestZoneawareEnsemblePlacementPolicy.java      |  28 +-
 .../AuditorPlacementPolicyCheckTest.java           | 145 +++++++
 .../replication/TestReplicationWorker.java         | 204 ++++++++++
 site3/website/docs/reference/config.md             |  21 +-
 16 files changed, 1301 insertions(+), 75 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 155bffbece..a48e7d62a3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -568,7 +568,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
         bookieQuarantineRatio = 1.0;
     }
 
-    private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
+    protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
                                                                       DNSToSwitchMapping dnsResolver,
                                                                       HashedWheelTimer timer,
                                                                       FeatureProvider featureProvider,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index d62b620791..2e95ab5a4c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -33,8 +33,10 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -90,6 +92,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
 import org.apache.bookkeeper.util.IOUtils;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -1095,7 +1098,7 @@ public class BookKeeperAdmin implements AutoCloseable {
                             oldBookie,
                             bookiesToExclude);
             BookieId newBookie = replaceBookieResponse.getResult();
-            PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
+            PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
             if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(
@@ -1129,14 +1132,23 @@ public class BookKeeperAdmin implements AutoCloseable {
      * @param ledgerFragment
      *            - LedgerFragment to replicate
      */
-    public void replicateLedgerFragment(LedgerHandle lh,
-            final LedgerFragment ledgerFragment,
-            final BiConsumer<Long, Long> onReadEntryFailureCallback)
-            throws InterruptedException, BKException {
-        Optional<Set<BookieId>> excludedBookies = Optional.empty();
-        Map<Integer, BookieId> targetBookieAddresses =
-                getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
-                        ledgerFragment.getBookiesIndexes(), excludedBookies);
+    public void replicateLedgerFragment(LedgerHandle lh, final LedgerFragment ledgerFragment,
+            final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException, BKException {
+        Map<Integer, BookieId> targetBookieAddresses = null;
+        if (LedgerFragment.ReplicateType.DATA_LOSS == ledgerFragment.getReplicateType()) {
+            Optional<Set<BookieId>> excludedBookies = Optional.empty();
+            targetBookieAddresses = getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
+                    ledgerFragment.getBookiesIndexes(), excludedBookies);
+        } else if (LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT == ledgerFragment.getReplicateType()) {
+            targetBookieAddresses = replaceNotAdheringPlacementPolicyBookie(ledgerFragment.getEnsemble(),
+                    lh.getLedgerMetadata().getWriteQuorumSize(), lh.getLedgerMetadata().getAckQuorumSize());
+            ledgerFragment.getBookiesIndexes().addAll(targetBookieAddresses.keySet());
+        }
+        if (MapUtils.isEmpty(targetBookieAddresses)) {
+            LOG.warn("Could not replicate for {} ledger: {}, not find target bookie.",
+                    ledgerFragment.getReplicateType(), ledgerFragment.getLedgerId());
+            return;
+        }
         replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses, onReadEntryFailureCallback);
     }
 
@@ -1777,6 +1789,31 @@ public class BookKeeperAdmin implements AutoCloseable {
                 ackQuorumSize);
     }
 
+    public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensembleBookiesList,
+            int writeQuorumSize, int ackQuorumSize) {
+        try {
+            EnsemblePlacementPolicy.PlacementResult<List<BookieId>> placementResult = bkc.getPlacementPolicy()
+                    .replaceToAdherePlacementPolicy(ensembleBookiesList.size(), writeQuorumSize, ackQuorumSize,
+                            new HashSet<>(), ensembleBookiesList);
+            if (PlacementPolicyAdherence.FAIL != placementResult.getAdheringToPolicy()) {
+                Map<Integer, BookieId> targetMap = new HashMap<>();
+                List<BookieId> newEnsembles = placementResult.getResult();
+                for (int i = 0; i < ensembleBookiesList.size(); i++) {
+                    BookieId originBookie = ensembleBookiesList.get(i);
+                    BookieId newBookie = newEnsembles.get(i);
+                    if (!originBookie.equals(newBookie)) {
+                        targetMap.put(i, newBookie);
+                    }
+                }
+                return targetMap;
+            }
+        } catch (UnsupportedOperationException e) {
+            LOG.warn("The placement policy: {} didn't support replaceToAdherePlacementPolicy, "
+                    + "ignore replace not adhere bookie.", bkc.getPlacementPolicy().getClass().getName());
+        }
+        return Collections.emptyMap();
+    }
+
     /**
      * Makes async request for getting list of entries of ledger from a bookie
      * and returns Future for the result.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
index c2806877ac..652fbcc773 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcherImpl.java
@@ -269,7 +269,7 @@ class BookieWatcherImpl implements BookieWatcher {
             newEnsembleResponse = placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
                     customMetadata, new HashSet<BookieId>(quarantinedBookiesSet));
             socketAddresses = newEnsembleResponse.getResult();
-            isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.isAdheringToPolicy();
+            isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.getAdheringToPolicy();
             if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
                 ensembleNotAdheringToPlacementPolicy.inc();
                 if (ensembleSize > 1) {
@@ -286,7 +286,7 @@ class BookieWatcherImpl implements BookieWatcher {
             newEnsembleResponse = placementPolicy.newEnsemble(
                     ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<>());
             socketAddresses = newEnsembleResponse.getResult();
-            isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.isAdheringToPolicy();
+            isEnsembleAdheringToPlacementPolicy = newEnsembleResponse.getAdheringToPolicy();
             if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
                 ensembleNotAdheringToPlacementPolicy.inc();
                 log.warn("New ensemble: {} is not adhering to Placement Policy", socketAddresses);
@@ -317,7 +317,7 @@ class BookieWatcherImpl implements BookieWatcher {
                     ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
                     existingBookies, addr, excludedBookiesAndQuarantinedBookies);
             socketAddress = replaceBookieResponse.getResult();
-            isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
+            isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
             if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
                 ensembleNotAdheringToPlacementPolicy.inc();
                 log.warn(
@@ -333,7 +333,7 @@ class BookieWatcherImpl implements BookieWatcher {
             replaceBookieResponse = placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
                     customMetadata, existingBookies, addr, excludeBookies);
             socketAddress = replaceBookieResponse.getResult();
-            isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
+            isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
             if (isEnsembleAdheringToPlacementPolicy == PlacementPolicyAdherence.FAIL) {
                 ensembleNotAdheringToPlacementPolicy.inc();
                 log.warn(
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 39aec4574c..05a687b964 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -440,6 +440,31 @@ public interface EnsemblePlacementPolicy {
         return true;
     }
 
+    /**
+     * Returns placement result. If the currentEnsemble is not adhering placement policy, returns new ensemble that
+     * adheres placement policy. It should be implemented so as to minify the number of bookies replaced.
+     *
+     * @param ensembleSize
+     *            ensemble size
+     * @param writeQuorumSize
+ *                writeQuorumSize of the ensemble
+     * @param ackQuorumSize
+     *            ackQuorumSize of the ensemble
+     * @param excludeBookies
+     *            bookies that should not be considered as targets
+     * @param currentEnsemble
+     *            current ensemble
+     * @return a placement result
+     */
+    default PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * enum for PlacementPolicyAdherence. Currently we are supporting tri-value
      * enum for PlacementPolicyAdherence. If placement policy is met strictly
@@ -481,8 +506,16 @@ public interface EnsemblePlacementPolicy {
             return result;
         }
 
+        /**
+         * Use {@link #getAdheringToPolicy}.
+         */
+        @Deprecated
         public PlacementPolicyAdherence isAdheringToPolicy() {
             return policyAdherence;
         }
+
+        public PlacementPolicyAdherence getAdheringToPolicy() {
+            return policyAdherence;
+        }
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
index 1fb1e50cb0..2dffa10f83 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
@@ -39,8 +39,9 @@ public class LedgerFragment {
     private final long ledgerId;
     private final DistributionSchedule schedule;
     private final boolean isLedgerClosed;
+    private ReplicateType replicateType = ReplicateType.DATA_LOSS;
 
-    LedgerFragment(LedgerHandle lh,
+    public LedgerFragment(LedgerHandle lh,
                    long firstEntryId,
                    long lastKnownEntryId,
                    Set<Integer> bookieIndexes) {
@@ -56,7 +57,7 @@ public class LedgerFragment {
                 || !ensemble.equals(ensembles.get(ensembles.lastKey()));
     }
 
-    LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
+    public LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
         this.ledgerId = lf.ledgerId;
         this.firstEntryId = lf.firstEntryId;
         this.lastKnownEntryId = lf.lastKnownEntryId;
@@ -91,7 +92,7 @@ public class LedgerFragment {
         return isLedgerClosed;
     }
 
-    long getLedgerId() {
+    public long getLedgerId() {
         return ledgerId;
     }
 
@@ -217,6 +218,14 @@ public class LedgerFragment {
         return this.ensemble;
     }
 
+    public ReplicateType getReplicateType() {
+        return replicateType;
+    }
+
+    public void setReplicateType(ReplicateType replicateType) {
+        this.replicateType = replicateType;
+    }
+
     @Override
     public String toString() {
         return String.format("Fragment(LedgerID: %d, FirstEntryID: %d[%d], "
@@ -224,4 +233,12 @@ public class LedgerFragment {
                 getFirstStoredEntryId(), lastKnownEntryId, getLastStoredEntryId(),
                 getAddresses(), isLedgerClosed);
     }
+
+    /**
+     * ReplicateType.
+     */
+    public enum ReplicateType {
+        DATA_LOSS,
+        DATA_NOT_ADHERING_PLACEMENT
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 653b1a5e5d..72858f188f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -235,6 +235,28 @@ public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacement
         }
     }
 
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        final PlacementResult<List<BookieId>> placementResult =
+                super.replaceToAdherePlacementPolicy(ensembleSize, writeQuorumSize, ackQuorumSize,
+                        excludeBookies, currentEnsemble);
+        if (placementResult.getAdheringToPolicy() != PlacementPolicyAdherence.FAIL) {
+            return placementResult;
+        } else {
+            if (slave == null) {
+                return placementResult;
+            } else {
+                return slave.replaceToAdherePlacementPolicy(ensembleSize, writeQuorumSize, ackQuorumSize,
+                        excludeBookies, currentEnsemble);
+            }
+        }
+    }
+
     @Override
     public void handleBookiesThatLeft(Set<BookieId> leftBookies) {
         super.handleBookiesThatLeft(leftBookies);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index c6625b55e8..d3d557d14b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -26,6 +26,7 @@ import static org.apache.bookkeeper.client.BookKeeperClientStats.NUM_WRITABLE_BO
 import static org.apache.bookkeeper.client.BookKeeperClientStats.READ_REQUESTS_REORDERED;
 import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.UNKNOWN_REGION;
 
+import com.beust.jcommander.internal.Lists;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -43,6 +44,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject;
@@ -66,6 +69,7 @@ import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.annotations.StatsDoc;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1079,4 +1083,221 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        rwLock.readLock().lock();
+        try {
+            PlacementPolicyAdherence currentPlacementAdherence = isEnsembleAdheringToPlacementPolicy(
+                    currentEnsemble, writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+                return PlacementResult.of(new ArrayList<>(currentEnsemble), currentPlacementAdherence);
+            }
+            for (BookieId bookieId : currentEnsemble) {
+                if (!knownBookies.containsKey(bookieId)) {
+                    excludeBookies.add(bookieId);
+                }
+            }
+            int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+            int numRacks = topology.getNumOfRacks();
+            // only one rack or less than minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
+            if (numRacks < 2 || numRacks < minNumRacksPerWriteQuorumForThisEnsemble) {
+                LOG.warn("Skip ensemble relocation because the cluster has only {} rack.", numRacks);
+                return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+            }
+            PlacementResult<List<BookieId>> placementResult = PlacementResult.of(Collections.emptyList(),
+                    PlacementPolicyAdherence.FAIL);
+            int minDiffer = Integer.MAX_VALUE;
+            for (int i = 0; i < currentEnsemble.size(); i++) {
+                PlacementResult<List<BookieId>> result = doReplaceToAdherePlacementPolicy(ensembleSize,
+                        writeQuorumSize, ackQuorumSize, excludeBookies, currentEnsemble, i);
+                if (PlacementPolicyAdherence.FAIL == result.getAdheringToPolicy()) {
+                    continue;
+                }
+                int differ = differBetweenBookies(currentEnsemble, result.getResult());
+                if (differ < minDiffer) {
+                    minDiffer = differ;
+                    placementResult = result;
+                    if (minDiffer == 1) {
+                        break;
+                    }
+                }
+            }
+            return placementResult;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private PlacementResult<List<BookieId>> doReplaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble,
+            int startIndex) {
+        final List<BookieNode> provisionalEnsembleNodes = currentEnsemble.stream()
+                .map(this::convertBookieToNode).collect(Collectors.toList());
+        final Set<Node> excludeNodes = convertBookiesToNodes(
+                addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
+        int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+        final RRTopologyAwareCoverageEnsemble ensemble =
+                new RRTopologyAwareCoverageEnsemble(
+                        ensembleSize,
+                        writeQuorumSize,
+                        ackQuorumSize,
+                        RACKNAME_DISTANCE_FROM_LEAVES,
+                        null,
+                        null,
+                        minNumRacksPerWriteQuorumForThisEnsemble);
+        BookieNode prevNode = null;
+        final BookieNode firstNode = provisionalEnsembleNodes.get(startIndex);
+        // use same bookie at first to reduce ledger replication
+        if (!excludeNodes.contains(firstNode) && ensemble.apply(firstNode, ensemble)
+                && ensemble.addNode(firstNode)) {
+            excludeNodes.add(firstNode);
+            prevNode = firstNode;
+        }
+        for (int i = prevNode == null ? 0 : 1; i < ensembleSize; i++) {
+            int index = (startIndex + i) % ensembleSize;
+            final String curRack;
+            if (null == prevNode) {
+                if ((null == localNode) || defaultRack.equals(localNode.getNetworkLocation())) {
+                    curRack = NodeBase.ROOT;
+                } else {
+                    curRack = localNode.getNetworkLocation();
+                }
+            } else {
+                curRack = "~" + prevNode.getNetworkLocation();
+            }
+            try {
+                prevNode = replaceToAdherePlacementPolicyInternal(
+                        curRack, excludeNodes, ensemble, ensemble,
+                        provisionalEnsembleNodes, index, ensembleSize, minNumRacksPerWriteQuorumForThisEnsemble);
+                // got a good candidate
+                if (ensemble.addNode(prevNode)) {
+                    // add the candidate to exclude set
+                    excludeNodes.add(prevNode);
+                } else {
+                    throw new BKNotEnoughBookiesException();
+                }
+                // replace to newer node
+                provisionalEnsembleNodes.set(index, prevNode);
+            } catch (BKNotEnoughBookiesException e) {
+                LOG.warn("Skip ensemble relocation because the cluster has not enough bookies.");
+                return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+            }
+        }
+        List<BookieId> bookieList = ensemble.toList();
+        if (ensembleSize != bookieList.size()) {
+            LOG.warn("Not enough {} bookies are available to form an ensemble : {}.",
+                    ensembleSize, bookieList);
+            return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+        }
+        PlacementPolicyAdherence placementPolicyAdherence = isEnsembleAdheringToPlacementPolicy(bookieList,
+                writeQuorumSize, ackQuorumSize);
+        if (PlacementPolicyAdherence.FAIL == placementPolicyAdherence) {
+            return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+        }
+        return PlacementResult.of(revertBookieListByIndex(bookieList, startIndex), placementPolicyAdherence);
+    }
+
+    private List<BookieId> revertBookieListByIndex(List<BookieId> bookies, int startIndex) {
+        BookieId[] bookieIds = new BookieId[bookies.size()];
+        for (int i = 0; i < bookies.size(); i++) {
+            if (startIndex == bookies.size()) {
+                startIndex = 0;
+            }
+            bookieIds[startIndex++] = bookies.get(i);
+        }
+        return Lists.newArrayList(bookieIds);
+    }
+
+    private BookieNode replaceToAdherePlacementPolicyInternal(
+            String netPath, Set<Node> excludeBookies, Predicate<BookieNode> predicate,
+            Ensemble<BookieNode> ensemble, List<BookieNode> provisionalEnsembleNodes, int ensembleIndex,
+            int ensembleSize, int minNumRacksPerWriteQuorumForThisEnsemble) throws BKNotEnoughBookiesException {
+        final BookieNode currentNode = provisionalEnsembleNodes.get(ensembleIndex);
+        // if the current bookie could be applied to the ensemble, apply it to minify the number of bookies replaced
+        if (!excludeBookies.contains(currentNode) && predicate.apply(currentNode, ensemble)) {
+            return currentNode;
+        }
+        final List<Pair<String, List<BookieNode>>> conditionList = new ArrayList<>();
+        final Set<String> preExcludeRacks = new HashSet<>();
+        final Set<String> postExcludeRacks = new HashSet<>();
+        for (int i = 0; i < minNumRacksPerWriteQuorumForThisEnsemble - 1; i++) {
+            preExcludeRacks.add(provisionalEnsembleNodes.get(Math.floorMod((ensembleIndex - i - 1), ensembleSize))
+                    .getNetworkLocation());
+            postExcludeRacks.add(provisionalEnsembleNodes.get(Math.floorMod((ensembleIndex + i + 1), ensembleSize))
+                    .getNetworkLocation());
+        }
+        // adhere minNumRacksPerWriteQuorum by preExcludeRacks
+        // avoid additional replace from write quorum candidates by preExcludeRacks and postExcludeRacks
+        // avoid to use first candidate bookies for election by provisionalEnsembleNodes
+        conditionList.add(Pair.of(
+                "~" + String.join(",",
+                        Stream.concat(preExcludeRacks.stream(), postExcludeRacks.stream()).collect(Collectors.toSet())),
+                provisionalEnsembleNodes
+        ));
+        // avoid to use same rack between previous index by netPath
+        // avoid to use first candidate bookies for election by provisionalEnsembleNodes
+        conditionList.add(Pair.of(netPath, provisionalEnsembleNodes));
+        // avoid to use same rack between previous index by netPath
+        conditionList.add(Pair.of(netPath, Collections.emptyList()));
+
+        for (Pair<String, List<BookieNode>> condition : conditionList) {
+            WeightedRandomSelection<BookieNode> wRSelection = null;
+
+            final List<Node> leaves = new ArrayList<>(topology.getLeaves(condition.getLeft()));
+            if (!isWeighted) {
+                Collections.shuffle(leaves);
+            } else {
+                if (CollectionUtils.subtract(leaves, excludeBookies).size() < 1) {
+                    throw new BKNotEnoughBookiesException();
+                }
+                wRSelection = prepareForWeightedSelection(leaves);
+                if (wRSelection == null) {
+                    throw new BKNotEnoughBookiesException();
+                }
+            }
+
+            final Iterator<Node> it = leaves.iterator();
+            final Set<Node> bookiesSeenSoFar = new HashSet<>();
+            while (true) {
+                Node n;
+                if (isWeighted) {
+                    if (bookiesSeenSoFar.size() == leaves.size()) {
+                        // Don't loop infinitely.
+                        break;
+                    }
+                    n = wRSelection.getNextRandom();
+                    bookiesSeenSoFar.add(n);
+                } else {
+                    if (it.hasNext()) {
+                        n = it.next();
+                    } else {
+                        break;
+                    }
+                }
+                if (excludeBookies.contains(n)) {
+                    continue;
+                }
+                if (!(n instanceof BookieNode) || !predicate.apply((BookieNode) n, ensemble)) {
+                    continue;
+                }
+                // additional excludeBookies
+                if (condition.getRight().contains(n)) {
+                    continue;
+                }
+                BookieNode bn = (BookieNode) n;
+                return bn;
+            }
+        }
+        throw new BKNotEnoughBookiesException();
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
index 805e60b1ac..852b7f3e35 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -52,6 +52,7 @@ import org.apache.bookkeeper.proto.BookieAddressResolver;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.annotations.StatsDoc;
+import org.apache.commons.collections4.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -763,6 +764,22 @@ abstract class TopologyAwareEnsemblePlacementPolicy implements
         }
     }
 
+    public static int differBetweenBookies(List<BookieId> bookiesA, List<BookieId> bookiesB) {
+        if (CollectionUtils.isEmpty(bookiesA) || CollectionUtils.isEmpty(bookiesB)) {
+            return Integer.MAX_VALUE;
+        }
+        if (bookiesA.size() != bookiesB.size()) {
+            return Integer.MAX_VALUE;
+        }
+        int differ = 0;
+        for (int i = 0; i < bookiesA.size(); i++) {
+            if (!bookiesA.get(i).equals(bookiesB.get(i))) {
+                differ++;
+            }
+        }
+        return differ;
+    }
+
     @Override
     public void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
         if (!isWeighted) {
@@ -812,15 +829,19 @@ abstract class TopologyAwareEnsemblePlacementPolicy implements
         }
     }
 
-    protected Set<Node> convertBookiesToNodes(Collection<BookieId> excludeBookies) {
+    protected Set<Node> convertBookiesToNodes(Collection<BookieId> bookies) {
         Set<Node> nodes = new HashSet<Node>();
-        for (BookieId addr : excludeBookies) {
-            BookieNode bn = knownBookies.get(addr);
-            if (null == bn) {
-                bn = createBookieNode(addr);
-            }
-            nodes.add(bn);
+        for (BookieId addr : bookies) {
+            nodes.add(convertBookieToNode(addr));
         }
         return nodes;
     }
+
+    protected BookieNode convertBookieToNode(BookieId addr) {
+        BookieNode bn = knownBookies.get(addr);
+        if (null == bn) {
+            bn = createBookieNode(addr);
+        }
+        return bn;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 8044c34c52..05a4dec907 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -204,6 +204,8 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     protected static final String AUDITOR_PERIODIC_BOOKIE_CHECK_INTERVAL = "auditorPeriodicBookieCheckInterval";
     protected static final String AUDITOR_PERIODIC_PLACEMENT_POLICY_CHECK_INTERVAL =
                                                                 "auditorPeriodicPlacementPolicyCheckInterval";
+    protected static final String REPAIRED_PLACEMENT_POLICY_NOT_ADHERING_BOOKIE_ENABLED =
+                                                                "repairedPlacementPolicyNotAdheringBookieEnabled";
     protected static final String AUDITOR_LEDGER_VERIFICATION_PERCENTAGE = "auditorLedgerVerificationPercentage";
     protected static final String AUTO_RECOVERY_DAEMON_ENABLED = "autoRecoveryDaemonEnabled";
     protected static final String LOST_BOOKIE_RECOVERY_DELAY = "lostBookieRecoveryDelay";
@@ -2620,12 +2622,33 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
      * Get the regularity at which the auditor does placement policy check of
      * all ledgers, which are closed.
      *
-     * @return The interval in seconds. By default it is disabled.
+     * @return The interval in seconds. By default, it is disabled.
      */
     public long getAuditorPeriodicPlacementPolicyCheckInterval() {
         return getLong(AUDITOR_PERIODIC_PLACEMENT_POLICY_CHECK_INTERVAL, 0);
     }
 
+    public void setRepairedPlacementPolicyNotAdheringBookieEnable(boolean enabled) {
+        setProperty(REPAIRED_PLACEMENT_POLICY_NOT_ADHERING_BOOKIE_ENABLED, enabled);
+    }
+
+    /**
+     * Now the feature only support RackawareEnsemblePlacementPolicy.
+     *
+     * In Auditor, it combines with {@link #getAuditorPeriodicPlacementPolicyCheckInterval}, to control is marked
+     * ledger id to under replication managed when found a ledger ensemble not adhere to placement policy.
+     * In ReplicationWorker, to control is to repair the ledger which the ensemble does not adhere to the placement
+     * policy. By default, it is disabled.
+     *
+     * If you want to enable this feature, there maybe lots of ledger will be mark underreplicated.
+     * The replicationWorker will replicate lots of ledger, it will increase read request and write request in bookie
+     * server. You should set a suitable rereplicationEntryBatchSize to avoid bookie server pressure.
+     *
+     */
+    public boolean isRepairedPlacementPolicyNotAdheringBookieEnable() {
+        return getBoolean(REPAIRED_PLACEMENT_POLICY_NOT_ADHERING_BOOKIE_ENABLED, false);
+    }
+
     /**
      * Sets the grace period (in seconds) for underreplicated ledgers recovery.
      * If ledger is marked underreplicated for more than this period then it
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index fa6f7195c1..243203da0e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -1470,6 +1470,22 @@ public class Auditor implements AutoCloseable {
                             }
                             if (foundSegmentNotAdheringToPlacementPolicy) {
                                 numOfLedgersFoundNotAdheringInPlacementPolicyCheck.incrementAndGet();
+                                //If user enable repaired, mark this ledger to under replication manager.
+                                if (conf.isRepairedPlacementPolicyNotAdheringBookieEnable()) {
+                                    ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId,
+                                            Collections.emptyList()).whenComplete((res, e) -> {
+                                        if (e != null) {
+                                            LOG.error("For ledger: {}, the placement policy not adhering bookie "
+                                                    + "storage, mark it to under replication manager failed.",
+                                                    ledgerId, e);
+                                            return;
+                                        }
+                                        if (LOG.isDebugEnabled()) {
+                                            LOG.debug("For ledger: {}, the placement policy not adhering bookie"
+                                                    + " storage, mark it to under replication manager", ledgerId);
+                                        }
+                                    });
+                                }
                             } else if (foundSegmentSoftlyAdheringToPlacementPolicy) {
                                 numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck.incrementAndGet();
                             }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index c4a0e430d6..e15f5c1407 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -34,7 +34,9 @@ import com.google.common.cache.LoadingCache;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -42,6 +44,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -53,12 +56,15 @@ import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsOnMetadataSe
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.LedgerChecker;
 import org.apache.bookkeeper.client.LedgerFragment;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -69,6 +75,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.annotations.StatsDoc;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,6 +107,7 @@ public class ReplicationWorker implements Runnable {
     private final long lockReleaseOfFailedLedgerGracePeriod;
     private final long baseBackoffForLockReleaseOfFailedLedger;
     private final BiConsumer<Long, Long> onReadEntryFailureCallback;
+    private final LedgerManager ledgerManager;
 
     // Expose Stats
     private final StatsLogger statsLogger;
@@ -173,6 +181,7 @@ public class ReplicationWorker implements Runnable {
         this.ownBkc = ownBkc;
 
         this.underreplicationManager = bkc.getLedgerManagerFactory().newLedgerUnderreplicationManager();
+        this.ledgerManager = bkc.getLedgerManagerFactory().newLedgerManager();
         this.admin = new BookKeeperAdmin(bkc, statsLogger, new ClientConfiguration(conf));
         this.ledgerChecker = new LedgerChecker(bkc);
         this.workerThread = new BookieThread(this, "ReplicationWorker");
@@ -359,6 +368,65 @@ public class ReplicationWorker implements Runnable {
         return (returnRCValue.get() == BKException.Code.OK);
     }
 
+    private Set<LedgerFragment> getNeedRepairedPlacementNotAdheringFragments(LedgerHandle lh) {
+        if (!conf.isRepairedPlacementPolicyNotAdheringBookieEnable()) {
+            return Collections.emptySet();
+        }
+        long ledgerId = lh.getId();
+        Set<LedgerFragment> placementNotAdheringFragments = new HashSet<>();
+        CompletableFuture<Versioned<LedgerMetadata>> future = ledgerManager.readLedgerMetadata(
+                ledgerId).whenComplete((metadataVer, exception) -> {
+            if (exception == null) {
+                LedgerMetadata metadata = metadataVer.getValue();
+                int writeQuorumSize = metadata.getWriteQuorumSize();
+                int ackQuorumSize = metadata.getAckQuorumSize();
+                if (!metadata.isClosed()) {
+                    return;
+                }
+                Long curEntryId = null;
+                EnsemblePlacementPolicy.PlacementPolicyAdherence previousSegmentAdheringToPlacementPolicy = null;
+
+                for (Map.Entry<Long, ? extends List<BookieId>> entry : metadata.getAllEnsembles().entrySet()) {
+                    if (curEntryId != null) {
+                        if (EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL
+                                == previousSegmentAdheringToPlacementPolicy) {
+                            LedgerFragment ledgerFragment = new LedgerFragment(lh, curEntryId,
+                                    entry.getKey() - 1, new HashSet<>());
+                            ledgerFragment.setReplicateType(LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT);
+                            placementNotAdheringFragments.add(ledgerFragment);
+                        }
+                    }
+                    previousSegmentAdheringToPlacementPolicy =
+                            admin.isEnsembleAdheringToPlacementPolicy(entry.getValue(),
+                                    writeQuorumSize, ackQuorumSize);
+                    curEntryId = entry.getKey();
+                }
+                if (curEntryId != null) {
+                    if (EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL
+                            == previousSegmentAdheringToPlacementPolicy) {
+                        long lastEntry = lh.getLedgerMetadata().getLastEntryId();
+                        LedgerFragment ledgerFragment = new LedgerFragment(lh, curEntryId, lastEntry,
+                                new HashSet<>());
+                        ledgerFragment.setReplicateType(LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT);
+                        placementNotAdheringFragments.add(ledgerFragment);
+                    }
+                }
+            } else if (BKException.getExceptionCode(exception)
+                    == BKException.Code.NoSuchLedgerExistsOnMetadataServerException) {
+                LOG.debug("Ignoring replication of already deleted ledger {}", ledgerId);
+            } else {
+                LOG.warn("Unable to read the ledger: {} information", ledgerId);
+            }
+        });
+        try {
+            FutureUtils.result(future);
+        } catch (Exception e) {
+            LOG.warn("Check ledger need repaired placement not adhering bookie failed", e);
+            return Collections.emptySet();
+        }
+        return placementNotAdheringFragments;
+    }
+
     @SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE")
     private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedException, BKException,
             UnavailableException {
@@ -369,8 +437,8 @@ public class ReplicationWorker implements Runnable {
         boolean deferLedgerLockRelease = false;
 
         try (LedgerHandle lh = admin.openLedgerNoRecovery(ledgerIdToReplicate)) {
-            Set<LedgerFragment> fragments =
-                getUnderreplicatedFragments(lh, conf.getAuditorLedgerVerificationPercentage());
+            Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh,
+                    conf.getAuditorLedgerVerificationPercentage());
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Founds fragments {} for replication from ledger: {}", fragments, ledgerIdToReplicate);
@@ -502,10 +570,40 @@ public class ReplicationWorker implements Runnable {
      */
     private Set<LedgerFragment> getUnderreplicatedFragments(LedgerHandle lh, Long ledgerVerificationPercentage)
             throws InterruptedException {
+        //The data loss fragments is first to repair. If a fragment is data_loss and not_adhering_placement
+        //at the same time, we only fix data_loss in this time. After fix data_loss, the fragment is still
+        //not_adhering_placement, Auditor will mark this ledger again.
+        Set<LedgerFragment> underreplicatedFragments = new HashSet<>();
+
+        Set<LedgerFragment> dataLossFragments = getDataLossFragments(lh, ledgerVerificationPercentage);
+        underreplicatedFragments.addAll(dataLossFragments);
+
+        Set<LedgerFragment> notAdheringFragments = getNeedRepairedPlacementNotAdheringFragments(lh);
+
+        for (LedgerFragment notAdheringFragment : notAdheringFragments) {
+            if (!checkFragmentRepeat(underreplicatedFragments, notAdheringFragment)) {
+                underreplicatedFragments.add(notAdheringFragment);
+            }
+        }
+        return underreplicatedFragments;
+    }
+
+    private Set<LedgerFragment> getDataLossFragments(LedgerHandle lh, Long ledgerVerificationPercentage)
+            throws InterruptedException {
         CheckerCallback checkerCb = new CheckerCallback();
         ledgerChecker.checkLedger(lh, checkerCb, ledgerVerificationPercentage);
-        Set<LedgerFragment> fragments = checkerCb.waitAndGetResult();
-        return fragments;
+        return checkerCb.waitAndGetResult();
+    }
+
+    private boolean checkFragmentRepeat(Set<LedgerFragment> fragments, LedgerFragment needChecked) {
+        for (LedgerFragment fragment : fragments) {
+            if (fragment.getLedgerId() == needChecked.getLedgerId()
+                    && fragment.getFirstEntryId() == needChecked.getFirstEntryId()
+                    && fragment.getLastKnownEntryId() == needChecked.getLastKnownEntryId()) {
+                return true;
+            }
+        }
+        return false;
     }
 
     void scheduleTaskWithDelay(TimerTask timerTask, long delayPeriod) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 0e9233136e..396e86d92b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -21,6 +21,9 @@ import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.
 import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.shuffleWithMask;
 import static org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues;
 import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.util.HashedWheelTimer;
@@ -36,6 +39,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import junit.framework.TestCase;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
@@ -60,6 +64,10 @@ import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
 import org.apache.bookkeeper.util.StaticDNSResolver;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -650,7 +658,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         EnsemblePlacementPolicy.PlacementResult<BookieId> replaceBookieResponse =
             repp.replaceBookie(1, 1, 1, null, new ArrayList<>(), addr2.toBookieId(), new HashSet<>());
         BookieId replacedBookie = replaceBookieResponse.getResult();
-        PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
+        PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
         assertEquals(addr3.toBookieId(), replacedBookie);
         assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
     }
@@ -679,7 +687,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         EnsemblePlacementPolicy.PlacementResult<BookieId> replaceBookieResponse =
             repp.replaceBookie(1, 1, 1, null, new ArrayList<>(), addr2.toBookieId(), excludedAddrs);
         BookieId replacedBookie = replaceBookieResponse.getResult();
-        PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
+        PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
         assertFalse(addr1.toBookieId().equals(replacedBookie));
         assertTrue(addr3.toBookieId().equals(replacedBookie) || addr4.toBookieId().equals(replacedBookie));
         assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
@@ -744,7 +752,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
             addr4.toBookieId(),
             new HashSet<>());
         BookieId replacedBookie = replaceBookieResponse.getResult();
-        PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
+        PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
         assertEquals(addr1.toBookieId(), replacedBookie);
         assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
     }
@@ -766,14 +774,14 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
             EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse;
             ensembleResponse = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
             List<BookieId> ensemble = ensembleResponse.getResult();
-            PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
+            PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = ensembleResponse.getAdheringToPolicy();
             assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2, conf.getMinNumRacksPerWriteQuorum(),
                                                       repp.bookieAddressResolver));
             assertEquals(PlacementPolicyAdherence.FAIL, isEnsembleAdheringToPlacementPolicy);
             EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse2;
             ensembleResponse2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>());
             List<BookieId> ensemble2 = ensembleResponse2.getResult();
-            PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy2 = ensembleResponse2.isAdheringToPolicy();
+            PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy2 = ensembleResponse2.getAdheringToPolicy();
             assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2, conf.getMinNumRacksPerWriteQuorum(),
                                                       repp.bookieAddressResolver));
             assertEquals(PlacementPolicyAdherence.FAIL, isEnsembleAdheringToPlacementPolicy);
@@ -898,7 +906,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
 
         ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
         ensemble = ensembleResponse.getResult();
-        isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
+        isEnsembleAdheringToPlacementPolicy = ensembleResponse.getAdheringToPolicy();
         assertEquals("Number of writeQuorum sets covered", ensembleSize,
                 getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum(),
                                                       repp.bookieAddressResolver));
@@ -908,7 +916,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
                 new HashSet<>(defaultRackBookiesList), EnsembleForReplacementWithNoConstraints.INSTANCE,
                 TruePredicate.INSTANCE);
         ensemble = ensembleResponse.getResult();
-        isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
+        isEnsembleAdheringToPlacementPolicy = ensembleResponse.getAdheringToPolicy();
         assertEquals("Number of writeQuorum sets covered", ensembleSize,
                 getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum(),
                                                       repp.bookieAddressResolver));
@@ -962,7 +970,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         for (int ensembleSize = effectiveMinNumRacksPerWriteQuorum; ensembleSize < 40; ensembleSize++) {
             ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
             ensemble = ensembleResponse.getResult();
-            isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
+            isEnsembleAdheringToPlacementPolicy = ensembleResponse.getAdheringToPolicy();
             assertEquals("Number of writeQuorum sets covered", ensembleSize,
                     getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum(),
                                                       repp.bookieAddressResolver));
@@ -971,7 +979,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
             ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, new HashSet<>(),
                     EnsembleForReplacementWithNoConstraints.INSTANCE, TruePredicate.INSTANCE);
             ensemble = ensembleResponse.getResult();
-            isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
+            isEnsembleAdheringToPlacementPolicy = ensembleResponse.getAdheringToPolicy();
             assertEquals("Number of writeQuorum sets covered", ensembleSize,
                     getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum(),
                                                       repp.bookieAddressResolver));
@@ -1098,7 +1106,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         replaceBookieResponse = repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null, ensemble,
                 bookieInEnsembleToBeReplaced, new HashSet<>());
         replacedBookieAddress = replaceBookieResponse.getResult();
-        isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
+        isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
         assertEquals("It should be newBookieAddress2", newBookieAddress2.toBookieId(), replacedBookieAddress);
         assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
 
@@ -1138,7 +1146,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         replaceBookieResponse = repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null,
                 ensemble, bookieInEnsembleToBeReplaced, bookiesToExclude);
         replacedBookieAddress = replaceBookieResponse.getResult();
-        isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
+        isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
         assertEquals("It should be newBookieAddress3", newBookieAddress3.toBookieId(), replacedBookieAddress);
         assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
     }
@@ -1491,7 +1499,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
                 repp.newEnsemble(ensembleSize, writeQuorumSize,
                                  acqQuorumSize, null, new HashSet<>());
             List<BookieId> ensemble = ensembleResponse.getResult();
-            PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
+            PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = ensembleResponse.getAdheringToPolicy();
             int numCovered = getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
                                                        conf.getMinNumRacksPerWriteQuorum(), repp.bookieAddressResolver);
             assertTrue(numCovered >= 1 && numCovered < 3);
@@ -1501,7 +1509,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
                 repp.newEnsemble(ensembleSize, writeQuorumSize,
                                  acqQuorumSize, null, new HashSet<>());
             List<BookieId> ensemble2 = ensembleResponse2.getResult();
-            PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy2 = ensembleResponse2.isAdheringToPolicy();
+            PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy2 = ensembleResponse2.getAdheringToPolicy();
             numCovered = getNumCoveredWriteQuorums(ensemble2, writeQuorumSize,
                                                    conf.getMinNumRacksPerWriteQuorum(), repp.bookieAddressResolver);
             assertTrue(numCovered >= 1 && numCovered < 3);
@@ -1579,7 +1587,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
             repp.newEnsemble(ensembleSize, writeQuorumSize,
                              writeQuorumSize, null, new HashSet<>());
         List<BookieId> ensemble = ensembleResponse.getResult();
-        PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
+        PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = ensembleResponse.getAdheringToPolicy();
         int numCovered = getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
                                                    minNumRacksPerWriteQuorum, repp.bookieAddressResolver);
         assertEquals("minimum number of racks covered for writequorum ensemble: " + ensemble, ensembleSize, numCovered);
@@ -1625,7 +1633,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
                 repp.newEnsemble(ensembleSize, writeQuorumSize,
                                    ackQuorumSize, null, new HashSet<>());
             List<BookieId> ensemble1 = ensembleResponse.getResult();
-            PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy1 = ensembleResponse.isAdheringToPolicy();
+            PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy1 = ensembleResponse.getAdheringToPolicy();
             assertEquals(ensembleSize,
                     getNumCoveredWriteQuorums(ensemble1, writeQuorumSize, conf.getMinNumRacksPerWriteQuorum(),
                                                       repp.bookieAddressResolver));
@@ -1635,7 +1643,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
             EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse2 =
                 repp.newEnsemble(ensembleSize, writeQuorumSize, 2, null, new HashSet<>());
             List<BookieId> ensemble2 = ensembleResponse2.getResult();
-            PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy2 = ensembleResponse2.isAdheringToPolicy();
+            PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy2 = ensembleResponse2.getAdheringToPolicy();
             assertEquals(ensembleSize,
                     getNumCoveredWriteQuorums(ensemble2, writeQuorumSize, conf.getMinNumRacksPerWriteQuorum(),
                                                       repp.bookieAddressResolver));
@@ -1719,7 +1727,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
             replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
                                                        addr2.toBookieId(), new HashSet<>());
             replacedBookie = replaceBookieResponse.getResult();
-            isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
+            isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
             assertTrue("replaced : " + replacedBookie, addr3.toBookieId().equals(replacedBookie)
                     || addr4.toBookieId().equals(replacedBookie));
             assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
@@ -1790,7 +1798,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
             replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
                                                        addr2.toBookieId(), new HashSet<>());
             replacedBookie = replaceBookieResponse.getResult();
-            isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
+            isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
             assertTrue(addr0.toBookieId().equals(replacedBookie)
                     || addr1.toBookieId().equals(replacedBookie)
                     || addr3.toBookieId().equals(replacedBookie)
@@ -2072,7 +2080,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
             EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse =
                 repp.newEnsemble(3, 2, 2, null, new HashSet<BookieId>());
             List<BookieId> ensemble = ensembleResponse.getResult();
-            PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
+            PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = ensembleResponse.getAdheringToPolicy();
             assertFalse(ensemble.contains(addr4.toBookieId()));
             assertEquals(PlacementPolicyAdherence.FAIL, isEnsembleAdheringToPlacementPolicy);
         }
@@ -2081,7 +2089,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse =
             repp.newEnsemble(4, 2, 2, null, new HashSet<BookieId>());
         List<BookieId> ensemble = ensembleResponse.getResult();
-        PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
+        PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = ensembleResponse.getAdheringToPolicy();
         assertEquals(PlacementPolicyAdherence.FAIL, isEnsembleAdheringToPlacementPolicy);
         assertTrue(ensemble.contains(addr4.toBookieId()));
     }
@@ -2375,7 +2383,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         for (int ensembleSize = effectiveMinNumRacksPerWriteQuorum; ensembleSize < 40; ensembleSize++) {
             ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
             ensemble = ensembleResponse.getResult();
-            isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
+            isEnsembleAdheringToPlacementPolicy = ensembleResponse.getAdheringToPolicy();
             assertEquals("Number of writeQuorum sets covered", ensembleSize,
                     getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum(),
                                                       repp.bookieAddressResolver));
@@ -2383,7 +2391,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
 
             ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
             ensemble = ensembleResponse.getResult();
-            isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
+            isEnsembleAdheringToPlacementPolicy = ensembleResponse.getAdheringToPolicy();
             assertEquals("Number of writeQuorum sets covered", ensembleSize,
                     getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum(),
                                                       repp.bookieAddressResolver));
@@ -2484,4 +2492,384 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         testAreAckedBookiesAdheringToPlacementPolicyHelper(4, 6, 3, 2, 6, 3, 3);
         testAreAckedBookiesAdheringToPlacementPolicyHelper(5, 7, 5, 3, 7, 5, 2);
     }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReplaceToAdherePlacementPolicy() throws Exception {
+        final BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+        final BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+        final BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+        final BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+        final BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
+        final BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
+        final BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181);
+        final BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.8", 3181);
+        final BookieSocketAddress addr9 = new BookieSocketAddress("127.0.0.9", 3181);
+
+        final String rackName1 = NetworkTopology.DEFAULT_REGION + "/r1";
+        final String rackName2 = NetworkTopology.DEFAULT_REGION + "/r2";
+        final String rackName3 = NetworkTopology.DEFAULT_REGION + "/r3";
+
+        // update dns mapping
+        StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(), rackName1);
+        StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(), rackName1);
+        StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(), rackName1);
+        StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(), rackName2);
+        StaticDNSResolver.addNodeToRack(addr5.getSocketAddress().getAddress().getHostAddress(), rackName2);
+        StaticDNSResolver.addNodeToRack(addr6.getSocketAddress().getAddress().getHostAddress(), rackName2);
+        StaticDNSResolver.addNodeToRack(addr7.getSocketAddress().getAddress().getHostAddress(), rackName3);
+        StaticDNSResolver.addNodeToRack(addr8.getSocketAddress().getAddress().getHostAddress(), rackName3);
+        StaticDNSResolver.addNodeToRack(addr9.getSocketAddress().getAddress().getHostAddress(), rackName3);
+
+        // Update cluster
+        final Set<BookieId> addrs = new HashSet<>();
+        addrs.add(addr1.toBookieId());
+        addrs.add(addr2.toBookieId());
+        addrs.add(addr3.toBookieId());
+        addrs.add(addr4.toBookieId());
+        addrs.add(addr5.toBookieId());
+        addrs.add(addr6.toBookieId());
+        addrs.add(addr7.toBookieId());
+        addrs.add(addr8.toBookieId());
+        addrs.add(addr9.toBookieId());
+
+        final ClientConfiguration newConf = new ClientConfiguration(conf);
+        newConf.setDiskWeightBasedPlacementEnabled(false);
+        newConf.setMinNumRacksPerWriteQuorum(2);
+        newConf.setEnforceMinNumRacksPerWriteQuorum(true);
+
+        repp.initialize(newConf, Optional.empty(), timer,
+                DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        repp.onClusterChanged(addrs, new HashSet<>());
+        final Map<BookieId, BookieInfo> bookieInfoMap = new HashMap<>();
+        bookieInfoMap.put(addr1.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr2.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr3.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr4.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr5.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr6.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr7.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr8.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr9.toBookieId(), new BookieInfo(100L, 100L));
+
+        repp.updateBookieInfo(bookieInfoMap);
+
+        final Set<BookieId> excludeList = new HashSet<>();
+        final int ensembleSize = 7;
+        final int writeQuorumSize = 2;
+        final int ackQuorumSize = 2;
+
+        final BookieRackMatcher rack1 = new BookieRackMatcher(rackName1);
+        final BookieRackMatcher rack2 = new BookieRackMatcher(rackName2);
+        final BookieRackMatcher rack3 = new BookieRackMatcher(rackName3);
+        final BookieRackMatcher rack12 = new BookieRackMatcher(rackName1, rackName2);
+        final BookieRackMatcher rack13 = new BookieRackMatcher(rackName1, rackName3);
+        final BookieRackMatcher rack23 = new BookieRackMatcher(rackName2, rackName3);
+        final BookieRackMatcher rack123 = new BookieRackMatcher(rackName1, rackName2, rackName3);
+        final Consumer<Pair<List<BookieId>, Matcher<Iterable<? extends BookieId>>>> test = (pair) -> {
+            // RackawareEnsemblePlacementPolicyImpl#isEnsembleAdheringToPlacementPolicy
+            // is not scope of this test case. So, use the method in assertion for convenience.
+            assertEquals(PlacementPolicyAdherence.FAIL,
+                    repp.isEnsembleAdheringToPlacementPolicy(pair.getLeft(), writeQuorumSize, ackQuorumSize));
+            final EnsemblePlacementPolicy.PlacementResult<List<BookieId>> result =
+                    repp.replaceToAdherePlacementPolicy(ensembleSize, writeQuorumSize, ackQuorumSize,
+                            excludeList, pair.getLeft());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("input: {}, result: {}", pair.getLeft(), result.getResult());
+            }
+            assertEquals(PlacementPolicyAdherence.MEETS_STRICT, result.getAdheringToPolicy());
+            assertThat(result.getResult(), pair.getRight());
+        };
+
+        for (int i = 0; i < 1000; i++) {
+            test.accept(Pair.of(Arrays.asList(addr1.toBookieId(), addr4.toBookieId(), addr7.toBookieId(),
+                            addr2.toBookieId(), addr5.toBookieId(), addr8.toBookieId(), addr9.toBookieId()),
+                    // first, same, same, same, same, same, condition[0]
+                    contains(is(addr1.toBookieId()), is(addr4.toBookieId()), is(addr7.toBookieId()),
+                            is(addr2.toBookieId()), is(addr5.toBookieId()), is(addr8.toBookieId()),
+                            is(addr6.toBookieId()))));
+
+            test.accept(Pair.of(Arrays.asList(addr6.toBookieId(), addr4.toBookieId(), addr7.toBookieId(),
+                            addr2.toBookieId(), addr5.toBookieId(), addr8.toBookieId(), addr3.toBookieId()),
+                    // first, condition[0], same, same, same, same, same
+                    contains(is(addr6.toBookieId()), is(addr1.toBookieId()), is(addr7.toBookieId()),
+                            is(addr2.toBookieId()), is(addr5.toBookieId()), is(addr8.toBookieId()),
+                            is(addr3.toBookieId()))));
+
+            test.accept(Pair.of(Arrays.asList(addr1.toBookieId(), addr2.toBookieId(), addr3.toBookieId(),
+                            addr4.toBookieId(), addr5.toBookieId(), addr6.toBookieId(), addr7.toBookieId()),
+                    // first, candidate[0], same, same, candidate[0], same, same
+                    contains(is(addr1.toBookieId()), is(rack3), is(addr3.toBookieId()),
+                            is(addr4.toBookieId()), is(rack13), is(addr6.toBookieId()), is(addr7.toBookieId()))));
+
+            test.accept(Pair.of(Arrays.asList(addr1.toBookieId(), addr2.toBookieId(), addr4.toBookieId(),
+                            addr5.toBookieId(), addr7.toBookieId(), addr8.toBookieId(), addr9.toBookieId()),
+                    contains(is(addr1.toBookieId()), is(rack23), is(rack123), is(rack123),
+                            is(rack123), is(rack123), is(rack23))));
+        }
+        StaticDNSResolver.reset();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReplaceToAdherePlacementPolicyWithOutOfOrder() throws Exception {
+        final BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+        final BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+        final BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+        final BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+        final BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
+        final BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
+
+        final String rackName1 = NetworkTopology.DEFAULT_REGION + "/r1";
+        final String rackName2 = NetworkTopology.DEFAULT_REGION + "/r2";
+
+        // update dns mapping
+        StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(), rackName1);
+        StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(), rackName1);
+        StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(), rackName1);
+        StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(), rackName2);
+        StaticDNSResolver.addNodeToRack(addr5.getSocketAddress().getAddress().getHostAddress(), rackName2);
+        StaticDNSResolver.addNodeToRack(addr6.getSocketAddress().getAddress().getHostAddress(), rackName2);
+
+        // Update cluster
+        final Set<BookieId> addrs = new HashSet<>();
+        addrs.add(addr1.toBookieId());
+        addrs.add(addr2.toBookieId());
+        addrs.add(addr3.toBookieId());
+        addrs.add(addr4.toBookieId());
+        addrs.add(addr5.toBookieId());
+        addrs.add(addr6.toBookieId());
+
+        final ClientConfiguration newConf = new ClientConfiguration(conf);
+        newConf.setDiskWeightBasedPlacementEnabled(false);
+        newConf.setMinNumRacksPerWriteQuorum(2);
+        newConf.setEnforceMinNumRacksPerWriteQuorum(true);
+
+        repp.initialize(newConf, Optional.empty(), timer,
+                DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        repp.onClusterChanged(addrs, new HashSet<>());
+        final Map<BookieId, BookieInfo> bookieInfoMap = new HashMap<>();
+        bookieInfoMap.put(addr1.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr2.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr3.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr4.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr5.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr6.toBookieId(), new BookieInfo(100L, 100L));
+
+        repp.updateBookieInfo(bookieInfoMap);
+
+        final Set<BookieId> excludeList = new HashSet<>();
+        final int ensembleSize = 6;
+        final int writeQuorumSize = 2;
+        final int ackQuorumSize = 2;
+
+        final Consumer<Pair<List<BookieId>, Matcher<Iterable<? extends BookieId>>>> test = (pair) -> {
+            // RackawareEnsemblePlacementPolicyImpl#isEnsembleAdheringToPlacementPolicy
+            // is not scope of this test case. So, use the method in assertion for convenience.
+            assertEquals(PlacementPolicyAdherence.FAIL,
+                    repp.isEnsembleAdheringToPlacementPolicy(pair.getLeft(), writeQuorumSize, ackQuorumSize));
+            final EnsemblePlacementPolicy.PlacementResult<List<BookieId>> result =
+                    repp.replaceToAdherePlacementPolicy(ensembleSize, writeQuorumSize, ackQuorumSize,
+                            excludeList, pair.getLeft());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("input: {}, result: {}", pair.getLeft(), result.getResult());
+            }
+            assertEquals(PlacementPolicyAdherence.MEETS_STRICT, result.getAdheringToPolicy());
+        };
+
+        for (int i = 0; i < 1000; i++) {
+            //All bookies already in the ensemble, the bookie order not adhere the placement policy.
+            test.accept(Pair.of(Arrays.asList(addr1.toBookieId(), addr2.toBookieId(), addr3.toBookieId(),
+                            addr4.toBookieId(), addr5.toBookieId(), addr6.toBookieId()),
+                    //The result is not predict. We know the best min replace place is 2.
+                    //1,2,3,4,5,6 => 1,5,3,4,2,6
+                    //But maybe the final result is 1,6,3,4,2,5.
+                    //When we from index 0 to replace, the first bookie(1) is /rack1, we only pick /rack2 bookie
+                    //for the second bookie, so we can choose 4,5,6, the choice is random. If we pick 6 for the second,
+                    // the final result is 1,6,3,4,2,5. If we pick 5 for the second, the final result is 1,5,3,4,2,6
+                    null));
+        }
+        StaticDNSResolver.reset();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReplaceToAdherePlacementPolicyWithNoMoreRackBookie() throws Exception {
+        final BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+        final BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+        final BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+        final BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+        final BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
+        final BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
+
+        final String rackName1 = NetworkTopology.DEFAULT_REGION + "/r1";
+        final String rackName2 = NetworkTopology.DEFAULT_REGION + "/r2";
+
+        // update dns mapping
+        StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(), rackName1);
+        StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(), rackName1);
+        StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(), rackName1);
+        StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(), rackName2);
+        StaticDNSResolver.addNodeToRack(addr5.getSocketAddress().getAddress().getHostAddress(), rackName2);
+        StaticDNSResolver.addNodeToRack(addr6.getSocketAddress().getAddress().getHostAddress(), rackName2);
+
+        // Update cluster
+        final Set<BookieId> addrs = new HashSet<>();
+        addrs.add(addr1.toBookieId());
+        addrs.add(addr2.toBookieId());
+        addrs.add(addr3.toBookieId());
+        addrs.add(addr4.toBookieId());
+        addrs.add(addr5.toBookieId());
+        addrs.add(addr6.toBookieId());
+
+        final ClientConfiguration newConf = new ClientConfiguration(conf);
+        newConf.setDiskWeightBasedPlacementEnabled(false);
+        newConf.setMinNumRacksPerWriteQuorum(2);
+        newConf.setEnforceMinNumRacksPerWriteQuorum(true);
+
+        repp.initialize(newConf, Optional.empty(), timer,
+                DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        repp.onClusterChanged(addrs, new HashSet<>());
+        final Map<BookieId, BookieInfo> bookieInfoMap = new HashMap<>();
+        bookieInfoMap.put(addr1.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr2.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr3.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr4.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr5.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr6.toBookieId(), new BookieInfo(100L, 100L));
+
+        repp.updateBookieInfo(bookieInfoMap);
+
+        final Set<BookieId> excludeList = new HashSet<>();
+        final int ensembleSize = 3;
+        final int writeQuorumSize = 2;
+        final int ackQuorumSize = 2;
+
+        final Consumer<Pair<List<BookieId>, Matcher<Iterable<? extends BookieId>>>> test = (pair) -> {
+            // RackawareEnsemblePlacementPolicyImpl#isEnsembleAdheringToPlacementPolicy
+            // is not scope of this test case. So, use the method in assertion for convenience.
+            assertEquals(PlacementPolicyAdherence.FAIL,
+                    repp.isEnsembleAdheringToPlacementPolicy(pair.getLeft(), writeQuorumSize, ackQuorumSize));
+            final EnsemblePlacementPolicy.PlacementResult<List<BookieId>> result =
+                    repp.replaceToAdherePlacementPolicy(ensembleSize, writeQuorumSize, ackQuorumSize,
+                            excludeList, pair.getLeft());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("input: {}, result: {}", pair.getLeft(), result.getResult());
+            }
+            assertEquals(PlacementPolicyAdherence.FAIL, result.getAdheringToPolicy());
+            assertEquals(0, result.getResult().size());
+        };
+
+        for (int i = 0; i < 1000; i++) {
+            test.accept(Pair.of(Arrays.asList(addr1.toBookieId(), addr2.toBookieId(), addr4.toBookieId()),
+                    null));
+        }
+        StaticDNSResolver.reset();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReplaceToAdherePlacementPolicyWithUnknowBookie() throws Exception {
+        final BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+        final BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+        final BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+        final BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+        final BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
+        final BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
+
+        final String rackName1 = NetworkTopology.DEFAULT_REGION + "/r1";
+        final String rackName2 = NetworkTopology.DEFAULT_REGION + "/r2";
+
+        // update dns mapping
+        StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(), rackName1);
+        StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(), rackName1);
+        StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(), rackName1);
+        StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(), rackName2);
+        StaticDNSResolver.addNodeToRack(addr5.getSocketAddress().getAddress().getHostAddress(), rackName2);
+        StaticDNSResolver.addNodeToRack(addr6.getSocketAddress().getAddress().getHostAddress(), rackName2);
+
+        // Update cluster
+        final Set<BookieId> addrs = new HashSet<>();
+        addrs.add(addr1.toBookieId());
+        addrs.add(addr2.toBookieId());
+        addrs.add(addr3.toBookieId());
+        addrs.add(addr4.toBookieId());
+        addrs.add(addr5.toBookieId());
+        addrs.add(addr6.toBookieId());
+
+        final ClientConfiguration newConf = new ClientConfiguration(conf);
+        newConf.setDiskWeightBasedPlacementEnabled(false);
+        newConf.setMinNumRacksPerWriteQuorum(2);
+        newConf.setEnforceMinNumRacksPerWriteQuorum(true);
+
+        repp.initialize(newConf, Optional.empty(), timer,
+                DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        repp.onClusterChanged(addrs, new HashSet<>());
+        final Map<BookieId, BookieInfo> bookieInfoMap = new HashMap<>();
+        bookieInfoMap.put(addr1.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr2.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr3.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr4.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr5.toBookieId(), new BookieInfo(100L, 100L));
+        bookieInfoMap.put(addr6.toBookieId(), new BookieInfo(100L, 100L));
+
+        repp.updateBookieInfo(bookieInfoMap);
+
+        final Set<BookieId> excludeList = new HashSet<>();
+        final int ensembleSize = 6;
+        final int writeQuorumSize = 2;
+        final int ackQuorumSize = 2;
+
+        final BookieRackMatcher rack1 = new BookieRackMatcher(rackName1);
+
+        final Consumer<Pair<List<BookieId>, Matcher<Iterable<? extends BookieId>>>> test = (pair) -> {
+            // RackawareEnsemblePlacementPolicyImpl#isEnsembleAdheringToPlacementPolicy
+            // is not scope of this test case. So, use the method in assertion for convenience.
+            assertEquals(PlacementPolicyAdherence.FAIL,
+                    repp.isEnsembleAdheringToPlacementPolicy(pair.getLeft(), writeQuorumSize, ackQuorumSize));
+            final EnsemblePlacementPolicy.PlacementResult<List<BookieId>> result =
+                    repp.replaceToAdherePlacementPolicy(ensembleSize, writeQuorumSize, ackQuorumSize,
+                            excludeList, pair.getLeft());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("input: {}, result: {}", pair.getLeft(), result.getResult());
+            }
+            assertEquals(PlacementPolicyAdherence.MEETS_STRICT, result.getAdheringToPolicy());
+            assertThat(result.getResult(), pair.getRight());
+        };
+
+        for (int i = 0; i < 1000; i++) {
+            test.accept(Pair.of(Arrays.asList(BookieId.parse("127.0.0.10:3181"), BookieId.parse("127.0.0.11:3181"),
+                            addr3.toBookieId(),
+                            addr4.toBookieId(), addr5.toBookieId(), addr6.toBookieId()),
+                    contains(is(rack1), is(addr5.toBookieId()), is(addr3.toBookieId()),
+                            is(addr4.toBookieId()), is(rack1), is(addr6.toBookieId()))));
+        }
+        StaticDNSResolver.reset();
+    }
+
+    private static class BookieRackMatcher extends TypeSafeMatcher<BookieId> {
+        final List<String> expectedRacks;
+
+        public BookieRackMatcher(String... expectedRacks) {
+            this.expectedRacks = Arrays.asList(expectedRacks);
+        }
+
+        @Override
+        protected boolean matchesSafely(BookieId bookieId) {
+            return expectedRacks.contains(StaticDNSResolver.getRack(bookieId.toString().split(":")[0]));
+        }
+
+        @Override
+        public void describeTo(Description description) {
+            description.appendText("expected racks " + expectedRacks);
+        }
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java
index 1f7ebde9b0..b191225c83 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestZoneawareEnsemblePlacementPolicy.java
@@ -241,7 +241,7 @@ public class TestZoneawareEnsemblePlacementPolicy extends TestCase {
                 newEnsemblePlacementResult.getResult());
         assertTrue("New ensemble should contain all 6 rw bookies", newEnsembleSet.containsAll(rwAddrs));
         assertEquals("PlacementPolicyAdherence", PlacementPolicyAdherence.MEETS_STRICT,
-                newEnsemblePlacementResult.isAdheringToPolicy());
+                newEnsemblePlacementResult.getAdheringToPolicy());
 
         /*
          * there are enough bookies so newEnsemble should succeed.
@@ -251,7 +251,7 @@ public class TestZoneawareEnsemblePlacementPolicy extends TestCase {
         assertTrue("New ensemble should contain 3 rw bookies",
                 (newEnsembleSet.size() == 3) && (rwAddrs.containsAll(newEnsembleSet)));
         assertEquals("PlacementPolicyAdherence", PlacementPolicyAdherence.MEETS_STRICT,
-                newEnsemblePlacementResult.isAdheringToPolicy());
+                newEnsemblePlacementResult.getAdheringToPolicy());
     }
 
     @Test
@@ -309,7 +309,7 @@ public class TestZoneawareEnsemblePlacementPolicy extends TestCase {
             assertTrue("Bookie from default faultDomain shouldn't be part of ensemble",
                     Collections.disjoint(newEnsembleSet, bookiesInDefaultFaultDomain));
             assertEquals("PlacementPolicyAdherence", PlacementPolicyAdherence.MEETS_STRICT,
-                    newEnsemblePlacementResult.isAdheringToPolicy());
+                    newEnsemblePlacementResult.getAdheringToPolicy());
         }
     }
 
@@ -373,7 +373,7 @@ public class TestZoneawareEnsemblePlacementPolicy extends TestCase {
         assertTrue("Bookie from default faultDomain shouldn't be part of ensemble",
                 Collections.disjoint(newEnsembleSet, bookiesInDefaultFaultDomain));
         assertEquals("PlacementPolicyAdherence", PlacementPolicyAdherence.MEETS_SOFT,
-                newEnsemblePlacementResult.isAdheringToPolicy());
+                newEnsemblePlacementResult.getAdheringToPolicy());
 
         try {
             /*
@@ -404,7 +404,7 @@ public class TestZoneawareEnsemblePlacementPolicy extends TestCase {
         newEnsembleSet = new HashSet<BookieId>(newEnsemblePlacementResult.getResult());
         assertTrue("New ensemble should contain 4 different bookies", newEnsembleSet.size() == 4);
         assertEquals("PlacementPolicyAdherence", PlacementPolicyAdherence.FAIL,
-                newEnsemblePlacementResult.isAdheringToPolicy());
+                newEnsemblePlacementResult.getAdheringToPolicy());
     }
 
     @Test
@@ -479,7 +479,7 @@ public class TestZoneawareEnsemblePlacementPolicy extends TestCase {
          */
         newEnsemblePlacementResult = zepp.newEnsemble(ensSize, writeQuorum, 2, null, new HashSet<>());
         assertEquals("PlacementPolicyAdherence", PlacementPolicyAdherence.MEETS_STRICT,
-                newEnsemblePlacementResult.isAdheringToPolicy());
+                newEnsemblePlacementResult.getAdheringToPolicy());
         List<BookieId> newEnsemble = newEnsemblePlacementResult.getResult();
         Set<BookieId> newEnsembleSet = new HashSet<BookieId>(newEnsemble);
         assertTrue("New ensemble should contain all 6 rw bookies in non-default fault domains",
@@ -563,7 +563,7 @@ public class TestZoneawareEnsemblePlacementPolicy extends TestCase {
         assertTrue("New ensemble should contain 6 rw bookies in non-default fault domains",
                 rwAddrs.containsAll(newEnsembleSet) && (newEnsembleSet.size() == 6));
         assertEquals("PlacementPolicyAdherence", PlacementPolicyAdherence.MEETS_SOFT,
-                newEnsemblePlacementResult.isAdheringToPolicy());
+                newEnsemblePlacementResult.getAdheringToPolicy());
         Set<String> bookiesNetworkLocations = new HashSet<String>();
 
         for (BookieId bookieAddr : newEnsembleSet) {
@@ -654,7 +654,7 @@ public class TestZoneawareEnsemblePlacementPolicy extends TestCase {
                 excludedBookies);
         List<BookieId> newEnsembleList = newEnsemblePlacementResult.getResult();
         assertEquals("PlacementPolicyAdherence", PlacementPolicyAdherence.MEETS_SOFT,
-                newEnsemblePlacementResult.isAdheringToPolicy());
+                newEnsemblePlacementResult.getAdheringToPolicy());
         Set<BookieId> newEnsembleSet = new HashSet<BookieId>(newEnsembleList);
         Set<String> bookiesNetworkLocationsSet = new HashSet<String>();
         List<ZoneAwareNodeLocation> bookiesNodeLocationList = new ArrayList<ZoneAwareNodeLocation>();
@@ -761,7 +761,7 @@ public class TestZoneawareEnsemblePlacementPolicy extends TestCase {
         BookieId replacedBookie = replacePlacementResult.getResult();
         assertEquals("replaced bookie", addr8.toBookieId(), replacedBookie);
         assertEquals("PlacementPolicyAdherence", PlacementPolicyAdherence.MEETS_STRICT,
-                replacePlacementResult.isAdheringToPolicy());
+                replacePlacementResult.getAdheringToPolicy());
 
         excludedBookies.add(addr8.toBookieId());
         /*
@@ -854,7 +854,7 @@ public class TestZoneawareEnsemblePlacementPolicy extends TestCase {
         BookieId replacedBookie = replaceResponse.getResult();
         assertEquals("replaced bookie", "/zone3/ud2", zepp.resolveNetworkLocation(replacedBookie));
         assertEquals("PlacementPolicyAdherence", PlacementPolicyAdherence.MEETS_SOFT,
-                replaceResponse.isAdheringToPolicy());
+                replaceResponse.getAdheringToPolicy());
     }
 
     @Test
@@ -1074,7 +1074,7 @@ public class TestZoneawareEnsemblePlacementPolicy extends TestCase {
             List<BookieId> ensemble = ensembleResponse.getResult();
             assertFalse(ensemble.contains(addr4.toBookieId()));
             assertEquals("PlacementPolicyAdherence", PlacementPolicyAdherence.MEETS_STRICT,
-                    ensembleResponse.isAdheringToPolicy());
+                    ensembleResponse.getAdheringToPolicy());
         }
 
         // we could still use addr4 for urgent allocation if it is just bookie
@@ -1084,7 +1084,7 @@ public class TestZoneawareEnsemblePlacementPolicy extends TestCase {
         List<BookieId> ensemble = ensembleResponse.getResult();
         assertTrue(ensemble.contains(addr4.toBookieId()));
         assertEquals("PlacementPolicyAdherence", PlacementPolicyAdherence.MEETS_STRICT,
-                ensembleResponse.isAdheringToPolicy());
+                ensembleResponse.getAdheringToPolicy());
     }
 
     @Test
@@ -1135,7 +1135,7 @@ public class TestZoneawareEnsemblePlacementPolicy extends TestCase {
         assertFalse("excludeBookie should not be included in the ensemble",
                 newEnsembleSet.contains(addr5.toBookieId()));
         assertEquals("PlacementPolicyAdherence", PlacementPolicyAdherence.FAIL,
-                newEnsemblePlacementResult.isAdheringToPolicy());
+                newEnsemblePlacementResult.getAdheringToPolicy());
 
         rwAddrs.remove(addr4.toBookieId());
         roAddrs.add(addr4.toBookieId());
@@ -1209,7 +1209,7 @@ public class TestZoneawareEnsemblePlacementPolicy extends TestCase {
          */
         assertEquals("ReplaceBookie candidate", addr7.toBookieId(), replaceBookie);
         assertEquals("PlacementPolicyAdherence", PlacementPolicyAdherence.FAIL,
-                replaceResponse.isAdheringToPolicy());
+                replaceResponse.getAdheringToPolicy());
 
         rwAddrs.remove(addr7.toBookieId());
         excludeBookies.add(addr7.toBookieId());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
index ff1c71757a..f8cddf3f5e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
@@ -294,6 +294,151 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
         }
     }
 
+    @Test
+    public void testPlacementPolicyCheckWithLedgersNotAdheringToPlacementPolicyAndNotMarkToUnderreplication()
+            throws Exception {
+        int numOfBookies = 5;
+        int numOfLedgersNotAdheringToPlacementPolicy = 0;
+        List<BookieId> bookieAddresses = new ArrayList<>();
+        try (RegistrationManager regManager = driver.createRegistrationManager()) {
+            for (int i = 0; i < numOfBookies; i++) {
+                BookieId bookieAddress = new BookieSocketAddress("98.98.98." + i, 2181).toBookieId();
+                bookieAddresses.add(bookieAddress);
+                regManager.registerBookie(bookieAddress, false, BookieServiceInfo.EMPTY);
+            }
+        }
+
+        // only three racks
+        StaticDNSResolver.addNodeToRack("98.98.98.0", "/rack1");
+        StaticDNSResolver.addNodeToRack("98.98.98.1", "/rack2");
+        StaticDNSResolver.addNodeToRack("98.98.98.2", "/rack3");
+        StaticDNSResolver.addNodeToRack("98.98.98.3", "/rack1");
+        StaticDNSResolver.addNodeToRack("98.98.98.4", "/rack2");
+
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerManager lm = mFactory.newLedgerManager();
+        int ensembleSize = 5;
+        int writeQuorumSize = 3;
+        int ackQuorumSize = 2;
+        int minNumRacksPerWriteQuorumConfValue = 3;
+
+        /*
+         * this closed ledger doesn't adhere to placement policy because there are only
+         * 3 racks, and the ensembleSize is 5.
+         */
+        LedgerMetadata initMeta = LedgerMetadataBuilder.create()
+                .withId(1L)
+                .withEnsembleSize(ensembleSize)
+                .withWriteQuorumSize(writeQuorumSize)
+                .withAckQuorumSize(ackQuorumSize)
+                .newEnsembleEntry(0L, bookieAddresses)
+                .withClosedState()
+                .withLastEntryId(100)
+                .withLength(10000)
+                .withDigestType(DigestType.DUMMY)
+                .withPassword(new byte[0])
+                .build();
+        lm.createLedgerMetadata(1L, initMeta).get();
+        numOfLedgersNotAdheringToPlacementPolicy++;
+
+        ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
+        servConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
+        setServerConfigPropertiesForRackPlacement(servConf);
+        MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
+        try {
+            TestStatsLogger statsLogger = startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
+            Gauge<? extends Number> ledgersNotAdheringToPlacementPolicyGuage = statsLogger
+                    .getGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY);
+            assertEquals("NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY guage value",
+                    numOfLedgersNotAdheringToPlacementPolicy, ledgersNotAdheringToPlacementPolicyGuage.getSample());
+            Gauge<? extends Number> ledgersSoftlyAdheringToPlacementPolicyGuage = statsLogger
+                    .getGauge(ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY);
+            assertEquals("NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY guage value",
+                    0, ledgersSoftlyAdheringToPlacementPolicyGuage.getSample());
+        } finally {
+            Auditor auditor = auditorRef.getValue();
+            if (auditor != null) {
+                auditor.close();
+            }
+        }
+        LedgerUnderreplicationManager underreplicationManager = mFactory.newLedgerUnderreplicationManager();
+        long unnderReplicateLedgerId = underreplicationManager.pollLedgerToRereplicate();
+        assertEquals(unnderReplicateLedgerId, -1);
+    }
+
+    @Test
+    public void testPlacementPolicyCheckWithLedgersNotAdheringToPlacementPolicyAndMarkToUnderreplication()
+            throws Exception {
+        int numOfBookies = 5;
+        int numOfLedgersNotAdheringToPlacementPolicy = 0;
+        List<BookieId> bookieAddresses = new ArrayList<>();
+        try (RegistrationManager regManager = driver.createRegistrationManager()) {
+            for (int i = 0; i < numOfBookies; i++) {
+                BookieId bookieAddress = new BookieSocketAddress("98.98.98." + i, 2181).toBookieId();
+                bookieAddresses.add(bookieAddress);
+                regManager.registerBookie(bookieAddress, false, BookieServiceInfo.EMPTY);
+            }
+        }
+
+        // only three racks
+        StaticDNSResolver.addNodeToRack("98.98.98.0", "/rack1");
+        StaticDNSResolver.addNodeToRack("98.98.98.1", "/rack2");
+        StaticDNSResolver.addNodeToRack("98.98.98.2", "/rack3");
+        StaticDNSResolver.addNodeToRack("98.98.98.3", "/rack1");
+        StaticDNSResolver.addNodeToRack("98.98.98.4", "/rack2");
+
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerManager lm = mFactory.newLedgerManager();
+        int ensembleSize = 5;
+        int writeQuorumSize = 3;
+        int ackQuorumSize = 2;
+        int minNumRacksPerWriteQuorumConfValue = 3;
+
+        /*
+         * this closed ledger doesn't adhere to placement policy because there are only
+         * 3 racks, and the ensembleSize is 5.
+         */
+        LedgerMetadata initMeta = LedgerMetadataBuilder.create()
+                .withId(1L)
+                .withEnsembleSize(ensembleSize)
+                .withWriteQuorumSize(writeQuorumSize)
+                .withAckQuorumSize(ackQuorumSize)
+                .newEnsembleEntry(0L, bookieAddresses)
+                .withClosedState()
+                .withLastEntryId(100)
+                .withLength(10000)
+                .withDigestType(DigestType.DUMMY)
+                .withPassword(new byte[0])
+                .build();
+        lm.createLedgerMetadata(1L, initMeta).get();
+        numOfLedgersNotAdheringToPlacementPolicy++;
+
+        ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
+        servConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
+        servConf.setRepairedPlacementPolicyNotAdheringBookieEnable(true);
+        setServerConfigPropertiesForRackPlacement(servConf);
+        MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
+        try {
+            TestStatsLogger statsLogger = startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
+            Gauge<? extends Number> ledgersNotAdheringToPlacementPolicyGuage = statsLogger
+                    .getGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY);
+            assertEquals("NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY guage value",
+                    numOfLedgersNotAdheringToPlacementPolicy, ledgersNotAdheringToPlacementPolicyGuage.getSample());
+            Gauge<? extends Number> ledgersSoftlyAdheringToPlacementPolicyGuage = statsLogger
+                    .getGauge(ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY);
+            assertEquals("NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY guage value",
+                    0, ledgersSoftlyAdheringToPlacementPolicyGuage.getSample());
+        } finally {
+            Auditor auditor = auditorRef.getValue();
+            if (auditor != null) {
+                auditor.close();
+            }
+        }
+        LedgerUnderreplicationManager underreplicationManager = mFactory.newLedgerUnderreplicationManager();
+        long unnderReplicateLedgerId = underreplicationManager.pollLedgerToRereplicate();
+        assertEquals(unnderReplicateLedgerId, 1L);
+    }
+
     @Test
     public void testPlacementPolicyCheckForURLedgersElapsedRecoveryGracePeriod() throws Exception {
         testPlacementPolicyCheckWithURLedgers(true);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index a69b9cbd49..510d02f8db 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -19,32 +19,48 @@
  */
 package org.apache.bookkeeper.replication;
 
+import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import io.netty.util.HashedWheelTimer;
 import java.io.IOException;
 import java.net.URI;
+import java.net.UnknownHostException;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.TimerTask;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import lombok.Cleanup;
+import org.apache.bookkeeper.bookie.BookieImpl;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeperTestClient;
 import org.apache.bookkeeper.client.ClientUtil;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.ZoneawareEnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.meta.AbstractZkLedgerManager;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
@@ -53,19 +69,25 @@ import org.apache.bookkeeper.meta.MetadataBookieDriver;
 import org.apache.bookkeeper.meta.MetadataClientDriver;
 import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.proto.BookieAddressResolver;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.bookkeeper.util.StaticDNSResolver;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.KeeperException;
@@ -75,6 +97,8 @@ import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.Stat;
+import org.awaitility.Awaitility;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1129,4 +1153,184 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
             bkWithMockZK.close();
         }
     }
+
+    @Test
+    public void testRepairedNotAdheringPlacementPolicyLedgerFragmentsOnRack() throws Exception {
+        testRepairedNotAdheringPlacementPolicyLedgerFragments(RackawareEnsemblePlacementPolicy.class);
+    }
+
+    private void testRepairedNotAdheringPlacementPolicyLedgerFragments(
+            Class<? extends EnsemblePlacementPolicy> placementPolicyClass) throws Exception {
+        List<BookieId> firstThreeBookies = servers.stream().map(ele -> {
+            try {
+                return ele.getServer().getBookieId();
+            } catch (UnknownHostException e) {
+                return null;
+            }
+        }).filter(Objects::nonNull).collect(Collectors.toList());
+
+        baseClientConf.setProperty("reppDnsResolverClass", StaticDNSResolver.class.getName());
+        baseClientConf.setProperty("enforceStrictZoneawarePlacement", false);
+        bkc = new BookKeeperTestClient(baseClientConf) {
+            @Override
+            protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
+                    DNSToSwitchMapping dnsResolver, HashedWheelTimer timer, FeatureProvider featureProvider,
+                    StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) throws IOException {
+                EnsemblePlacementPolicy ensemblePlacementPolicy = null;
+                if (ZoneawareEnsemblePlacementPolicy.class == placementPolicyClass) {
+                    ensemblePlacementPolicy = buildZoneAwareEnsemblePlacementPolicy(firstThreeBookies);
+                } else if (RackawareEnsemblePlacementPolicy.class == placementPolicyClass) {
+                    ensemblePlacementPolicy = buildRackAwareEnsemblePlacementPolicy(firstThreeBookies);
+                }
+                ensemblePlacementPolicy.initialize(conf, Optional.ofNullable(dnsResolver), timer,
+                        featureProvider, statsLogger, bookieAddressResolver);
+                return ensemblePlacementPolicy;
+            }
+        };
+
+        //This ledger not adhering placement policy, the combine(0,1,2) rack is 1.
+        LedgerHandle lh = bkc.createLedger(3, 3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
+
+        int entrySize = 10;
+        for (int i = 0; i < entrySize; i++) {
+            lh.addEntry(data);
+        }
+        lh.close();
+
+        int minNumRacksPerWriteQuorumConfValue = 2;
+
+        ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
+        servConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
+        servConf.setProperty("reppDnsResolverClass", StaticDNSResolver.class.getName());
+        servConf.setAuditorPeriodicPlacementPolicyCheckInterval(1000);
+        servConf.setRepairedPlacementPolicyNotAdheringBookieEnable(true);
+
+        MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
+        try {
+            TestStatsLogger statsLogger = startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
+            Gauge<? extends Number> ledgersNotAdheringToPlacementPolicyGuage = statsLogger
+                    .getGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY);
+            assertEquals("NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY guage value",
+                    1, ledgersNotAdheringToPlacementPolicyGuage.getSample());
+            Gauge<? extends Number> ledgersSoftlyAdheringToPlacementPolicyGuage = statsLogger
+                    .getGauge(ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY);
+            assertEquals("NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY guage value",
+                    0, ledgersSoftlyAdheringToPlacementPolicyGuage.getSample());
+        } finally {
+            Auditor auditor = auditorRef.getValue();
+            if (auditor != null) {
+                auditor.close();
+            }
+        }
+
+        Stat stat = bkc.getZkHandle()
+                .exists("/ledgers/underreplication/ledgers/0000/0000/0000/0000/urL0000000000", false);
+        assertNotNull(stat);
+
+        baseConf.setRepairedPlacementPolicyNotAdheringBookieEnable(true);
+        BookKeeper bookKeeper = new BookKeeperTestClient(baseClientConf) {
+            @Override
+            protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
+                    DNSToSwitchMapping dnsResolver, HashedWheelTimer timer, FeatureProvider featureProvider,
+                    StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) throws IOException {
+                EnsemblePlacementPolicy ensemblePlacementPolicy = null;
+                if (ZoneawareEnsemblePlacementPolicy.class == placementPolicyClass) {
+                    ensemblePlacementPolicy = buildZoneAwareEnsemblePlacementPolicy(firstThreeBookies);
+                } else if (RackawareEnsemblePlacementPolicy.class == placementPolicyClass) {
+                    ensemblePlacementPolicy = buildRackAwareEnsemblePlacementPolicy(firstThreeBookies);
+                }
+                ensemblePlacementPolicy.initialize(conf, Optional.ofNullable(dnsResolver), timer,
+                        featureProvider, statsLogger, bookieAddressResolver);
+                return ensemblePlacementPolicy;
+            }
+        };
+        ReplicationWorker rw = new ReplicationWorker(baseConf, bookKeeper, false, NullStatsLogger.INSTANCE);
+        rw.start();
+
+        //start new bookie, the rack is /rack2
+        BookieId newBookieId = startNewBookieAndReturnBookieId();
+
+        Awaitility.await().untilAsserted(() -> {
+            LedgerMetadata metadata = bkc.getLedgerManager().readLedgerMetadata(lh.getId()).get().getValue();
+            List<BookieId> newBookies = metadata.getAllEnsembles().get(0L);
+            assertTrue(newBookies.contains(newBookieId));
+        });
+
+        Awaitility.await().untilAsserted(() -> {
+            Stat stat1 = bkc.getZkHandle()
+                    .exists("/ledgers/underreplication/ledgers/0000/0000/0000/0000/urL0000000000", false);
+            assertNull(stat1);
+        });
+
+        for (BookieId rack1Book : firstThreeBookies) {
+            killBookie(rack1Book);
+        }
+
+        verifyRecoveredLedgers(lh, 0, entrySize - 1);
+    }
+
+    private EnsemblePlacementPolicy buildRackAwareEnsemblePlacementPolicy(List<BookieId> bookieIds) {
+        return new RackawareEnsemblePlacementPolicy() {
+            @Override
+            public String resolveNetworkLocation(BookieId addr) {
+                if (bookieIds.contains(addr)) {
+                    return "/rack1";
+                }
+                //The other bookie is /rack2
+                return "/rack2";
+            }
+        };
+    }
+
+    private EnsemblePlacementPolicy buildZoneAwareEnsemblePlacementPolicy(List<BookieId> firstThreeBookies) {
+        return new ZoneawareEnsemblePlacementPolicy() {
+            @Override
+            protected String resolveNetworkLocation(BookieId addr) {
+                //The first three bookie 1 is /zone1/ud1
+                //The first three bookie 2,3 is /zone1/ud2
+                if (firstThreeBookies.get(0).equals(addr)) {
+                    return "/zone1/ud1";
+                } else if (firstThreeBookies.contains(addr)) {
+                    return "/zone1/ud2";
+                }
+                //The other bookie is /zone2/ud1
+                return "/zone2/ud1";
+            }
+        };
+    }
+
+    private TestStatsLogger startAuditorAndWaitForPlacementPolicyCheck(ServerConfiguration servConf,
+            MutableObject<Auditor> auditorRef) throws MetadataException, CompatibilityException, KeeperException,
+            InterruptedException, ReplicationException.UnavailableException, UnknownHostException {
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
+        TestStatsProvider.TestOpStatsLogger placementPolicyCheckStatsLogger =
+                (TestStatsProvider.TestOpStatsLogger) statsLogger
+                .getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);
+
+        final AuditorPeriodicCheckTest.TestAuditor auditor = new AuditorPeriodicCheckTest.TestAuditor(
+                BookieImpl.getBookieId(servConf).toString(), servConf, bkc, false, statsLogger);
+        auditorRef.setValue(auditor);
+        CountDownLatch latch = auditor.getLatch();
+        assertEquals("PLACEMENT_POLICY_CHECK_TIME SuccessCount", 0,
+                placementPolicyCheckStatsLogger.getSuccessCount());
+        urm.setPlacementPolicyCheckCTime(-1);
+        auditor.start();
+        /*
+         * since placementPolicyCheckCTime is set to -1, placementPolicyCheck should be
+         * scheduled to run with no initialdelay
+         */
+        assertTrue("placementPolicyCheck should have executed", latch.await(20, TimeUnit.SECONDS));
+        for (int i = 0; i < 20; i++) {
+            Thread.sleep(100);
+            if (placementPolicyCheckStatsLogger.getSuccessCount() >= 1) {
+                break;
+            }
+        }
+        assertEquals("PLACEMENT_POLICY_CHECK_TIME SuccessCount", 1,
+                placementPolicyCheckStatsLogger.getSuccessCount());
+        return statsLogger;
+    }
 }
diff --git a/site3/website/docs/reference/config.md b/site3/website/docs/reference/config.md
index 57d9abd89b..232100a970 100644
--- a/site3/website/docs/reference/config.md
+++ b/site3/website/docs/reference/config.md
@@ -325,16 +325,17 @@ The table below lists parameters that you can set to configure bookies. All conf
 
 ## AutoRecovery auditor settings
 
-| Parameter | Description | Default
-| --------- | ----------- | ------- | 
-| auditorPeriodicBookieCheckInterval | The time interval between auditor bookie checks, in seconds. The auditor bookie check checks ledger metadata to see which bookies should contain entries for each ledger. If a bookie that should contain entries is unavailable, then the ledger containing that entry is marked for recovery. Setting this to 0 disables the periodic check. Bookie checks will still run when a bookie fails. The default is once per day. | 86400 | 
-| auditorPeriodicCheckInterval | The time interval, in seconds, at which the auditor will check all ledgers in the cluster. By default this runs once a week.<br /><br />Set this to 0 to disable the periodic check completely. Note that periodic checking will put extra load on the cluster, so it should not be run more frequently than once a day.<br /> | 604800 | 
-| auditorPeriodicPlacementPolicyCheckInterval | The time interval between auditor placement policy checks, in seconds. The auditor placement policy check validates if the ensemble of segments of all the closed ledgers is adhering to the placement policy. It is just monitoring scrutiny but doesn't take any corrective measure other than logging error and reporting metrics. By default it is disabled. |  | 
-| auditorLedgerVerificationPercentage | The percentage of a ledger (fragment)'s entries will be verified before claiming a fragment as missing. If it is 0, it only verifies the first and last entries of a given fragment.<br /> |  | 
-| lostBookieRecoveryDelay | How long to wait, in seconds, before starting autorecovery of a lost bookie. |  | 
-| storeSystemTimeAsLedgerUnderreplicatedMarkTime | Enable the Auditor to use system time as underreplicated ledger mark time. If this is enabled, Auditor will write a ctime field into the underreplicated ledger znode. | true | 
-| underreplicatedLedgerRecoveryGracePeriod | The grace period (in seconds) for underreplicated ledgers recovery. If ledger is marked underreplicated for more than this period then it will be reported by placementPolicyCheck in Auditor. Setting this to 0 will disable this check. |  | 
-| auditorReplicasCheckInterval | Sets the regularity/interval at which the auditor will run a replicas check of all ledgers, which are closed. This should not be run very often since it validates availability of replicas of all ledgers by querying bookies. Setting this to 0 will completely disable the periodic replicas check. By default it is disabled. |  | 
+| Parameter | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    [...]
+| --------- |----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| auditorPeriodicBookieCheckInterval | The time interval between auditor bookie checks, in seconds. The auditor bookie check checks ledger metadata to see which bookies should contain entries for each ledger. If a bookie that should contain entries is unavailable, then the ledger containing that entry is marked for recovery. Setting this to 0 disables the periodic check. Bookie checks will still run when a bookie fails. The default is once per day.                                         [...]
+| auditorPeriodicCheckInterval | The time interval, in seconds, at which the auditor will check all ledgers in the cluster. By default this runs once a week.<br /><br />Set this to 0 to disable the periodic check completely. Note that periodic checking will put extra load on the cluster, so it should not be run more frequently than once a day.<br />                                                                                                                                              [...]
+| auditorPeriodicPlacementPolicyCheckInterval | The time interval between auditor placement policy checks, in seconds. The auditor placement policy check validates if the ensemble of segments of all the closed ledgers is adhering to the placement policy. It is just monitoring scrutiny but doesn't take any corrective measure other than logging error and reporting metrics. By default, it is disabled.                                                                                            [...]
+| repairedPlacementPolicyNotAdheringBookieEnabled | In Auditor, it combines with auditorPeriodicPlacementPolicyCheckInterval, to control is marked ledger id to under replication managed when found a ledger ensemble not adhere to placement policy. In ReplicationWorker, to control is to repair the ledger which the ensemble does not adhere to the placement policy. By default, it is disabled. If you want to enable this feature, consider two factors. 1:Must config RackawareEnsemblePlacementPo [...]
+| auditorLedgerVerificationPercentage | The percentage of a ledger (fragment)'s entries will be verified before claiming a fragment as missing. If it is 0, it only verifies the first and last entries of a given fragment.<br />                                                                                                                                                                                                                                                                           [...]
+| lostBookieRecoveryDelay | How long to wait, in seconds, before starting autorecovery of a lost bookie.                                                                                                                                                                                                                                                                                                                                                                                                     [...]
+| storeSystemTimeAsLedgerUnderreplicatedMarkTime | Enable the Auditor to use system time as underreplicated ledger mark time. If this is enabled, Auditor will write a ctime field into the underreplicated ledger znode.                                                                                                                                                                                                                                                                                    [...]
+| underreplicatedLedgerRecoveryGracePeriod | The grace period (in seconds) for underreplicated ledgers recovery. If ledger is marked underreplicated for more than this period then it will be reported by placementPolicyCheck in Auditor. Setting this to 0 will disable this check.                                                                                                                                                                                                                       [...]
+| auditorReplicasCheckInterval | Sets the regularity/interval at which the auditor will run a replicas check of all ledgers, which are closed. This should not be run very often since it validates availability of replicas of all ledgers by querying bookies. Setting this to 0 will completely disable the periodic replicas check. By default it is disabled.                                                                                                                                           [...]
 
 
 ## AutoRecovery replication worker settings