Posted to commits@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2022/08/28 06:23:57 UTC

[GitHub] [bookkeeper] horizonzy opened a new pull request, #3359: Feature: auto recover support repaired not adhering placement ledger

horizonzy opened a new pull request, #3359:
URL: https://github.com/apache/bookkeeper/pull/3359

   Descriptions of the changes in this PR:
   This PR addresses the following use case.
   
   1. The users have two zones and a rack-aware placement policy that ensures data is written across both zones.
   2. They had some data on a topic with long retention.
   3. They ran a disaster recovery (DR) test, during which they shut down one zone.
   4. Auto-recovery ran during the DR test. Because only one zone was active, and because auto-recovery by default applies rack awareness on a best-effort basis, it restored the expected number of replicas using only the active zone.
   5. They stopped the DR test and all was well, but that ledger was now stored in only one zone.
   6. They ran another DR test, this time moving to the other zone, but data was missing because all of it was stored in the first zone.
   
   We should support a feature to cover this case.
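
The failure mode in the steps above comes down to a simple coverage check. The following is a minimal sketch, not BookKeeper's placement-policy code: the class `ZoneCoverageSketch`, the method `spansZones`, and the bookie and zone names are all hypothetical, chosen only to show why an ensemble rebuilt inside one zone no longer satisfies a two-zone requirement.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; names here do not come from BookKeeper.
final class ZoneCoverageSketch {

    /** Returns true if the bookies in the ensemble sit in at least minZones distinct zones. */
    static boolean spansZones(List<String> ensemble, Map<String, String> zoneOf, int minZones) {
        Set<String> zones = new HashSet<>();
        for (String bookie : ensemble) {
            zones.add(zoneOf.get(bookie));
        }
        return zones.size() >= minZones;
    }

    public static void main(String[] args) {
        Map<String, String> zoneOf = Map.of(
                "b1", "zoneA", "b2", "zoneA",
                "b3", "zoneB", "b4", "zoneB");

        // Before the DR test: one replica per zone.
        System.out.println(spansZones(List.of("b1", "b3"), zoneOf, 2)); // prints true

        // After best-effort recovery while zoneB was down: both replicas in zoneA.
        System.out.println(spansZones(List.of("b1", "b2"), zoneOf, 2)); // prints false
    }
}
```

The second ensemble still has the expected number of replicas, which is why a check that only counts replicas would not flag it.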
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@bookkeeper.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [bookkeeper] horizonzy commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1207521488

   @eolivelli ping. I have addressed the comments; could you review it again?




[GitHub] [bookkeeper] rdhabalia commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r928058192


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java:
##########
@@ -1128,14 +1129,24 @@ private ArrayList<BookieId> replaceBookiesInEnsemble(
      * @param ledgerFragment
      *            - LedgerFragment to replicate
      */
-    public void replicateLedgerFragment(LedgerHandle lh,
-            final LedgerFragment ledgerFragment,
-            final BiConsumer<Long, Long> onReadEntryFailureCallback)
-            throws InterruptedException, BKException {
-        Optional<Set<BookieId>> excludedBookies = Optional.empty();
-        Map<Integer, BookieId> targetBookieAddresses =
-                getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
-                        ledgerFragment.getBookiesIndexes(), excludedBookies);
+    public void replicateLedgerFragment(LedgerHandle lh, final LedgerFragment ledgerFragment,
+            final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException, BKException {
+        Map<Integer, BookieId> targetBookieAddresses = null;
+        if (LedgerFragment.ReplicateType.DATA_LOSS == ledgerFragment.getReplicateType()) {

Review Comment:
   What's the difference between `DATA_LOSS` and `DATA_NOT_ADHERING_PLACEMENT`?





[GitHub] [bookkeeper] rdhabalia commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1193025957

   It would also be better if you could add to the description what modifications this PR makes.




[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r928451702


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java:
##########
@@ -502,10 +570,40 @@ private boolean isLastSegmentOpenAndMissingBookies(LedgerHandle lh) throws BKExc
      */
     private Set<LedgerFragment> getUnderreplicatedFragments(LedgerHandle lh, Long ledgerVerificationPercentage)
             throws InterruptedException {
+        //The data loss fragments is first to repair. If a fragment is data_loss and not_adhering_placement
+        //at the same time, we only fix data_loss in this time. After fix data_loss, the fragment is still
+        //not_adhering_placement, Auditor will mark this ledger again.

Review Comment:
   Maybe. If a ledger fragment is both `data_loss` and `not_adhering_placement`, we can't fix both in one pass.
   Example: there are four bookies: 1 (rack1), 2 (rack1), 3 (rack2), 4 (rack3).
   A fragment's ensemble is (1, 2, 3), the minimum number of racks is 3, and bookie 3 has lost data.
   Only (1, 3, 4) or (2, 3, 4) can adhere to the placement policy. We should fix data_loss first.
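
The four-bookie example can be checked by brute force. This is a minimal sketch under stated assumptions: the class `RackExampleSketch` and the method `adheringTriples` are hypothetical, bookies are represented by plain integers, and "adhering" is simplified to "the three bookies sit on at least `minRacks` distinct racks". It is not how the placement policy in the PR evaluates adherence.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical illustration only; not code from the PR.
final class RackExampleSketch {

    /** Lists every 3-bookie combination whose members sit on at least minRacks distinct racks. */
    static List<List<Integer>> adheringTriples(Map<Integer, String> rackOf, int minRacks) {
        // Sort the bookie ids so the enumeration order is deterministic.
        List<Integer> ids = new ArrayList<>(new TreeMap<>(rackOf).keySet());
        List<List<Integer>> result = new ArrayList<>();
        for (int a = 0; a < ids.size(); a++) {
            for (int b = a + 1; b < ids.size(); b++) {
                for (int c = b + 1; c < ids.size(); c++) {
                    Set<String> racks = new HashSet<>();
                    racks.add(rackOf.get(ids.get(a)));
                    racks.add(rackOf.get(ids.get(b)));
                    racks.add(rackOf.get(ids.get(c)));
                    if (racks.size() >= minRacks) {
                        result.add(List.of(ids.get(a), ids.get(b), ids.get(c)));
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> rackOf = Map.of(1, "rack1", 2, "rack1", 3, "rack2", 4, "rack3");
        System.out.println(adheringTriples(rackOf, 3)); // prints [[1, 3, 4], [2, 3, 4]]
    }
}
```

Both adhering ensembles keep bookie 3 and swap bookie 1 or 2 for bookie 4, so the placement fix touches a different position than the one that lost data. That is consistent with repairing the data loss first and leaving the placement fix to a later pass.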





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r929497509


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java:
##########
@@ -570,7 +570,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
         bookieQuarantineRatio = 1.0;
     }
 
-    private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
+    protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,

Review Comment:
   yes.





[GitHub] [bookkeeper] eolivelli commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939673475


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1083,236 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        rwLock.readLock().lock();
+        try {
+            PlacementPolicyAdherence currentPlacementAdherence = isEnsembleAdheringToPlacementPolicy(
+                    currentEnsemble, writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+                return PlacementResult.of(new ArrayList<>(currentEnsemble), currentPlacementAdherence);
+            }
+            for (BookieId bookieId : currentEnsemble) {
+                if (!knownBookies.containsKey(bookieId)) {
+                    excludeBookies.add(bookieId);
+                }
+            }
+            PlacementResult<List<BookieId>> placementResult = PlacementResult.of(Collections.emptyList(),
+                    PlacementPolicyAdherence.FAIL);
+            int minDiffer = Integer.MAX_VALUE;
+            for (int i = 0; i < currentEnsemble.size(); i++) {
+                PlacementResult<List<BookieId>> result = doReplaceToAdherePlacementPolicy(ensembleSize,
+                        writeQuorumSize, ackQuorumSize, excludeBookies, currentEnsemble, i);
+                if (PlacementPolicyAdherence.FAIL == result.getAdheringToPolicy()) {
+                    continue;
+                }
+                int differ = differBetweenBookies(currentEnsemble, result.getResult());
+                if (differ < minDiffer) {
+                    minDiffer = differ;
+                    placementResult = result;
+                    if (minDiffer == 1) {
+                        break;
+                    }
+                }
+            }
+            return placementResult;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private int differBetweenBookies(List<BookieId> bookiesA, List<BookieId> bookiesB) {

Review Comment:
   Nit: static?
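
For illustration, a static form of the helper might look like the sketch below. It mirrors the position-by-position comparison of `differBetweenBookies` as quoted in this thread, but it is an assumption-laden stand-in: the class `EnsembleDiffSketch` is hypothetical, the method is generic rather than typed on `BookieId`, and plain null/empty checks replace `CollectionUtils.isEmpty` so the block stays self-contained.

```java
import java.util.List;

// Hypothetical stand-in for the helper under review; not the PR's code.
final class EnsembleDiffSketch {

    /**
     * Counts the positions at which two equally sized lists differ.
     * Returns Integer.MAX_VALUE when either list is null or empty, or when the sizes differ.
     * The method reads no instance state, which is why it can be static.
     */
    static <T> int differBetween(List<T> a, List<T> b) {
        if (a == null || b == null || a.isEmpty() || b.isEmpty()) {
            return Integer.MAX_VALUE;
        }
        if (a.size() != b.size()) {
            return Integer.MAX_VALUE;
        }
        int differ = 0;
        for (int i = 0; i < a.size(); i++) {
            if (!a.get(i).equals(b.get(i))) {
                differ++;
            }
        }
        return differ;
    }

    public static void main(String[] args) {
        // Only the middle position changed.
        System.out.println(differBetween(List.of("b1", "b2", "b3"), List.of("b1", "b4", "b3"))); // prints 1
    }
}
```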



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1083,236 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        rwLock.readLock().lock();
+        try {
+            PlacementPolicyAdherence currentPlacementAdherence = isEnsembleAdheringToPlacementPolicy(
+                    currentEnsemble, writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+                return PlacementResult.of(new ArrayList<>(currentEnsemble), currentPlacementAdherence);
+            }
+            for (BookieId bookieId : currentEnsemble) {
+                if (!knownBookies.containsKey(bookieId)) {
+                    excludeBookies.add(bookieId);
+                }
+            }
+            PlacementResult<List<BookieId>> placementResult = PlacementResult.of(Collections.emptyList(),
+                    PlacementPolicyAdherence.FAIL);
+            int minDiffer = Integer.MAX_VALUE;
+            for (int i = 0; i < currentEnsemble.size(); i++) {
+                PlacementResult<List<BookieId>> result = doReplaceToAdherePlacementPolicy(ensembleSize,
+                        writeQuorumSize, ackQuorumSize, excludeBookies, currentEnsemble, i);
+                if (PlacementPolicyAdherence.FAIL == result.getAdheringToPolicy()) {
+                    continue;
+                }
+                int differ = differBetweenBookies(currentEnsemble, result.getResult());
+                if (differ < minDiffer) {
+                    minDiffer = differ;
+                    placementResult = result;
+                    if (minDiffer == 1) {
+                        break;
+                    }
+                }
+            }
+            return placementResult;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private int differBetweenBookies(List<BookieId> bookiesA, List<BookieId> bookiesB) {
+        if (CollectionUtils.isEmpty(bookiesA) || CollectionUtils.isEmpty(bookiesB)) {
+            return Integer.MAX_VALUE;
+        }
+        if (bookiesA.size() != bookiesB.size()) {
+            return Integer.MAX_VALUE;
+        }
+        int differ = 0;
+        for (int i = 0; i < bookiesA.size(); i++) {
+            if (!bookiesA.get(i).equals(bookiesB.get(i))) {
+                differ++;
+            }
+        }
+        return differ;
+    }
+
+    private PlacementResult<List<BookieId>> doReplaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble,
+            int startIndex) {
+        final List<BookieNode> provisionalEnsembleNodes = currentEnsemble.stream()
+                .map(this::convertBookieToNode).collect(Collectors.toList());
+        final Set<Node> excludeNodes = convertBookiesToNodes(
+                addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
+        int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+        final RRTopologyAwareCoverageEnsemble ensemble =
+                new RRTopologyAwareCoverageEnsemble(
+                        ensembleSize,
+                        writeQuorumSize,
+                        ackQuorumSize,
+                        RACKNAME_DISTANCE_FROM_LEAVES,
+                        null,
+                        null,
+                        minNumRacksPerWriteQuorumForThisEnsemble);
+        int numRacks = topology.getNumOfRacks();
+        // only one rack or less than minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
+        if (numRacks < 2 || numRacks < minNumRacksPerWriteQuorumForThisEnsemble) {
+            LOG.warn("Skip ensemble relocation because the cluster has only {} rack.", numRacks);
+            return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+        }
+        BookieNode prevNode = null;
+        final BookieNode firstNode = provisionalEnsembleNodes.get(startIndex);
+        // use same bookie at first to reduce ledger replication
+        if (!excludeNodes.contains(firstNode) && ensemble.apply(firstNode, ensemble)
+                && ensemble.addNode(firstNode)) {
+            excludeNodes.add(firstNode);
+            prevNode = firstNode;
+        }
+        for (int i = prevNode == null ? 0 : 1; i < ensembleSize; i++) {
+            int index = (startIndex + i) % ensembleSize;
+            final String curRack;
+            if (null == prevNode) {
+                if ((null == localNode) || defaultRack.equals(localNode.getNetworkLocation())) {
+                    curRack = NodeBase.ROOT;
+                } else {
+                    curRack = localNode.getNetworkLocation();
+                }
+            } else {
+                curRack = "~" + prevNode.getNetworkLocation();
+            }
+            try {
+                prevNode = replaceToAdherePlacementPolicyInternal(
+                        curRack, excludeNodes, ensemble, ensemble,
+                        provisionalEnsembleNodes, index, ensembleSize, minNumRacksPerWriteQuorumForThisEnsemble);
+                // got a good candidate
+                if (ensemble.addNode(prevNode)) {
+                    // add the candidate to exclude set
+                    excludeNodes.add(prevNode);
+                } else {
+                    throw new BKNotEnoughBookiesException();
+                }
+                // replace to newer node
+                provisionalEnsembleNodes.set(index, prevNode);
+            } catch (BKNotEnoughBookiesException e) {
+                LOG.warn("Skip ensemble relocation because the cluster has not enough bookies.");
+                return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+            }
+        }
+        List<BookieId> bookieList = ensemble.toList();
+        if (ensembleSize != bookieList.size()) {
+            LOG.warn("Not enough {} bookies are available to form an ensemble : {}.",
+                    ensembleSize, bookieList);
+            return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+        }
+        PlacementPolicyAdherence placementPolicyAdherence = isEnsembleAdheringToPlacementPolicy(bookieList,
+                writeQuorumSize, ackQuorumSize);
+        if (PlacementPolicyAdherence.FAIL == placementPolicyAdherence) {
+            return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+        }
+        return PlacementResult.of(revertBookieListByIndex(bookieList, startIndex), placementPolicyAdherence);
+    }
+
+    private List<BookieId> revertBookieListByIndex(List<BookieId> bookies, int startIndex) {
+        BookieId[] bookieIds = new BookieId[bookies.size()];
+        for (int i = 0; i < bookies.size(); i++) {
+            if (startIndex == bookies.size()) {
+                startIndex = 0;
+            }
+            bookieIds[startIndex++] = bookies.get(i);
+        }
+        return Lists.newArrayList(bookieIds);
+    }
+
+    private BookieNode replaceToAdherePlacementPolicyInternal(
+            String netPath, Set<Node> excludeBookies, Predicate<BookieNode> predicate,
+            Ensemble<BookieNode> ensemble, List<BookieNode> provisionalEnsembleNodes, int ensembleIndex,
+            int ensembleSize, int minNumRacksPerWriteQuorumForThisEnsemble) throws BKNotEnoughBookiesException {
+        final BookieNode currentNode = provisionalEnsembleNodes.get(ensembleIndex);
+        // if the current bookie could be applied to the ensemble, apply it to minify the number of bookies replaced
+        if (!excludeBookies.contains(currentNode) && predicate.apply(currentNode, ensemble)) {
+            return currentNode;
+        }
+        final List<Pair<String, List<BookieNode>>> conditionList = new ArrayList<>();
+        final Set<String> preExcludeRacks = new HashSet<>();
+        final Set<String> postExcludeRacks = new HashSet<>();
+        for (int i = 0; i < minNumRacksPerWriteQuorumForThisEnsemble - 1; i++) {
+            preExcludeRacks.add(provisionalEnsembleNodes.get(Math.floorMod((ensembleIndex - i - 1), ensembleSize))
+                    .getNetworkLocation());
+            postExcludeRacks.add(provisionalEnsembleNodes.get(Math.floorMod((ensembleIndex + i + 1), ensembleSize))
+                    .getNetworkLocation());
+        }
+        // adhere minNumRacksPerWriteQuorum by preExcludeRacks
+        // avoid additional replace from write quorum candidates by preExcludeRacks and postExcludeRacks
+        // avoid to use first candidate bookies for election by provisionalEnsembleNodes
+        conditionList.add(Pair.of(
+                "~" + String.join(",",
+                        Stream.concat(preExcludeRacks.stream(), postExcludeRacks.stream()).collect(Collectors.toSet())),
+                provisionalEnsembleNodes
+        ));
+        // avoid to use same rack between previous index by netPath
+        // avoid to use first candidate bookies for election by provisionalEnsembleNodes
+        conditionList.add(Pair.of(netPath, provisionalEnsembleNodes));
+        // avoid to use same rack between previous index by netPath
+        conditionList.add(Pair.of(netPath, Collections.emptyList()));
+
+        for (Pair<String, List<BookieNode>> condition : conditionList) {
+            WeightedRandomSelection<BookieNode> wRSelection = null;
+
+            final List<Node> leaves = new ArrayList<>(topology.getLeaves(condition.getLeft()));
+            if (!isWeighted) {
+                Collections.shuffle(leaves);
+            } else {
+                if (CollectionUtils.subtract(leaves, excludeBookies).size() < 1) {
+                    throw new BKNotEnoughBookiesException();
+                }
+                wRSelection = prepareForWeightedSelection(leaves);
+                if (wRSelection == null) {
+                    throw new BKNotEnoughBookiesException();
+                }
+            }
+
+            final Iterator<Node> it = leaves.iterator();
+            final Set<Node> bookiesSeenSoFar = new HashSet<>();
+            while (true) {
+                Node n;
+                if (isWeighted) {
+                    if (bookiesSeenSoFar.size() == leaves.size()) {
+                        // Don't loop infinitely.
+                        break;
+                    }
+                    n = wRSelection.getNextRandom();
+                    bookiesSeenSoFar.add(n);
+                } else {
+                    if (it.hasNext()) {
+                        n = it.next();
+                    } else {
+                        break;
+                    }
+                }
+                if (excludeBookies.contains(n)) {
+                    continue;
+                }
+                if (!(n instanceof BookieNode) || !predicate.apply((BookieNode) n, ensemble)) {
+                    continue;
+                }
+                // additional excludeBookies
+                if (condition.getRight().contains(n)) {
+                    continue;
+                }
+                BookieNode bn = (BookieNode) n;
+                return bn;
+            }
+        }
+        throw new BKNotEnoughBookiesException();

Review Comment:
   Can we log something if we reach this point?
   



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java:
##########
@@ -440,6 +440,31 @@ default boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBook
         return true;
     }
 
+    /**
+     * Returns placement result. If the currentEnsemble is not adhering placement policy, returns new ensemble that
+     * adheres placement policy. It should be implemented so as to minify the number of bookies replaced.
+     *
+     * @param ensembleSize
+     *            ensemble size
+     * @param writeQuorumSize
+     *            writeQuorumSize of the ensemble
+     * @param ackQuorumSize
+     *            ackQuorumSize of the ensemble
+     * @param excludeBookies
+     *            bookies that should not be considered as targets
+     * @param currentEnsemble
+     *            current ensemble
+     * @return a placement result
+     */
+    default PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        throw new UnsupportedOperationException();

Review Comment:
   What happens if you don't override this method?
   
   Are we handling this exception in the code that calls this method?
   
   My understanding is that we cannot provide a good default implementation.
   
   In the calling code we could catch this exception, log something, and abort the operation gracefully.
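
A minimal sketch of that suggestion follows. Everything in it is hypothetical: `GracefulAbortSketch`, the nested `Policy` interface, and `tryReplace` are simplified stand-ins (bookies as strings, a single parameter) rather than the real `EnsemblePlacementPolicy` API or the replication code that would call it. Only the pattern is the point: catch the exception at the call site, log, and skip the operation.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical illustration only; not the PR's calling code.
final class GracefulAbortSketch {

    /** Simplified stand-in for a placement policy whose default method is unsupported. */
    interface Policy {
        default List<String> replaceToAdherePlacementPolicy(List<String> currentEnsemble) {
            throw new UnsupportedOperationException();
        }
    }

    /** Returns the replacement ensemble, or empty if the policy does not implement the operation. */
    static Optional<List<String>> tryReplace(Policy policy, List<String> currentEnsemble) {
        try {
            return Optional.of(policy.replaceToAdherePlacementPolicy(currentEnsemble));
        } catch (UnsupportedOperationException e) {
            // Log and skip instead of letting the exception escape the caller.
            System.err.println("Placement policy " + policy.getClass().getName()
                    + " does not support replaceToAdherePlacementPolicy; skipping.");
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Policy notOverridden = new Policy() { };
        System.out.println(tryReplace(notOverridden, List.of("b1", "b2")).isPresent()); // prints false
    }
}
```

Returning an empty `Optional` is one way to signal "nothing to do"; a real caller could equally return early or mark the ledger to be retried later.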





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939693630


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1083,236 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        rwLock.readLock().lock();
+        try {
+            PlacementPolicyAdherence currentPlacementAdherence = isEnsembleAdheringToPlacementPolicy(
+                    currentEnsemble, writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+                return PlacementResult.of(new ArrayList<>(currentEnsemble), currentPlacementAdherence);
+            }
+            for (BookieId bookieId : currentEnsemble) {
+                if (!knownBookies.containsKey(bookieId)) {
+                    excludeBookies.add(bookieId);
+                }
+            }
+            PlacementResult<List<BookieId>> placementResult = PlacementResult.of(Collections.emptyList(),
+                    PlacementPolicyAdherence.FAIL);
+            int minDiffer = Integer.MAX_VALUE;
+            for (int i = 0; i < currentEnsemble.size(); i++) {
+                PlacementResult<List<BookieId>> result = doReplaceToAdherePlacementPolicy(ensembleSize,
+                        writeQuorumSize, ackQuorumSize, excludeBookies, currentEnsemble, i);
+                if (PlacementPolicyAdherence.FAIL == result.getAdheringToPolicy()) {
+                    continue;
+                }
+                int differ = differBetweenBookies(currentEnsemble, result.getResult());
+                if (differ < minDiffer) {
+                    minDiffer = differ;
+                    placementResult = result;
+                    if (minDiffer == 1) {
+                        break;
+                    }
+                }
+            }
+            return placementResult;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private int differBetweenBookies(List<BookieId> bookiesA, List<BookieId> bookiesB) {
+        if (CollectionUtils.isEmpty(bookiesA) || CollectionUtils.isEmpty(bookiesB)) {
+            return Integer.MAX_VALUE;
+        }
+        if (bookiesA.size() != bookiesB.size()) {
+            return Integer.MAX_VALUE;
+        }
+        int differ = 0;
+        for (int i = 0; i < bookiesA.size(); i++) {
+            if (!bookiesA.get(i).equals(bookiesB.get(i))) {
+                differ++;
+            }
+        }
+        return differ;
+    }
+
+    private PlacementResult<List<BookieId>> doReplaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble,
+            int startIndex) {
+        final List<BookieNode> provisionalEnsembleNodes = currentEnsemble.stream()
+                .map(this::convertBookieToNode).collect(Collectors.toList());
+        final Set<Node> excludeNodes = convertBookiesToNodes(
+                addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
+        int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+        final RRTopologyAwareCoverageEnsemble ensemble =
+                new RRTopologyAwareCoverageEnsemble(
+                        ensembleSize,
+                        writeQuorumSize,
+                        ackQuorumSize,
+                        RACKNAME_DISTANCE_FROM_LEAVES,
+                        null,
+                        null,
+                        minNumRacksPerWriteQuorumForThisEnsemble);
+        int numRacks = topology.getNumOfRacks();
+        // only one rack or less than minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
+        if (numRacks < 2 || numRacks < minNumRacksPerWriteQuorumForThisEnsemble) {
+            LOG.warn("Skip ensemble relocation because the cluster has only {} rack.", numRacks);
+            return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+        }
+        BookieNode prevNode = null;
+        final BookieNode firstNode = provisionalEnsembleNodes.get(startIndex);
+        // use same bookie at first to reduce ledger replication
+        if (!excludeNodes.contains(firstNode) && ensemble.apply(firstNode, ensemble)
+                && ensemble.addNode(firstNode)) {
+            excludeNodes.add(firstNode);
+            prevNode = firstNode;
+        }
+        for (int i = prevNode == null ? 0 : 1; i < ensembleSize; i++) {
+            int index = (startIndex + i) % ensembleSize;
+            final String curRack;
+            if (null == prevNode) {
+                if ((null == localNode) || defaultRack.equals(localNode.getNetworkLocation())) {
+                    curRack = NodeBase.ROOT;
+                } else {
+                    curRack = localNode.getNetworkLocation();
+                }
+            } else {
+                curRack = "~" + prevNode.getNetworkLocation();
+            }
+            try {
+                prevNode = replaceToAdherePlacementPolicyInternal(
+                        curRack, excludeNodes, ensemble, ensemble,
+                        provisionalEnsembleNodes, index, ensembleSize, minNumRacksPerWriteQuorumForThisEnsemble);
+                // got a good candidate
+                if (ensemble.addNode(prevNode)) {
+                    // add the candidate to exclude set
+                    excludeNodes.add(prevNode);
+                } else {
+                    throw new BKNotEnoughBookiesException();
+                }
+                // replace to newer node
+                provisionalEnsembleNodes.set(index, prevNode);
+            } catch (BKNotEnoughBookiesException e) {
+                LOG.warn("Skip ensemble relocation because the cluster has not enough bookies.");
+                return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+            }
+        }
+        List<BookieId> bookieList = ensemble.toList();
+        if (ensembleSize != bookieList.size()) {
+            LOG.warn("Not enough {} bookies are available to form an ensemble : {}.",
+                    ensembleSize, bookieList);
+            return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+        }
+        PlacementPolicyAdherence placementPolicyAdherence = isEnsembleAdheringToPlacementPolicy(bookieList,
+                writeQuorumSize, ackQuorumSize);
+        if (PlacementPolicyAdherence.FAIL == placementPolicyAdherence) {
+            return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+        }
+        return PlacementResult.of(revertBookieListByIndex(bookieList, startIndex), placementPolicyAdherence);
+    }
+
+    private List<BookieId> revertBookieListByIndex(List<BookieId> bookies, int startIndex) {
+        BookieId[] bookieIds = new BookieId[bookies.size()];
+        for (int i = 0; i < bookies.size(); i++) {
+            if (startIndex == bookies.size()) {
+                startIndex = 0;
+            }
+            bookieIds[startIndex++] = bookies.get(i);
+        }
+        return Lists.newArrayList(bookieIds);
+    }
+
+    private BookieNode replaceToAdherePlacementPolicyInternal(
+            String netPath, Set<Node> excludeBookies, Predicate<BookieNode> predicate,
+            Ensemble<BookieNode> ensemble, List<BookieNode> provisionalEnsembleNodes, int ensembleIndex,
+            int ensembleSize, int minNumRacksPerWriteQuorumForThisEnsemble) throws BKNotEnoughBookiesException {
+        final BookieNode currentNode = provisionalEnsembleNodes.get(ensembleIndex);
+        // if the current bookie could be applied to the ensemble, apply it to minify the number of bookies replaced
+        if (!excludeBookies.contains(currentNode) && predicate.apply(currentNode, ensemble)) {
+            return currentNode;
+        }
+        final List<Pair<String, List<BookieNode>>> conditionList = new ArrayList<>();
+        final Set<String> preExcludeRacks = new HashSet<>();
+        final Set<String> postExcludeRacks = new HashSet<>();
+        for (int i = 0; i < minNumRacksPerWriteQuorumForThisEnsemble - 1; i++) {
+            preExcludeRacks.add(provisionalEnsembleNodes.get(Math.floorMod((ensembleIndex - i - 1), ensembleSize))
+                    .getNetworkLocation());
+            postExcludeRacks.add(provisionalEnsembleNodes.get(Math.floorMod((ensembleIndex + i + 1), ensembleSize))
+                    .getNetworkLocation());
+        }
+        // adhere minNumRacksPerWriteQuorum by preExcludeRacks
+        // avoid additional replace from write quorum candidates by preExcludeRacks and postExcludeRacks
+        // avoid to use first candidate bookies for election by provisionalEnsembleNodes
+        conditionList.add(Pair.of(
+                "~" + String.join(",",
+                        Stream.concat(preExcludeRacks.stream(), postExcludeRacks.stream()).collect(Collectors.toSet())),
+                provisionalEnsembleNodes
+        ));
+        // avoid to use same rack between previous index by netPath
+        // avoid to use first candidate bookies for election by provisionalEnsembleNodes
+        conditionList.add(Pair.of(netPath, provisionalEnsembleNodes));
+        // avoid to use same rack between previous index by netPath
+        conditionList.add(Pair.of(netPath, Collections.emptyList()));
+
+        for (Pair<String, List<BookieNode>> condition : conditionList) {
+            WeightedRandomSelection<BookieNode> wRSelection = null;
+
+            final List<Node> leaves = new ArrayList<>(topology.getLeaves(condition.getLeft()));
+            if (!isWeighted) {
+                Collections.shuffle(leaves);
+            } else {
+                if (CollectionUtils.subtract(leaves, excludeBookies).size() < 1) {
+                    throw new BKNotEnoughBookiesException();
+                }
+                wRSelection = prepareForWeightedSelection(leaves);
+                if (wRSelection == null) {
+                    throw new BKNotEnoughBookiesException();
+                }
+            }
+
+            final Iterator<Node> it = leaves.iterator();
+            final Set<Node> bookiesSeenSoFar = new HashSet<>();
+            while (true) {
+                Node n;
+                if (isWeighted) {
+                    if (bookiesSeenSoFar.size() == leaves.size()) {
+                        // Don't loop infinitely.
+                        break;
+                    }
+                    n = wRSelection.getNextRandom();
+                    bookiesSeenSoFar.add(n);
+                } else {
+                    if (it.hasNext()) {
+                        n = it.next();
+                    } else {
+                        break;
+                    }
+                }
+                if (excludeBookies.contains(n)) {
+                    continue;
+                }
+                if (!(n instanceof BookieNode) || !predicate.apply((BookieNode) n, ensemble)) {
+                    continue;
+                }
+                // additional excludeBookies
+                if (condition.getRight().contains(n)) {
+                    continue;
+                }
+                BookieNode bn = (BookieNode) n;
+                return bn;
+            }
+        }
+        throw new BKNotEnoughBookiesException();

Review Comment:
   This is already logged: `doReplaceToAdherePlacementPolicy` catches `BKNotEnoughBookiesException` and logs it.





[GitHub] [bookkeeper] StevenLuMT commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
StevenLuMT commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1204667041

   rerun failure checks




[GitHub] [bookkeeper] equanz commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
equanz commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r937302896


##########
bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java:
##########
@@ -1465,6 +1465,176 @@ public boolean validate() {
         }
     }
 
+    @Test
+    public void testReplaceNotAdheringPlacementPolicyBookie() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 3;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);

Review Comment:
   Why don't you set `enforceMinNumRacksPerWriteQuorum` to `false`? In my understanding, if `true`, we can't create a `[r1, r1, r1]` ensemble.



##########
bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java:
##########
@@ -1465,6 +1465,176 @@ public boolean validate() {
         }
     }
 
+    @Test
+    public void testReplaceNotAdheringPlacementPolicyBookie() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 3;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 3;
+        String[] rackLocationNames = new String[numOfRacks];
+        List<BookieId> bookieSocketAddresses = new ArrayList<BookieId>();
+        Map<BookieId, String> bookieRackMap = new HashMap<BookieId, String>();
+        BookieId bookieAddress;
+
+        for (int i = 0; i < numOfRacks; i++) {
+            rackLocationNames[i] = "/default-region/r" + i;
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181).toBookieId();
+                StaticDNSResolver.addNodeToRack("128.0.0." + index, rackLocationNames[i]);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+            }
+        }
+
+        repp.onClusterChanged(new HashSet<BookieId>(bookieSocketAddresses), new HashSet<BookieId>());
+
+        int writeQuorum = 3;
+        int ackQuorum = 3;
+
+        //test three knows bookie
+        List<BookieId> knowsEnsemble = new ArrayList<>();
+        knowsEnsemble.add(BookieId.parse("128.0.0.0:3181"));
+        knowsEnsemble.add(BookieId.parse("128.0.0.1:3181"));
+        knowsEnsemble.add(BookieId.parse("128.0.0.2:3181"));
+
+        PlacementPolicyAdherence placementPolicyAdherence = repp.isEnsembleAdheringToPlacementPolicy(
+                knowsEnsemble, writeQuorum, ackQuorum);
+        assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.FAIL);

Review Comment:
   [nits]
   The expected and actual arguments are swapped. The same applies to the other assertions.
   https://github.com/junit-team/junit4/blob/r4.12/src/main/java/junit/framework/TestCase.java#L247-L254
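
A minimal sketch of why the argument order matters. The `failureMessage` helper is hypothetical: it only mimics the `expected:<...> but was:<...>` text that JUnit prints on failure and is not part of JUnit's API.

```java
public class AssertOrderSketch {
    // Hypothetical stand-in for assertEquals(expected, actual): it only builds
    // the failure text so the effect of the argument order is visible.
    static String failureMessage(Object expected, Object actual) {
        return "expected:<" + expected + "> but was:<" + actual + ">";
    }

    public static void main(String[] args) {
        // Pretend the code under test returned MEETS_STRICT while the test wanted FAIL.
        String actual = "MEETS_STRICT";
        // Swapped order, as in the assertion above: the report blames the wrong side.
        System.out.println(failureMessage(actual, "FAIL"));
        // Conventional order: expected first, actual second.
        System.out.println(failureMessage("FAIL", actual));
    }
}
```

With the swapped order, a failure report would claim that `MEETS_STRICT` was expected, which points the reader at the wrong value.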
   



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -788,6 +791,116 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
         }
     }
 
+    @Override
+    public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,
+            int ackQuorumSize, Map<String, byte[]> customMetadata) {
+        rwLock.readLock().lock();
+        try {
+            if (CollectionUtils.isEmpty(ensemble)) {
+                return Collections.emptyMap();
+            }
+            PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensemble,
+                    writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {
+                return Collections.emptyMap();
+            }
+            Map<BookieId, Integer> bookieIndex = new HashMap<>();
+            for (int i = 0; i < ensemble.size(); i++) {
+                bookieIndex.put(ensemble.get(i), i);
+            }
+            Map<BookieId, BookieNode> clone = new HashMap<>(knownBookies);
+
+            Map<String, List<BookieNode>> toPlaceGroup = new HashMap<>();
+            for (BookieId bookieId : ensemble) {
+                //When ReplicationWorker.getUnderreplicatedFragments, the bookie is alive, so the fragment is not
+                // data_loss. When find other rack bookie to replace, the bookie maybe shutdown, so here we should pick
+                // the shutdown bookies. If the bookieId shutdown, put it to inactive. When do replace, we should
+                // replace inactive bookie firstly.
+                BookieNode bookieNode = clone.get(bookieId);
+                if (bookieNode == null) {
+                    List<BookieNode> list = toPlaceGroup.computeIfAbsent(NetworkTopology.INACTIVE,
+                            k -> new ArrayList<>());
+                    list.add(new BookieNode(bookieId, NetworkTopology.INACTIVE));
+                } else {
+                    List<BookieNode> list = toPlaceGroup.computeIfAbsent(bookieNode.getNetworkLocation(),
+                            k -> new ArrayList<>());
+                    list.add(bookieNode);
+                }
+            }
+            for (List<BookieNode> bookieNodes : toPlaceGroup.values()) {
+                Collections.shuffle(bookieNodes);
+            }
+
+            Map<String, List<BookieNode>> knownRackToBookies = clone.values().stream()
+                    .collect(Collectors.groupingBy(NodeBase::getNetworkLocation));
+            HashSet<String> knownRacks = new HashSet<>(knownRackToBookies.keySet());
+
+            Set<BookieId> excludesBookies = new HashSet<>();
+
+            for (String key : toPlaceGroup.keySet()) {
+                List<BookieNode> sameRack = knownRackToBookies.get(key);
+                if (!CollectionUtils.isEmpty(sameRack)) {
+                    excludesBookies.addAll(sameRack.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
+                }
+            }
+
+            Map<Integer, BookieId> targetBookieAddresses = new HashMap<>();
+            boolean placeSucceed = false;
+            while (knownRacks.size() > 0) {
+                BookieNode beReplaceNode = getBeReplaceNode(toPlaceGroup);
+                if (beReplaceNode == null) {
+                    break;
+                }
+                Integer index = bookieIndex.get(beReplaceNode.getAddr());
+                try {
+                    PlacementResult<BookieId> placementResult = replaceBookie(ensemble.size(), writeQuorumSize,
+                            ackQuorumSize, customMetadata, ensemble, beReplaceNode.getAddr(), excludesBookies);
+                    BookieNode replaceNode = clone.get(placementResult.getResult());
+                    String replaceNodeNetwork = replaceNode.getNetworkLocation();
+                    knownRacks.remove(replaceNodeNetwork);
+                    List<BookieNode> nodes = toPlaceGroup.computeIfAbsent(replaceNodeNetwork,
+                            k -> new ArrayList<>());
+                    nodes.add(replaceNode);
+                    targetBookieAddresses.put(index, replaceNode.getAddr());
+                    List<BookieNode> bookieNodes = knownRackToBookies.get(replaceNodeNetwork);
+                    if (!CollectionUtils.isEmpty(bookieNodes)) {
+                        excludesBookies.addAll(
+                                bookieNodes.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
+                    }
+                } catch (BKException.BKNotEnoughBookiesException e) {
+                    LOG.warn("Didn't find replaced bookie to adhere placement policy.", e);
+                    break;
+                }
+
+                List<BookieId> ensembles = toPlaceGroup.values().stream().flatMap(Collection::stream).map(
+                        BookieNode::getAddr).collect(Collectors.toList());
+                ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensembles,
+                        writeQuorumSize, ackQuorumSize);
+                if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {
+                    placeSucceed = true;
+                    break;
+                }
+            }

Review Comment:
   In https://github.com/apache/bookkeeper/pull/2931, I tried to fix a similar issue that takes the above case into account.
   
   If this PR does not cover that case and it is still an issue, I'll try to fix it in https://github.com/apache/bookkeeper/pull/2931, following your interfaces.



##########
bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java:
##########
@@ -1465,6 +1465,176 @@ public boolean validate() {
         }
     }
 
+    @Test
+    public void testReplaceNotAdheringPlacementPolicyBookie() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 3;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 3;
+        String[] rackLocationNames = new String[numOfRacks];
+        List<BookieId> bookieSocketAddresses = new ArrayList<BookieId>();
+        Map<BookieId, String> bookieRackMap = new HashMap<BookieId, String>();
+        BookieId bookieAddress;
+
+        for (int i = 0; i < numOfRacks; i++) {
+            rackLocationNames[i] = "/default-region/r" + i;
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181).toBookieId();
+                StaticDNSResolver.addNodeToRack("128.0.0." + index, rackLocationNames[i]);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+            }
+        }
+
+        repp.onClusterChanged(new HashSet<BookieId>(bookieSocketAddresses), new HashSet<BookieId>());
+
+        int writeQuorum = 3;
+        int ackQuorum = 3;
+
+        //test three knows bookie
+        List<BookieId> knowsEnsemble = new ArrayList<>();
+        knowsEnsemble.add(BookieId.parse("128.0.0.0:3181"));
+        knowsEnsemble.add(BookieId.parse("128.0.0.1:3181"));
+        knowsEnsemble.add(BookieId.parse("128.0.0.2:3181"));
+
+        PlacementPolicyAdherence placementPolicyAdherence = repp.isEnsembleAdheringToPlacementPolicy(
+                knowsEnsemble, writeQuorum, ackQuorum);
+        assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.FAIL);
+
+        Map<Integer, BookieId> targetBookie =
+                repp.replaceNotAdheringPlacementPolicyBookie(knowsEnsemble, ackQuorum, writeQuorum,
+                        Collections.emptyMap());

Review Comment:
   [nits]
   ```suggestion
                   repp.replaceNotAdheringPlacementPolicyBookie(knowsEnsemble, writeQuorum, ackQuorum,
                           Collections.emptyMap());
   ```
   
   The same applies to the other calls.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -788,6 +791,116 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
         }
     }
 
+    @Override
+    public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,
+            int ackQuorumSize, Map<String, byte[]> customMetadata) {
+        rwLock.readLock().lock();
+        try {
+            if (CollectionUtils.isEmpty(ensemble)) {
+                return Collections.emptyMap();
+            }
+            PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensemble,
+                    writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {
+                return Collections.emptyMap();
+            }
+            Map<BookieId, Integer> bookieIndex = new HashMap<>();
+            for (int i = 0; i < ensemble.size(); i++) {
+                bookieIndex.put(ensemble.get(i), i);
+            }
+            Map<BookieId, BookieNode> clone = new HashMap<>(knownBookies);
+
+            Map<String, List<BookieNode>> toPlaceGroup = new HashMap<>();
+            for (BookieId bookieId : ensemble) {
+                //When ReplicationWorker.getUnderreplicatedFragments, the bookie is alive, so the fragment is not
+                // data_loss. When find other rack bookie to replace, the bookie maybe shutdown, so here we should pick
+                // the shutdown bookies. If the bookieId shutdown, put it to inactive. When do replace, we should
+                // replace inactive bookie firstly.
+                BookieNode bookieNode = clone.get(bookieId);
+                if (bookieNode == null) {
+                    List<BookieNode> list = toPlaceGroup.computeIfAbsent(NetworkTopology.INACTIVE,
+                            k -> new ArrayList<>());
+                    list.add(new BookieNode(bookieId, NetworkTopology.INACTIVE));
+                } else {
+                    List<BookieNode> list = toPlaceGroup.computeIfAbsent(bookieNode.getNetworkLocation(),
+                            k -> new ArrayList<>());
+                    list.add(bookieNode);
+                }
+            }
+            for (List<BookieNode> bookieNodes : toPlaceGroup.values()) {
+                Collections.shuffle(bookieNodes);
+            }
+
+            Map<String, List<BookieNode>> knownRackToBookies = clone.values().stream()
+                    .collect(Collectors.groupingBy(NodeBase::getNetworkLocation));
+            HashSet<String> knownRacks = new HashSet<>(knownRackToBookies.keySet());
+
+            Set<BookieId> excludesBookies = new HashSet<>();
+
+            for (String key : toPlaceGroup.keySet()) {
+                List<BookieNode> sameRack = knownRackToBookies.get(key);
+                if (!CollectionUtils.isEmpty(sameRack)) {
+                    excludesBookies.addAll(sameRack.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
+                }
+            }
+
+            Map<Integer, BookieId> targetBookieAddresses = new HashMap<>();
+            boolean placeSucceed = false;
+            while (knownRacks.size() > 0) {
+                BookieNode beReplaceNode = getBeReplaceNode(toPlaceGroup);
+                if (beReplaceNode == null) {
+                    break;
+                }
+                Integer index = bookieIndex.get(beReplaceNode.getAddr());
+                try {
+                    PlacementResult<BookieId> placementResult = replaceBookie(ensemble.size(), writeQuorumSize,
+                            ackQuorumSize, customMetadata, ensemble, beReplaceNode.getAddr(), excludesBookies);
+                    BookieNode replaceNode = clone.get(placementResult.getResult());
+                    String replaceNodeNetwork = replaceNode.getNetworkLocation();
+                    knownRacks.remove(replaceNodeNetwork);
+                    List<BookieNode> nodes = toPlaceGroup.computeIfAbsent(replaceNodeNetwork,
+                            k -> new ArrayList<>());
+                    nodes.add(replaceNode);
+                    targetBookieAddresses.put(index, replaceNode.getAddr());
+                    List<BookieNode> bookieNodes = knownRackToBookies.get(replaceNodeNetwork);
+                    if (!CollectionUtils.isEmpty(bookieNodes)) {
+                        excludesBookies.addAll(
+                                bookieNodes.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
+                    }
+                } catch (BKException.BKNotEnoughBookiesException e) {
+                    LOG.warn("Didn't find replaced bookie to adhere placement policy.", e);
+                    break;
+                }
+
+                List<BookieId> ensembles = toPlaceGroup.values().stream().flatMap(Collection::stream).map(
+                        BookieNode::getAddr).collect(Collectors.toList());
+                ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensembles,
+                        writeQuorumSize, ackQuorumSize);
+                if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {
+                    placeSucceed = true;
+                    break;
+                }
+            }

Review Comment:
   In my understanding, we can't recover an ensemble fragment that has multiple bookies in the same rack.
   
   For example, consider an `E=5, Qw=2, Qa=2` ensemble fragment such as `["128.0.0.0:3181", "128.0.0.3:3181", "128.0.0.1:3181", "128.0.0.6:3181", "128.0.0.4:3181"]`, using `RackawareEnsemblePlacementPolicy` with `enforceMinNumRacksPerWriteQuorum` set to `false`.
   (definition of rack: https://github.com/apache/bookkeeper/pull/3359/files#diff-aac3491b47dd2a6b2936e726b36151e2d232409878845c599beb7df5a3c739afR1489-R1498 )
   
   When `/default-region/r3` goes down, `RackawareEnsemblePlacementPolicy#replaceBookie` returns a new ensemble fragment chosen by random selection, such as `["128.0.0.0:3181", "128.0.0.3:3181", "128.0.0.1:3181", "128.0.0.2:3181", "128.0.0.4:3181"]`.
   
   After `/default-region/r3` recovers, `TopologyAwareEnsemblePlacementPolicy#replaceNotAdheringPlacementPolicyBookie` can't return a `MEETS_STRICT` result.
   
   The test case is below.
   
   ```sh
   % git --no-pager show --no-patch HEAD
   commit 11701505ffda2fe68c84a2621aaf595ef51d9979 (HEAD, horizonzy/feature-auto-recover-match-placement)
   Merge: 0ff5d96f3 c3706e9c2
   Author: horizonzy <ho...@apache.org>
   Date:   Wed Jul 13 09:27:50 2022 +0800
   
       Merge branch 'master' into feature-auto-recover-match-placement
   
       # Conflicts:
       #       bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
       #       bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
   ```
   
   ```diff
   diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
   index 0b80f7af0..5dee731e6 100644
   --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
   +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
   @@ -22,6 +22,7 @@ import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.
    import static org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues;
    import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
   
   +import com.google.common.collect.Sets;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import io.netty.util.HashedWheelTimer;
    import java.net.InetAddress;
   @@ -40,6 +41,7 @@ import junit.framework.TestCase;
    import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
    import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
    import org.apache.bookkeeper.client.EnsemblePlacementPolicy.PlacementPolicyAdherence;
   +import org.apache.bookkeeper.client.EnsemblePlacementPolicy.PlacementResult;
    import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
    import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.EnsembleForReplacementWithNoConstraints;
    import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.TruePredicate;
   @@ -1421,11 +1423,11 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
        public void testReplaceNotAdheringPlacementPolicyBookie() throws Exception {
            repp.uninitalize();
   
   -        int minNumRacksPerWriteQuorum = 3;
   +        int minNumRacksPerWriteQuorum = 2;
            ClientConfiguration clientConf = new ClientConfiguration(conf);
            clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
            // set enforceMinNumRacksPerWriteQuorum
   -        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
   +        clientConf.setEnforceMinNumRacksPerWriteQuorum(false);
            repp = new RackawareEnsemblePlacementPolicy();
            repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
                    NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
   @@ -1436,6 +1438,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
            String[] rackLocationNames = new String[numOfRacks];
            List<BookieId> bookieSocketAddresses = new ArrayList<BookieId>();
            Map<BookieId, String> bookieRackMap = new HashMap<BookieId, String>();
   +        Map<String, Set<BookieId>> rackMap = new HashMap<>();
            BookieId bookieAddress;
   
            for (int i = 0; i < numOfRacks; i++) {
   @@ -1446,29 +1449,45 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
                    StaticDNSResolver.addNodeToRack("128.0.0." + index, rackLocationNames[i]);
                    bookieSocketAddresses.add(bookieAddress);
                    bookieRackMap.put(bookieAddress, rackLocationNames[i]);
   +                rackMap.computeIfAbsent(rackLocationNames[i], k -> Sets.newHashSet()).add(bookieAddress);
                }
            }
   
            repp.onClusterChanged(new HashSet<BookieId>(bookieSocketAddresses), new HashSet<BookieId>());
   
   -        int writeQuorum = 3;
   -        int ackQuorum = 3;
   +        int writeQuorum = 2;
   +        int ackQuorum = 2;
   
            //test three knows bookie
            List<BookieId> knowsEnsemble = new ArrayList<>();
            knowsEnsemble.add(BookieId.parse("128.0.0.0:3181"));
   +        knowsEnsemble.add(BookieId.parse("128.0.0.3:3181"));
            knowsEnsemble.add(BookieId.parse("128.0.0.1:3181"));
   -        knowsEnsemble.add(BookieId.parse("128.0.0.2:3181"));
   +        //knowsEnsemble.add(BookieId.parse("128.0.0.2:3181")); // should be replaced to /r3 like 128.0.0.6
   +        knowsEnsemble.add(BookieId.parse("128.0.0.6:3181"));
   +        knowsEnsemble.add(BookieId.parse("128.0.0.4:3181"));
   
            PlacementPolicyAdherence placementPolicyAdherence = repp.isEnsembleAdheringToPlacementPolicy(
                    knowsEnsemble, writeQuorum, ackQuorum);
   -        assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.FAIL);
   +        //assertEquals(PlacementPolicyAdherence.FAIL, placementPolicyAdherence);
   +
   +        // /default-region/r3 goes down
   +        repp.handleBookiesThatLeft(rackMap.get(rackLocationNames[2]));
   +
   +        final PlacementResult<BookieId> result = repp.replaceBookie(knowsEnsemble.size(), writeQuorum, ackQuorum,
   +                Collections.emptyMap(), knowsEnsemble, knowsEnsemble.get(3), Sets.newHashSet());
   +
   +        assertNotSame(PlacementPolicyAdherence.MEETS_STRICT, result.isAdheringToPolicy());
   +        assertNotSame(rackLocationNames[2], bookieRackMap.get(result.getResult()));
   +        knowsEnsemble.set(3, result.getResult());
   +        LOG.error("{}", knowsEnsemble);
   +
   +        // /default-region/r3 is recovered
   +        repp.handleBookiesThatJoined(rackMap.get(rackLocationNames[2]));
   
            Map<Integer, BookieId> targetBookie =
   -                repp.replaceNotAdheringPlacementPolicyBookie(knowsEnsemble, ackQuorum, writeQuorum,
   +                repp.replaceNotAdheringPlacementPolicyBookie(knowsEnsemble, writeQuorum, ackQuorum,
                            Collections.emptyMap());
   -        //should replace two bookie
   -        assertEquals(targetBookie.size(), 2);
   
            for (Map.Entry<Integer, BookieId> entry : targetBookie.entrySet()) {
                knowsEnsemble.set(entry.getKey(), entry.getValue());
   @@ -1478,6 +1497,10 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
                    knowsEnsemble, writeQuorum, ackQuorum);
            assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.MEETS_STRICT);
   
   +        //should replace one bookie
   +        LOG.error(String.valueOf(targetBookie));
   +        assertEquals(1, targetBookie.size());
   +
            //test three unknowns bookie
            List<BookieId> unknownEnsembles = new ArrayList<>();
            unknownEnsembles.add(BookieId.parse("128.0.0.100:3181"));
   ```
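
The scenario above can also be checked by hand with a simplified sketch. It assumes round-robin write sets (entry `i` goes to ensemble positions `i .. i+Qw-1`, modulo `E`) and the rack layout from the test loop, where bookie `128.0.0.N` is in rack index `N / 3`; the third rack, written `r3` in the comment, is the one the loop names `/default-region/r2`. The class and method names are hypothetical, and this is not the actual `isEnsembleAdheringToPlacementPolicy` implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RackCoverageSketch {
    // Assumption taken from the test setup: bookie 128.0.0.N lives in rack N / 3.
    static String rackOf(int bookieSuffix) {
        return "/default-region/r" + (bookieSuffix / 3);
    }

    // Returns the start positions of write sets that span fewer racks than required.
    static List<Integer> failingWriteSets(int[] ensemble, int writeQuorum, int minRacks) {
        List<Integer> failing = new ArrayList<>();
        int required = Math.min(writeQuorum, minRacks);
        for (int start = 0; start < ensemble.length; start++) {
            Set<String> racks = new HashSet<>();
            for (int j = 0; j < writeQuorum; j++) {
                racks.add(rackOf(ensemble[(start + j) % ensemble.length]));
            }
            if (racks.size() < required) {
                failing.add(start);
            }
        }
        return failing;
    }

    public static void main(String[] args) {
        int[] original = {0, 3, 1, 6, 4};     // ensemble before the rack outage
        int[] afterReplace = {0, 3, 1, 2, 4}; // 128.0.0.6 replaced by 128.0.0.2
        System.out.println(failingWriteSets(original, 2, 2));
        System.out.println(failingWriteSets(afterReplace, 2, 2));
    }
}
```

Under these assumptions the original fragment has no failing write set, while the replaced fragment fails at position 2, where `128.0.0.1` and `128.0.0.2` share a rack. Moving the bookie at position 3 back to the recovered rack is therefore the single replacement the modified test expects.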





[GitHub] [bookkeeper] StevenLuMT commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
StevenLuMT commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1204757319

   Please rebase onto the latest master, @horizonzy.
   
   > rerun failure checks
   
   




[GitHub] [bookkeeper] horizonzy commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1167074480

   How to use it?
   
   To repair ledgers whose ensemble does not adhere to the placement policy, configure two parameters:
   
   ```
   auditorPeriodicPlacementPolicyCheckInterval=3600
   repairedPlacementPolicyNotAdheringBookieEnabled=true
   ```
   **In Auditor**
   `auditorPeriodicPlacementPolicyCheckInterval` controls the interval of the placement policy check. `repairedPlacementPolicyNotAdheringBookieEnabled` controls whether a ledger is marked as under-replicated when its ensemble is found not to adhere to the placement policy.
   
   
   **In ReplicationWorker**
   `repairedPlacementPolicyNotAdheringBookieEnabled` controls whether to repair ledgers whose ensemble does not adhere to the placement policy.
   
   _Attention_
   1. _`repairedPlacementPolicyNotAdheringBookieEnabled=true` must be set in both the Auditor and the ReplicationWorker._
   
   2. _The Auditor and the ReplicationWorker must also use the same placement policy, because both rely on it for processing._
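
A minimal sketch of the check implied by the attention notes, assuming both processes load their settings as `java.util.Properties`. The class `PlacementRepairConfigCheck` and its methods are hypothetical and not part of BookKeeper. Treating an interval of `0` or less as "check disabled" and a missing flag as `false` are assumptions as well.

```java
import java.util.Properties;

/** Hypothetical helper, not part of BookKeeper: validates the two settings named above. */
final class PlacementRepairConfigCheck {
    static final String CHECK_INTERVAL = "auditorPeriodicPlacementPolicyCheckInterval";
    static final String REPAIR_ENABLED = "repairedPlacementPolicyNotAdheringBookieEnabled";

    /** True when the flag is explicitly set to true (assumed default: false). */
    static boolean repairEnabled(Properties conf) {
        return Boolean.parseBoolean(conf.getProperty(REPAIR_ENABLED, "false"));
    }

    /** True when the Auditor would both run the periodic check and mark ledgers. */
    static boolean auditorMarksLedgers(Properties auditorConf) {
        // Assumption: an interval of 0 or less means the periodic check is disabled.
        long interval = Long.parseLong(auditorConf.getProperty(CHECK_INTERVAL, "0"));
        return interval > 0 && repairEnabled(auditorConf);
    }

    /** The feature only works end to end when both sides enable the flag. */
    static boolean featureActive(Properties auditorConf, Properties workerConf) {
        return auditorMarksLedgers(auditorConf) && repairEnabled(workerConf);
    }
}
```

With this sketch, `featureActive` returns `false` when only the Auditor enables the flag, since ledgers would be marked but never repaired.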
   
   
   
   




[GitHub] [bookkeeper] hangc0276 commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1200388878

   @rdhabalia Could you please review this PR again? Thanks. We hope this feature can be included in 4.16.0.




[GitHub] [bookkeeper] rdhabalia commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1298948840

   yes, we should cherry-pick it.




[GitHub] [bookkeeper] hangc0276 commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1203418679

   @eolivelli Could you please review this PR again? Thanks. We hope this feature can be included in 4.16.0.




[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939801881


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java:
##########
@@ -440,6 +440,31 @@ default boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBook
         return true;
     }
 
+    /**
+     * Returns placement result. If the currentEnsemble is not adhering placement policy, returns new ensemble that
+     * adheres placement policy. It should be implemented so as to minify the number of bookies replaced.
+     *
+     * @param ensembleSize
+     *            ensemble size
+     * @param writeQuorumSize
+     *            writeQuorumSize of the ensemble
+     * @param ackQuorumSize
+     *            ackQuorumSize of the ensemble
+     * @param excludeBookies
+     *            bookies that should not be considered as targets
+     * @param currentEnsemble
+     *            current ensemble
+     * @return a placement result
+     */
+    default PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(

Review Comment:
   > And what about the default rack? [#2931 (review)](https://github.com/apache/bookkeeper/pull/2931#pullrequestreview-1062016018)
   > 
   > If it is being addressed, please let me know where it is.
   
   I mean that if we didn't handle shutdown bookies, a shutdown bookie would be treated as being in the default rack. The default rack differs from the other bookies' racks, so the bookie would not be replaced. We now add shutdown bookies to the excluded nodes so that they are replaced.
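
The effect described in this reply can be illustrated with a small sketch. It assumes bookies are plain strings, that a bookie missing from the known map has shut down, and that `/default-rack` stands in for the default rack name. `DefaultRackSketch` and `distinctRacks` are hypothetical names, not BookKeeper APIs.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration of why an unhandled shutdown bookie hides a placement violation. */
final class DefaultRackSketch {
    // Stand-in for the default rack assigned to bookies whose location is unknown.
    static final String DEFAULT_RACK = "/default-rack";

    /**
     * Counts the distinct racks covered by an ensemble.
     * When excludeShutdown is false, unknown bookies are counted under the default rack,
     * which makes the ensemble look more spread out than it really is.
     */
    static int distinctRacks(List<String> ensemble, Map<String, String> knownRacks,
                             boolean excludeShutdown) {
        Set<String> racks = new HashSet<>();
        for (String bookie : ensemble) {
            String rack = knownRacks.get(bookie);
            if (rack == null) {
                if (excludeShutdown) {
                    continue; // the shutdown bookie no longer contributes a rack
                }
                rack = DEFAULT_RACK;
            }
            racks.add(rack);
        }
        return racks.size();
    }
}
```

In this sketch, an ensemble with two live bookies in one rack and one shutdown bookie appears to span two racks until the shutdown bookie is excluded.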





[GitHub] [bookkeeper] hangc0276 commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1194963951

   > better if you can add in description, what modification we a
   
   @rdhabalia It is a new feature, but controlled by a flag. I'm not sure if it can be cherry-picked to branch-4.14 and branch-4.15, do you have any ideas? @eolivelli @dlg99 




[GitHub] [bookkeeper] rdhabalia commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r928057386


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java:
##########
@@ -217,11 +218,27 @@ public List<BookieId> getEnsemble() {
         return this.ensemble;
     }
 
+    public ReplicateType getReplicateType() {
+        return replicateType;
+    }
+
+    public void setReplicateType(ReplicateType replicateType) {
+        this.replicateType = replicateType;
+    }
+
     @Override
     public String toString() {
         return String.format("Fragment(LedgerID: %d, FirstEntryID: %d[%d], "
                 + "LastKnownEntryID: %d[%d], Host: %s, Closed: %s)", ledgerId, firstEntryId,
                 getFirstStoredEntryId(), lastKnownEntryId, getLastStoredEntryId(),
                 getAddresses(), isLedgerClosed);
     }
+
+    /**
+     * ReplicateType.
+     */
+    public enum ReplicateType {
+        DATA_LOSS,

Review Comment:
   Can we choose a better name? `DATA_LOSS` doesn't sound correct. It could be `IGNORE_PLACEMENT_POLICY`.





[GitHub] [bookkeeper] StevenLuMT commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
StevenLuMT commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1225340578

   Fix old workflow; please see #3455 for details.




[GitHub] [bookkeeper] hangc0276 commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r933938514


##########
pom.xml:
##########
@@ -786,6 +786,13 @@
         <artifactId>rxjava</artifactId>
         <version>${rxjava.version}</version>
       </dependency>
+  
+      <dependency>

Review Comment:
   duplicated import





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r937364204


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -788,6 +791,116 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
         }
     }
 
+    @Override
+    public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,
+            int ackQuorumSize, Map<String, byte[]> customMetadata) {
+        rwLock.readLock().lock();
+        try {
+            if (CollectionUtils.isEmpty(ensemble)) {
+                return Collections.emptyMap();
+            }
+            PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensemble,
+                    writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {
+                return Collections.emptyMap();
+            }
+            Map<BookieId, Integer> bookieIndex = new HashMap<>();
+            for (int i = 0; i < ensemble.size(); i++) {
+                bookieIndex.put(ensemble.get(i), i);
+            }
+            Map<BookieId, BookieNode> clone = new HashMap<>(knownBookies);
+
+            Map<String, List<BookieNode>> toPlaceGroup = new HashMap<>();
+            for (BookieId bookieId : ensemble) {
+                // When ReplicationWorker.getUnderreplicatedFragments ran, the bookie was alive, so the fragment
+                // is not data_loss. By the time we look for a bookie in another rack, the bookie may have shut
+                // down, so we pick out shutdown bookies here and put them in the inactive group. When
+                // replacing, inactive bookies are replaced first.
+                BookieNode bookieNode = clone.get(bookieId);
+                if (bookieNode == null) {
+                    List<BookieNode> list = toPlaceGroup.computeIfAbsent(NetworkTopology.INACTIVE,
+                            k -> new ArrayList<>());
+                    list.add(new BookieNode(bookieId, NetworkTopology.INACTIVE));
+                } else {
+                    List<BookieNode> list = toPlaceGroup.computeIfAbsent(bookieNode.getNetworkLocation(),
+                            k -> new ArrayList<>());
+                    list.add(bookieNode);
+                }
+            }
+            for (List<BookieNode> bookieNodes : toPlaceGroup.values()) {
+                Collections.shuffle(bookieNodes);
+            }
+
+            Map<String, List<BookieNode>> knownRackToBookies = clone.values().stream()
+                    .collect(Collectors.groupingBy(NodeBase::getNetworkLocation));
+            HashSet<String> knownRacks = new HashSet<>(knownRackToBookies.keySet());
+
+            Set<BookieId> excludesBookies = new HashSet<>();
+
+            for (String key : toPlaceGroup.keySet()) {
+                List<BookieNode> sameRack = knownRackToBookies.get(key);
+                if (!CollectionUtils.isEmpty(sameRack)) {
+                    excludesBookies.addAll(sameRack.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
+                }
+            }
+
+            Map<Integer, BookieId> targetBookieAddresses = new HashMap<>();
+            boolean placeSucceed = false;
+            while (knownRacks.size() > 0) {
+                BookieNode beReplaceNode = getBeReplaceNode(toPlaceGroup);
+                if (beReplaceNode == null) {
+                    break;
+                }
+                Integer index = bookieIndex.get(beReplaceNode.getAddr());
+                try {
+                    PlacementResult<BookieId> placementResult = replaceBookie(ensemble.size(), writeQuorumSize,
+                            ackQuorumSize, customMetadata, ensemble, beReplaceNode.getAddr(), excludesBookies);
+                    BookieNode replaceNode = clone.get(placementResult.getResult());
+                    String replaceNodeNetwork = replaceNode.getNetworkLocation();
+                    knownRacks.remove(replaceNodeNetwork);
+                    List<BookieNode> nodes = toPlaceGroup.computeIfAbsent(replaceNodeNetwork,
+                            k -> new ArrayList<>());
+                    nodes.add(replaceNode);
+                    targetBookieAddresses.put(index, replaceNode.getAddr());
+                    List<BookieNode> bookieNodes = knownRackToBookies.get(replaceNodeNetwork);
+                    if (!CollectionUtils.isEmpty(bookieNodes)) {
+                        excludesBookies.addAll(
+                                bookieNodes.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
+                    }
+                } catch (BKException.BKNotEnoughBookiesException e) {
+                    LOG.warn("Didn't find replaced bookie to adhere placement policy.", e);
+                    break;
+                }
+
+                List<BookieId> ensembles = toPlaceGroup.values().stream().flatMap(Collection::stream).map(
+                        BookieNode::getAddr).collect(Collectors.toList());
+                ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensembles,
+                        writeQuorumSize, ackQuorumSize);
+                if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {
+                    placeSucceed = true;
+                    break;
+                }
+            }

Review Comment:
   Yes. Because bookies in the same rack may sit close together in the ensemble, we should consider the ensemble order.





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r928384565


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java:
##########
@@ -502,10 +570,40 @@ private boolean isLastSegmentOpenAndMissingBookies(LedgerHandle lh) throws BKExc
      */
     private Set<LedgerFragment> getUnderreplicatedFragments(LedgerHandle lh, Long ledgerVerificationPercentage)
             throws InterruptedException {
+        // Data-loss fragments are repaired first. If a fragment is both data_loss and not_adhering_placement,
+        // we only fix data_loss this time. If the fragment still does not adhere to the placement policy
+        // after that, the Auditor will mark the ledger again.

Review Comment:
   The same ledger fragment may be both `data_loss` and `not_adhere_placement_policy`. After we fix the `data_loss` fragment, it may already adhere to the placement policy.
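
A minimal sketch of the priority rule discussed here, assuming fragments are identified by their entry range. `FragmentSelectionSketch`, `Fragment`, and `select` are hypothetical names for illustration; only the two `ReplicateType` values come from the diff, and the real `ReplicationWorker` logic may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration: data-loss fragments take priority over placement fragments. */
final class FragmentSelectionSketch {
    enum ReplicateType { DATA_LOSS, DATA_NOT_ADHERING_PLACEMENT }

    static final class Fragment {
        final long firstEntryId;
        final long lastEntryId;
        final ReplicateType type;

        Fragment(long firstEntryId, long lastEntryId, ReplicateType type) {
            this.firstEntryId = firstEntryId;
            this.lastEntryId = lastEntryId;
            this.type = type;
        }

        boolean sameRange(Fragment other) {
            return firstEntryId == other.firstEntryId && lastEntryId == other.lastEntryId;
        }
    }

    /**
     * Keeps every data-loss fragment. A placement fragment is kept only when no
     * data-loss fragment covers the same entry range; otherwise it is left for a
     * later round, after the data loss has been repaired.
     */
    static List<Fragment> select(List<Fragment> dataLoss, List<Fragment> notAdhering) {
        List<Fragment> selected = new ArrayList<>(dataLoss);
        for (Fragment candidate : notAdhering) {
            boolean alreadyCovered = false;
            for (Fragment lost : dataLoss) {
                if (lost.sameRange(candidate)) {
                    alreadyCovered = true;
                    break;
                }
            }
            if (!alreadyCovered) {
                selected.add(candidate);
            }
        }
        return selected;
    }
}
```

Deferring the overlapping placement fragment avoids moving data twice: repairing the data loss already picks a new bookie, which may restore adherence on its own.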





[GitHub] [bookkeeper] hangc0276 commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r914354951


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -790,6 +794,113 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
         }
     }
 
+    @Override
+    public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,
+            int ackQuorumSize) {
+        if (CollectionUtils.isEmpty(ensemble)) {
+            return Collections.emptyMap();
+        }
+        PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensemble,
+                writeQuorumSize, ackQuorumSize);
+        if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {

Review Comment:
   Do we need to deal with the `MEETS_SOFT` type?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java:
##########
@@ -357,6 +366,65 @@ private boolean tryReadingFaultyEntries(LedgerHandle lh, LedgerFragment ledgerFr
         return (returnRCValue.get() == BKException.Code.OK);
     }
 
+    private Set<LedgerFragment> getNeedRepairedPlacementNotAdheringFragments(LedgerHandle lh) {
+        if (!conf.getRepairedPlacementPolicyNotAdheringBookieEnable()) {
+            return Collections.emptySet();
+        }
+        long ledgerId = lh.getId();
+        Set<LedgerFragment> placementNotAdheringFragments = new HashSet<>();
+        CompletableFuture<Versioned<LedgerMetadata>> future = ledgerManager.readLedgerMetadata(
+                ledgerId).whenComplete((metadataVer, exception) -> {
+            if (exception == null) {
+                LedgerMetadata metadata = metadataVer.getValue();
+                int writeQuorumSize = metadata.getWriteQuorumSize();
+                int ackQuorumSize = metadata.getAckQuorumSize();
+                if (!metadata.isClosed()) {
+                    return;
+                }
+                Long curEntryId = null;
+                EnsemblePlacementPolicy.PlacementPolicyAdherence previousSegmentAdheringToPlacementPolicy = null;
+
+                for (Map.Entry<Long, ? extends List<BookieId>> entry : metadata.getAllEnsembles().entrySet()) {
+                    if (curEntryId != null) {
+                        if (EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL
+                                == previousSegmentAdheringToPlacementPolicy) {
+                            LedgerFragment ledgerFragment = new LedgerFragment(lh, curEntryId,
+                                    entry.getKey() - 1, new HashSet<>());
+                            ledgerFragment.setReplicateType(LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT);
+                            placementNotAdheringFragments.add(ledgerFragment);
+                        }
+                    }
+                    previousSegmentAdheringToPlacementPolicy =
+                            admin.isEnsembleAdheringToPlacementPolicy(entry.getValue(),
+                                    writeQuorumSize, ackQuorumSize);
+                    curEntryId = entry.getKey();
+                }
+                if (curEntryId != null) {
+                    if (EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL

Review Comment:
   We only deal with the `FAIL` type, but in the auditor check, we still mark the `MEETS_SOFT` type ledger as under-replicated. Do we need to deal with the `MEETS_SOFT` type of fragments?
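
To make the loop under review easier to follow, here is a simplified sketch of how fragment ranges can be derived from the ensemble map. It assumes ensembles are lists of rack names, that the adherence check is passed in as a predicate, and that the last segment ends at the ledger's last entry ID. `SegmentWalkSketch` and `failingRanges` are hypothetical names. Only segments for which the predicate reports failure produce a range, which corresponds to handling `FAIL` but not `MEETS_SOFT`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

/** Hypothetical illustration of walking ledger segments and collecting failing ranges. */
final class SegmentWalkSketch {
    /**
     * Returns {firstEntryId, lastEntryId} for every segment whose ensemble fails the check.
     * A segment starts at its map key and ends one entry before the next key; the final
     * segment ends at lastEntryId (an assumption for closed ledgers).
     */
    static List<long[]> failingRanges(TreeMap<Long, List<String>> ensembles, long lastEntryId,
                                      Predicate<List<String>> fails) {
        List<long[]> ranges = new ArrayList<>();
        Long start = null;
        boolean previousFails = false;
        for (Map.Entry<Long, List<String>> segment : ensembles.entrySet()) {
            // Close the previous segment now that its end is known.
            if (start != null && previousFails) {
                ranges.add(new long[] {start, segment.getKey() - 1});
            }
            previousFails = fails.test(segment.getValue());
            start = segment.getKey();
        }
        // Close the last segment.
        if (start != null && previousFails) {
            ranges.add(new long[] {start, lastEntryId});
        }
        return ranges;
    }
}
```

Extending the repair to `MEETS_SOFT` would, in this sketch, only require a predicate that also returns true for that level.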



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -790,6 +794,113 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
         }
     }
 
+    @Override
+    public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,
+            int ackQuorumSize) {
+        if (CollectionUtils.isEmpty(ensemble)) {
+            return Collections.emptyMap();
+        }
+        PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensemble,
+                writeQuorumSize, ackQuorumSize);
+        if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {
+            return Collections.emptyMap();
+        }
+        Map<BookieId, Integer> bookieIndex = new HashMap<>();
+        for (int i = 0; i < ensemble.size(); i++) {
+            bookieIndex.put(ensemble.get(i), i);
+        }
+
+        Map<BookieId, BookieNode> clone = new HashMap<>(knownBookies);
+
+        Map<String, List<BookieNode>> toPlaceGroup = new HashMap<>();
+        for (BookieId bookieId : ensemble) {
+            //If the bookieId shutdown, put it to inactive.
+            BookieNode bookieNode = clone.get(bookieId);

Review Comment:
   If the bookie shuts down, it is removed from knownBookies immediately, so it belongs to the `DATA_LOSS` type.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java:
##########
@@ -402,6 +402,7 @@ public void registerLedgerMetadataListener(long ledgerId, LedgerMetadataListener
                 }
             }
             synchronized (listenerSet) {
+                listenerSet = listeners.computeIfAbsent(ledgerId, k -> new HashSet<>());

Review Comment:
   Did you bring the previous change into this PR?





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r914461704


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java:
##########
@@ -402,6 +402,7 @@ public void registerLedgerMetadataListener(long ledgerId, LedgerMetadataListener
                 }
             }
             synchronized (listenerSet) {
+                listenerSet = listeners.computeIfAbsent(ledgerId, k -> new HashSet<>());

Review Comment:
   Yes, I will remove it from this PR.





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r938691945


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1084,237 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(

Review Comment:
   @equanz I have cherry-picked your code and enhanced it. Could you help check it? Thanks a lot.
   





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939803617


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java:
##########
@@ -440,6 +440,31 @@ default boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBook
         return true;
     }
 
+    /**
+     * Returns placement result. If the currentEnsemble is not adhering placement policy, returns new ensemble that
+     * adheres placement policy. It should be implemented so as to minify the number of bookies replaced.
+     *
+     * @param ensembleSize
+     *            ensemble size
+     * @param writeQuorumSize
+     *            writeQuorumSize of the ensemble
+     * @param ackQuorumSize
+     *            ackQuorumSize of the ensemble
+     * @param excludeBookies
+     *            bookies that should not be considered as targets
+     * @param currentEnsemble
+     *            current ensemble
+     * @return a placement result
+     */
+    default PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(

Review Comment:
   > What about other TopologyAware classes like RegionAware, Zoneaware? Implement in the next PR? Or temporally implement the [old approach](https://github.com/horizonzy/bookkeeper/blob/7669c7ad15952f5ea0c448d4003e5aabcebebd94/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java#L794-L886)?
   
   Yes, we will implement Zoneaware in the next step.





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939758977


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java:
##########
@@ -481,7 +506,7 @@ public T getResult() {
             return result;
         }
 
-        public PlacementPolicyAdherence isAdheringToPolicy() {
+        public PlacementPolicyAdherence getAdheringToPolicy() {

Review Comment:
   thanks.





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r914469658


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -790,6 +794,113 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
         }
     }
 
+    @Override
+    public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,
+            int ackQuorumSize) {
+        if (CollectionUtils.isEmpty(ensemble)) {
+            return Collections.emptyMap();
+        }
+        PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensemble,
+                writeQuorumSize, ackQuorumSize);
+        if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {
+            return Collections.emptyMap();
+        }
+        Map<BookieId, Integer> bookieIndex = new HashMap<>();
+        for (int i = 0; i < ensemble.size(); i++) {
+            bookieIndex.put(ensemble.get(i), i);
+        }
+
+        Map<BookieId, BookieNode> clone = new HashMap<>(knownBookies);
+
+        Map<String, List<BookieNode>> toPlaceGroup = new HashMap<>();
+        for (BookieId bookieId : ensemble) {
+            //If the bookieId shutdown, put it to inactive.
+            BookieNode bookieNode = clone.get(bookieId);

Review Comment:
   In theory, if the fragment is DATA_LOSS, this method is not invoked; the data loss is repaired first.  
   But the bookie may shut down after the ledger check, so here we replace the shutdown bookie first.
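
The replacement order described here can be sketched as follows. The sketch assumes bookies and racks are plain strings and that a bookie absent from the known map has shut down. `ReplaceOrderSketch`, `groupByRack`, and `nextToReplace` are hypothetical names, and the fallback of picking from the most crowded rack is an assumption about what happens once no inactive bookie remains.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of "replace inactive bookies first". */
final class ReplaceOrderSketch {
    static final String INACTIVE = "INACTIVE"; // stand-in for NetworkTopology.INACTIVE

    /** Groups ensemble members by rack; members missing from knownRacks go to INACTIVE. */
    static Map<String, List<String>> groupByRack(List<String> ensemble, Map<String, String> knownRacks) {
        Map<String, List<String>> groups = new HashMap<>();
        for (String bookie : ensemble) {
            String rack = knownRacks.getOrDefault(bookie, INACTIVE);
            groups.computeIfAbsent(rack, k -> new ArrayList<>()).add(bookie);
        }
        return groups;
    }

    /**
     * Removes and returns the next bookie to replace: an inactive one if any exists,
     * otherwise one from the rack holding the most ensemble members.
     * Returns null when every rack holds at most one member.
     */
    static String nextToReplace(Map<String, List<String>> groups) {
        List<String> inactive = groups.get(INACTIVE);
        if (inactive != null && !inactive.isEmpty()) {
            return inactive.remove(0);
        }
        List<String> largest = null;
        for (List<String> members : groups.values()) {
            if (largest == null || members.size() > largest.size()) {
                largest = members;
            }
        }
        if (largest == null || largest.size() <= 1) {
            return null;
        }
        return largest.remove(largest.size() - 1);
    }
}
```

Replacing the shutdown bookie first means the same replacement both restores the missing replica and can improve rack diversity.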





[GitHub] [bookkeeper] rdhabalia commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r928057386


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java:
##########
@@ -217,11 +218,27 @@ public List<BookieId> getEnsemble() {
         return this.ensemble;
     }
 
+    public ReplicateType getReplicateType() {
+        return replicateType;
+    }
+
+    public void setReplicateType(ReplicateType replicateType) {
+        this.replicateType = replicateType;
+    }
+
     @Override
     public String toString() {
         return String.format("Fragment(LedgerID: %d, FirstEntryID: %d[%d], "
                 + "LastKnownEntryID: %d[%d], Host: %s, Closed: %s)", ledgerId, firstEntryId,
                 getFirstStoredEntryId(), lastKnownEntryId, getLastStoredEntryId(),
                 getAddresses(), isLedgerClosed);
     }
+
+    /**
+     * ReplicateType.
+     */
+    public enum ReplicateType {
+        DATA_LOSS,

Review Comment:
   Can we give this a better name? `DATA_LOSS` doesn't sound correct. It could be `IGNORE_PLACEMENT_POLICY`.





[GitHub] [bookkeeper] StevenLuMT commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
StevenLuMT commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r929457448


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java:
##########
@@ -570,7 +570,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
         bookieQuarantineRatio = 1.0;
     }
 
-    private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
+    protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,

Review Comment:
   Was this changed just for tests?





[GitHub] [bookkeeper] horizonzy commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1236280447

   rerun failure checks
   
   




[GitHub] [bookkeeper] horizonzy commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1207330892

   @equanz Thanks for your graceful way of finding replacement bookies in `replaceToAdherePlacementPolicy`. I have enhanced it and used it for the AutoRecovery feature. Could you help review it?




[GitHub] [bookkeeper] zymap commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939756121


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java:
##########
@@ -481,7 +506,7 @@ public T getResult() {
             return result;
         }
 
-        public PlacementPolicyAdherence isAdheringToPolicy() {
+        public PlacementPolicyAdherence getAdheringToPolicy() {

Review Comment:
   Why change this? If you need the new name, you can add a new method and keep the original one.
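
   A minimal sketch of that suggestion, under stated assumptions: `SketchPlacementResult` and `SketchAdherence` are hypothetical stand-ins for `PlacementResult` and `PlacementPolicyAdherence`, not the real classes. The original accessor stays, and the new name simply delegates to it, so existing callers keep compiling.

```java
// Hypothetical stand-in for PlacementPolicyAdherence.
enum SketchAdherence { FAIL, MEETS_SOFT, MEETS_STRICT }

// Hypothetical stand-in for PlacementResult<T>.
final class SketchPlacementResult<T> {
    private final T result;
    private final SketchAdherence adherence;

    private SketchPlacementResult(T result, SketchAdherence adherence) {
        this.result = result;
        this.adherence = adherence;
    }

    static <T> SketchPlacementResult<T> of(T result, SketchAdherence adherence) {
        return new SketchPlacementResult<>(result, adherence);
    }

    public T getResult() {
        return result;
    }

    // Original accessor, kept so existing callers are not broken.
    public SketchAdherence isAdheringToPolicy() {
        return adherence;
    }

    // New accessor with the getter-style name; delegates to the original.
    public SketchAdherence getAdheringToPolicy() {
        return isAdheringToPolicy();
    }
}
```

   Both accessors return the same value, so callers can migrate to the new name at their own pace.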





[GitHub] [bookkeeper] hangc0276 commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1171888839

   ping @merlimat  @eolivelli  @dlg99 @zymap @reddycharan  Please help take a look at this PR, thanks.




[GitHub] [bookkeeper] hangc0276 commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1200343280

   @horizonzy, Would you please take a look at the failed CI? 
   https://github.com/apache/bookkeeper/runs/7580208123?check_suite_focus=true




[GitHub] [bookkeeper] rdhabalia commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r928059417


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java:
##########
@@ -217,11 +218,27 @@ public List<BookieId> getEnsemble() {
         return this.ensemble;
     }
 
+    public ReplicateType getReplicateType() {
+        return replicateType;
+    }
+
+    public void setReplicateType(ReplicateType replicateType) {
+        this.replicateType = replicateType;
+    }
+
     @Override
     public String toString() {
         return String.format("Fragment(LedgerID: %d, FirstEntryID: %d[%d], "
                 + "LastKnownEntryID: %d[%d], Host: %s, Closed: %s)", ledgerId, firstEntryId,
                 getFirstStoredEntryId(), lastKnownEntryId, getLastStoredEntryId(),
                 getAddresses(), isLedgerClosed);
     }
+
+    /**
+     * ReplicateType.
+     */
+    public enum ReplicateType {
+        DATA_LOSS,

Review Comment:
   In which case do we set `DATA_LOSS`? I don't see it being set anywhere. Is it possible to add a test case to cover the `DATA_LOSS` case?





[GitHub] [bookkeeper] zymap commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r928385490


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java:
##########
@@ -502,10 +570,40 @@ private boolean isLastSegmentOpenAndMissingBookies(LedgerHandle lh) throws BKExc
      */
     private Set<LedgerFragment> getUnderreplicatedFragments(LedgerHandle lh, Long ledgerVerificationPercentage)
             throws InterruptedException {
+        //The data loss fragments is first to repair. If a fragment is data_loss and not_adhering_placement
+        //at the same time, we only fix data_loss in this time. After fix data_loss, the fragment is still
+        //not_adhering_placement, Auditor will mark this ledger again.

Review Comment:
   Could this introduce a double replication?





[GitHub] [bookkeeper] zymap commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r929524964


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java:
##########
@@ -364,6 +364,18 @@ DistributionSchedule.WriteSet reorderReadLACSequence(
     default void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
     }
 
+    /**
+     * Replace some bookie to adhering placement policy. If the all kinds of replacement
+     * didn't adhere placement policy, return empty map.
+     *
+     * @param ensemble
+     * @param writeQuorumSize
+     * @param ackQuorumSize
+     * @return Map: key means ensemble index, value means target replace bookieId.
+     */
+    Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,

Review Comment:
   If we cherry-pick this to a minor release, it will break existing implementations that live outside this repo. So I would suggest using a `default` method to avoid this.
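
   To illustrate the concern, here is a minimal sketch. `SketchPlacementPolicy` and `LegacyExternalPolicy` are hypothetical names, bookie IDs are plain strings, and the empty-map default is an assumption based on the Javadoc in the diff ("return empty map"). Because the new method has a `default` body, an implementation written before the method existed still compiles.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for EnsemblePlacementPolicy.
interface SketchPlacementPolicy {
    // Pre-existing abstract method that every implementation already provides.
    boolean isAdhering(List<String> ensemble);

    // Newly added method. The default body means "nothing to replace",
    // so implementations outside the repo do not need to change.
    default Map<Integer, String> replaceNotAdheringPlacementPolicyBookie(
            List<String> ensemble, int writeQuorumSize, int ackQuorumSize) {
        return Collections.emptyMap();
    }
}

// An "external" implementation written before the new method was added.
class LegacyExternalPolicy implements SketchPlacementPolicy {
    @Override
    public boolean isAdhering(List<String> ensemble) {
        return true;
    }
}
```

   Without the `default` body, `LegacyExternalPolicy` would fail to compile against the updated interface.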





[GitHub] [bookkeeper] horizonzy closed pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy closed pull request #3359: Feature: auto recover support repaired not adhering placement ledger
URL: https://github.com/apache/bookkeeper/pull/3359




[GitHub] [bookkeeper] StevenLuMT commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
StevenLuMT commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1204986473

   rerun failure checks




[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r938691945


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1084,237 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(

Review Comment:
   @equanz ping. I have enhanced it; could you help check it?
   





[GitHub] [bookkeeper] horizonzy commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1167058078

   For this case, we can already detect ledgers whose ensembles do not adhere to the placement policy.
   In the Auditor, if the user configures `auditorPeriodicPlacementPolicyCheckInterval`, it starts a scheduled task that triggers `placementPolicyCheck`. `placementPolicyCheck` records the number of ledger fragments that do not adhere to the placement policy.
   https://github.com/apache/bookkeeper/blob/677ccec3eb84f5be1b3556537871e14eb5e8359c/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java#L1378
   
   But it only records this as a stat; it does not recover the data to make the ensemble adhere to the placement policy.
   
   So we can add a config, `repairedPlacementPolicyNotAdheringBookieEnabled`, to control whether to repair the data so that it adheres to the placement policy.
   
   **In Auditor**
   It will mark the ledger ID as under-replicated in the under-replication manager if the ensemble does not adhere to the placement policy.
   
   **In ReplicationWorker**
   It will move data from the old bookie to a new bookie with a different network location, so that the ensemble adheres to the placement policy. If there is no bookie with a different network location, it does nothing.
   
   _Attention_
   _The ReplicationWorker just polls an under-replicated ledger and then processes it. So when it gets an under-replicated ledger, we should check two cases: 1) whether the ledger's fragments have lost data; 2) whether the ledger's fragments do not adhere to the placement policy. A single fragment may meet both cases at the same time. If so, we ignore case 2 and only repair the data loss. If the repaired result does not adhere to the placement policy, the Auditor will mark it again._
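
   The priority rule described above can be sketched as follows. This is only an illustration under assumptions: `ReplicationTriageSketch`, `SketchRepairAction`, and `classify` are hypothetical names, and the boolean flag stands in for the proposed `repairedPlacementPolicyNotAdheringBookieEnabled` config. It is not the ReplicationWorker's actual code.

```java
// Hypothetical outcome of inspecting one fragment.
enum SketchRepairAction { REPAIR_DATA_LOSS, REPAIR_PLACEMENT, NONE }

// Hypothetical helper for illustration; not part of BookKeeper.
final class ReplicationTriageSketch {
    private ReplicationTriageSketch() {
    }

    static SketchRepairAction classify(boolean lostData,
                                       boolean notAdheringPlacement,
                                       boolean placementRepairEnabled) {
        // Case 1 always wins: a fragment that lost data is repaired for data
        // loss only, even if it also violates the placement policy.
        if (lostData) {
            return SketchRepairAction.REPAIR_DATA_LOSS;
        }
        // Case 2 is handled only when the feature is switched on.
        if (notAdheringPlacement && placementRepairEnabled) {
            return SketchRepairAction.REPAIR_PLACEMENT;
        }
        return SketchRepairAction.NONE;
    }
}
```

   A fragment that meets both cases is classified as `REPAIR_DATA_LOSS`; if it still violates the placement policy afterwards, the next Auditor check is expected to mark it again.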




[GitHub] [bookkeeper] eolivelli commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r933230989


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java:
##########
@@ -364,6 +364,18 @@ DistributionSchedule.WriteSet reorderReadLACSequence(
     default void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
     }
 
+    /**
+     * Replace some bookie to adhering placement policy. If the all kinds of replacement
+     * didn't adhere placement policy, return empty map.
+     *
+     * @param ensemble
+     * @param writeQuorumSize
+     * @param ackQuorumSize
+     * @return Map: key means ensemble index, value means target replace bookieId.
+     */
+    Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,

Review Comment:
   yes, we must add a default implementation
   





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939807153


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1083,220 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        rwLock.readLock().lock();
+        try {
+            PlacementPolicyAdherence currentPlacementAdherence = isEnsembleAdheringToPlacementPolicy(
+                    currentEnsemble, writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+                return PlacementResult.of(new ArrayList<>(currentEnsemble), currentPlacementAdherence);
+            }
+            for (BookieId bookieId : currentEnsemble) {
+                if (!knownBookies.containsKey(bookieId)) {
+                    excludeBookies.add(bookieId);
+                }
+            }
+            PlacementResult<List<BookieId>> placementResult = PlacementResult.of(Collections.emptyList(),
+                    PlacementPolicyAdherence.FAIL);
+            int minDiffer = Integer.MAX_VALUE;
+            for (int i = 0; i < currentEnsemble.size(); i++) {
+                PlacementResult<List<BookieId>> result = doReplaceToAdherePlacementPolicy(ensembleSize,
+                        writeQuorumSize, ackQuorumSize, excludeBookies, currentEnsemble, i);
+                if (PlacementPolicyAdherence.FAIL == result.getAdheringToPolicy()) {
+                    continue;
+                }
+                int differ = differBetweenBookies(currentEnsemble, result.getResult());
+                if (differ < minDiffer) {
+                    minDiffer = differ;
+                    placementResult = result;
+                    if (minDiffer == 1) {
+                        break;
+                    }
+                }
+            }
+            return placementResult;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private PlacementResult<List<BookieId>> doReplaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble,
+            int startIndex) {
+        final List<BookieNode> provisionalEnsembleNodes = currentEnsemble.stream()
+                .map(this::convertBookieToNode).collect(Collectors.toList());
+        final Set<Node> excludeNodes = convertBookiesToNodes(
+                addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
+        int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+        final RRTopologyAwareCoverageEnsemble ensemble =
+                new RRTopologyAwareCoverageEnsemble(
+                        ensembleSize,
+                        writeQuorumSize,
+                        ackQuorumSize,
+                        RACKNAME_DISTANCE_FROM_LEAVES,
+                        null,
+                        null,
+                        minNumRacksPerWriteQuorumForThisEnsemble);
+        int numRacks = topology.getNumOfRacks();
+        // only one rack or less than minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
+        if (numRacks < 2 || numRacks < minNumRacksPerWriteQuorumForThisEnsemble) {
+            LOG.warn("Skip ensemble relocation because the cluster has only {} rack.", numRacks);
+            return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+        }
+        BookieNode prevNode = null;
+        final BookieNode firstNode = provisionalEnsembleNodes.get(startIndex);
+        // use same bookie at first to reduce ledger replication
+        if (!excludeNodes.contains(firstNode) && ensemble.apply(firstNode, ensemble)
+                && ensemble.addNode(firstNode)) {
+            excludeNodes.add(firstNode);
+            prevNode = firstNode;
+        }

Review Comment:
   Do you mean `provisionalEnsembleNodes.set(0, firstNode)`?
   If so, could you explain more? We only change the start index of the calculation, so I think we shouldn't change the elements of `provisionalEnsembleNodes`.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java:
##########
@@ -29,6 +29,7 @@ public interface NetworkTopology {
     String DEFAULT_RACK = "/default-rack";
     String DEFAULT_ZONE = "/default-zone";
     String DEFAULT_UPGRADEDOMAIN = "/default-upgradedomain";
+    String INACTIVE = "/inactive";

Review Comment:
   nice catch



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1083,220 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        rwLock.readLock().lock();
+        try {
+            PlacementPolicyAdherence currentPlacementAdherence = isEnsembleAdheringToPlacementPolicy(
+                    currentEnsemble, writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+                return PlacementResult.of(new ArrayList<>(currentEnsemble), currentPlacementAdherence);
+            }
+            for (BookieId bookieId : currentEnsemble) {
+                if (!knownBookies.containsKey(bookieId)) {
+                    excludeBookies.add(bookieId);
+                }
+            }
+            PlacementResult<List<BookieId>> placementResult = PlacementResult.of(Collections.emptyList(),
+                    PlacementPolicyAdherence.FAIL);
+            int minDiffer = Integer.MAX_VALUE;
+            for (int i = 0; i < currentEnsemble.size(); i++) {
+                PlacementResult<List<BookieId>> result = doReplaceToAdherePlacementPolicy(ensembleSize,
+                        writeQuorumSize, ackQuorumSize, excludeBookies, currentEnsemble, i);
+                if (PlacementPolicyAdherence.FAIL == result.getAdheringToPolicy()) {
+                    continue;
+                }
+                int differ = differBetweenBookies(currentEnsemble, result.getResult());
+                if (differ < minDiffer) {
+                    minDiffer = differ;
+                    placementResult = result;
+                    if (minDiffer == 1) {
+                        break;
+                    }
+                }
+            }
+            return placementResult;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private PlacementResult<List<BookieId>> doReplaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble,
+            int startIndex) {
+        final List<BookieNode> provisionalEnsembleNodes = currentEnsemble.stream()
+                .map(this::convertBookieToNode).collect(Collectors.toList());
+        final Set<Node> excludeNodes = convertBookiesToNodes(
+                addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
+        int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+        final RRTopologyAwareCoverageEnsemble ensemble =
+                new RRTopologyAwareCoverageEnsemble(
+                        ensembleSize,
+                        writeQuorumSize,
+                        ackQuorumSize,
+                        RACKNAME_DISTANCE_FROM_LEAVES,
+                        null,
+                        null,
+                        minNumRacksPerWriteQuorumForThisEnsemble);
+        int numRacks = topology.getNumOfRacks();
+        // only one rack or less than minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
+        if (numRacks < 2 || numRacks < minNumRacksPerWriteQuorumForThisEnsemble) {
+            LOG.warn("Skip ensemble relocation because the cluster has only {} rack.", numRacks);
+            return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+        }

Review Comment:
   yes





[GitHub] [bookkeeper] equanz commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
equanz commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1207710503

   > Now we add the shutdown bookies to excludes nodes to replace it.
   
   Just to confirm, are you referring to this code?
   https://github.com/horizonzy/bookkeeper/blob/6ef1e2aa8ac0780bd8199b360e840506cbc85e5d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java#L1101-L1105
   




[GitHub] [bookkeeper] equanz commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
equanz commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939846111


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1083,220 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        rwLock.readLock().lock();
+        try {
+            PlacementPolicyAdherence currentPlacementAdherence = isEnsembleAdheringToPlacementPolicy(
+                    currentEnsemble, writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+                return PlacementResult.of(new ArrayList<>(currentEnsemble), currentPlacementAdherence);
+            }
+            for (BookieId bookieId : currentEnsemble) {
+                if (!knownBookies.containsKey(bookieId)) {
+                    excludeBookies.add(bookieId);
+                }
+            }
+            PlacementResult<List<BookieId>> placementResult = PlacementResult.of(Collections.emptyList(),
+                    PlacementPolicyAdherence.FAIL);
+            int minDiffer = Integer.MAX_VALUE;
+            for (int i = 0; i < currentEnsemble.size(); i++) {
+                PlacementResult<List<BookieId>> result = doReplaceToAdherePlacementPolicy(ensembleSize,
+                        writeQuorumSize, ackQuorumSize, excludeBookies, currentEnsemble, i);
+                if (PlacementPolicyAdherence.FAIL == result.getAdheringToPolicy()) {
+                    continue;
+                }
+                int differ = differBetweenBookies(currentEnsemble, result.getResult());
+                if (differ < minDiffer) {
+                    minDiffer = differ;
+                    placementResult = result;
+                    if (minDiffer == 1) {
+                        break;
+                    }
+                }
+            }
+            return placementResult;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private PlacementResult<List<BookieId>> doReplaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble,
+            int startIndex) {
+        final List<BookieNode> provisionalEnsembleNodes = currentEnsemble.stream()
+                .map(this::convertBookieToNode).collect(Collectors.toList());
+        final Set<Node> excludeNodes = convertBookiesToNodes(
+                addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
+        int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+        final RRTopologyAwareCoverageEnsemble ensemble =
+                new RRTopologyAwareCoverageEnsemble(
+                        ensembleSize,
+                        writeQuorumSize,
+                        ackQuorumSize,
+                        RACKNAME_DISTANCE_FROM_LEAVES,
+                        null,
+                        null,
+                        minNumRacksPerWriteQuorumForThisEnsemble);
+        int numRacks = topology.getNumOfRacks();
+        // only one rack or less than minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
+        if (numRacks < 2 || numRacks < minNumRacksPerWriteQuorumForThisEnsemble) {
+            LOG.warn("Skip ensemble relocation because the cluster has only {} rack.", numRacks);
+            return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+        }
+        BookieNode prevNode = null;
+        final BookieNode firstNode = provisionalEnsembleNodes.get(startIndex);
+        // use same bookie at first to reduce ledger replication
+        if (!excludeNodes.contains(firstNode) && ensemble.apply(firstNode, ensemble)
+                && ensemble.addNode(firstNode)) {
+            excludeNodes.add(firstNode);
+            prevNode = firstNode;
+        }

Review Comment:
   I see. I now understand the process below.
   https://github.com/horizonzy/bookkeeper/blob/edefdeb82f816c3910bbc340c1264fc644d3f3a5/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java#L1209-L1218
   
   It's my mistake.
   





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r937325985


##########
bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java:
##########
@@ -1465,6 +1465,176 @@ public boolean validate() {
         }
     }
 
+    @Test
+    public void testReplaceNotAdheringPlacementPolicyBookie() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 3;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 3;
+        String[] rackLocationNames = new String[numOfRacks];
+        List<BookieId> bookieSocketAddresses = new ArrayList<BookieId>();
+        Map<BookieId, String> bookieRackMap = new HashMap<BookieId, String>();
+        BookieId bookieAddress;
+
+        for (int i = 0; i < numOfRacks; i++) {
+            rackLocationNames[i] = "/default-region/r" + i;
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181).toBookieId();
+                StaticDNSResolver.addNodeToRack("128.0.0." + index, rackLocationNames[i]);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+            }
+        }
+
+        repp.onClusterChanged(new HashSet<BookieId>(bookieSocketAddresses), new HashSet<BookieId>());
+
+        int writeQuorum = 3;
+        int ackQuorum = 3;
+
+        //test three knows bookie
+        List<BookieId> knowsEnsemble = new ArrayList<>();
+        knowsEnsemble.add(BookieId.parse("128.0.0.0:3181"));
+        knowsEnsemble.add(BookieId.parse("128.0.0.1:3181"));
+        knowsEnsemble.add(BookieId.parse("128.0.0.2:3181"));
+
+        PlacementPolicyAdherence placementPolicyAdherence = repp.isEnsembleAdheringToPlacementPolicy(
+                knowsEnsemble, writeQuorum, ackQuorum);
+        assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.FAIL);
+
+        Map<Integer, BookieId> targetBookie =
+                repp.replaceNotAdheringPlacementPolicyBookie(knowsEnsemble, ackQuorum, writeQuorum,
+                        Collections.emptyMap());

Review Comment:
   nice catch.





[GitHub] [bookkeeper] rdhabalia commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1193025519

   Also, can we cherry-pick this change to the previous release, 4.14?




[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r928059554


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java:
##########
@@ -217,11 +218,27 @@ public List<BookieId> getEnsemble() {
         return this.ensemble;
     }
 
+    public ReplicateType getReplicateType() {
+        return replicateType;
+    }
+
+    public void setReplicateType(ReplicateType replicateType) {
+        this.replicateType = replicateType;
+    }
+
     @Override
     public String toString() {
         return String.format("Fragment(LedgerID: %d, FirstEntryID: %d[%d], "
                 + "LastKnownEntryID: %d[%d], Host: %s, Closed: %s)", ledgerId, firstEntryId,
                 getFirstStoredEntryId(), lastKnownEntryId, getLastStoredEntryId(),
                 getAddresses(), isLedgerClosed);
     }
+
+    /**
+     * ReplicateType.
+     */
+    public enum ReplicateType {
+        DATA_LOSS,

Review Comment:
   See line 42; the default type is `DATA_LOSS`.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java:
##########
@@ -1128,14 +1129,24 @@ private ArrayList<BookieId> replaceBookiesInEnsemble(
      * @param ledgerFragment
      *            - LedgerFragment to replicate
      */
-    public void replicateLedgerFragment(LedgerHandle lh,
-            final LedgerFragment ledgerFragment,
-            final BiConsumer<Long, Long> onReadEntryFailureCallback)
-            throws InterruptedException, BKException {
-        Optional<Set<BookieId>> excludedBookies = Optional.empty();
-        Map<Integer, BookieId> targetBookieAddresses =
-                getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
-                        ledgerFragment.getBookiesIndexes(), excludedBookies);
+    public void replicateLedgerFragment(LedgerHandle lh, final LedgerFragment ledgerFragment,
+            final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException, BKException {
+        Map<Integer, BookieId> targetBookieAddresses = null;
+        if (LedgerFragment.ReplicateType.DATA_LOSS == ledgerFragment.getReplicateType()) {

Review Comment:
   For `DATA_LOSS`, the ledgerChecker has already identified the bad bookie that we want to replace.
   For `DATA_NOT_ADHERING_PLACEMENT`, we only know that the ensemble does not adhere to the placement policy, so we need to find which bookie should be replaced.
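
The distinction can be illustrated with a minimal sketch. It is not the PR's code: `ReplicateTypeSketch`, `indexesToReplace`, and the use of plain strings for bookie IDs are hypothetical simplifications, and the sketch assumes the indexes for the placement case are found by comparing the current ensemble with a policy-adhering one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for the two replication cases.
class ReplicateTypeSketch {
    enum ReplicateType { DATA_LOSS, DATA_NOT_ADHERING_PLACEMENT }

    // Returns the ensemble indexes whose bookies should be replaced.
    static List<Integer> indexesToReplace(ReplicateType type,
                                          Set<Integer> knownBadIndexes,
                                          List<String> currentEnsemble,
                                          List<String> adheringEnsemble) {
        if (type == ReplicateType.DATA_LOSS) {
            // The checker already told us which indexes are bad; return them sorted.
            return new ArrayList<>(new TreeSet<>(knownBadIndexes));
        }
        // Otherwise, derive the indexes: every position where the adhering
        // ensemble differs from the current one has to be replaced.
        List<Integer> changed = new ArrayList<>();
        for (int i = 0; i < currentEnsemble.size(); i++) {
            if (!currentEnsemble.get(i).equals(adheringEnsemble.get(i))) {
                changed.add(i);
            }
        }
        return changed;
    }
}
```

In the `DATA_LOSS` branch the input indexes are used as given; in the other branch they are computed, which is why the two cases need separate handling.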
   
   





[GitHub] [bookkeeper] hangc0276 commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1244856469

   This PR is an enhancement for auto recovery, and the new interface has a default implementation, which is compatible with the old version. I suggest cherry-picking it to branch-4.14 and branch-4.15. Do you have any suggestions? @merlimat @eolivelli @dlg99 @rdhabalia @zymap 




[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r937325134


##########
bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java:
##########
@@ -1465,6 +1465,176 @@ public boolean validate() {
         }
     }
 
+    @Test
+    public void testReplaceNotAdheringPlacementPolicyBookie() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 3;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);

Review Comment:
   Yes, this test behaves the same whether `enforceMinNumRacksPerWriteQuorum` is true or false, so the setting is unnecessary. I will remove it.





[GitHub] [bookkeeper] horizonzy commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1207717335

   > > Now we add the shut-down bookies to the exclude nodes so that they are replaced.
   > 
   > Just to confirm, is this what you are referring to? https://github.com/horizonzy/bookkeeper/blob/6ef1e2aa8ac0780bd8199b360e840506cbc85e5d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java#L1101-L1105
   
   yes




[GitHub] [bookkeeper] hangc0276 commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1236489337

   rerun failure checks




[GitHub] [bookkeeper] equanz commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
equanz commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939784939


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1083,220 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        rwLock.readLock().lock();
+        try {
+            PlacementPolicyAdherence currentPlacementAdherence = isEnsembleAdheringToPlacementPolicy(
+                    currentEnsemble, writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+                return PlacementResult.of(new ArrayList<>(currentEnsemble), currentPlacementAdherence);
+            }
+            for (BookieId bookieId : currentEnsemble) {
+                if (!knownBookies.containsKey(bookieId)) {
+                    excludeBookies.add(bookieId);
+                }
+            }
+            PlacementResult<List<BookieId>> placementResult = PlacementResult.of(Collections.emptyList(),
+                    PlacementPolicyAdherence.FAIL);
+            int minDiffer = Integer.MAX_VALUE;
+            for (int i = 0; i < currentEnsemble.size(); i++) {
+                PlacementResult<List<BookieId>> result = doReplaceToAdherePlacementPolicy(ensembleSize,
+                        writeQuorumSize, ackQuorumSize, excludeBookies, currentEnsemble, i);
+                if (PlacementPolicyAdherence.FAIL == result.getAdheringToPolicy()) {
+                    continue;
+                }
+                int differ = differBetweenBookies(currentEnsemble, result.getResult());
+                if (differ < minDiffer) {
+                    minDiffer = differ;
+                    placementResult = result;
+                    if (minDiffer == 1) {
+                        break;
+                    }
+                }
+            }
+            return placementResult;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private PlacementResult<List<BookieId>> doReplaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble,
+            int startIndex) {
+        final List<BookieNode> provisionalEnsembleNodes = currentEnsemble.stream()
+                .map(this::convertBookieToNode).collect(Collectors.toList());
+        final Set<Node> excludeNodes = convertBookiesToNodes(
+                addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
+        int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+        final RRTopologyAwareCoverageEnsemble ensemble =
+                new RRTopologyAwareCoverageEnsemble(
+                        ensembleSize,
+                        writeQuorumSize,
+                        ackQuorumSize,
+                        RACKNAME_DISTANCE_FROM_LEAVES,
+                        null,
+                        null,
+                        minNumRacksPerWriteQuorumForThisEnsemble);
+        int numRacks = topology.getNumOfRacks();
+        // only one rack or less than minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
+        if (numRacks < 2 || numRacks < minNumRacksPerWriteQuorumForThisEnsemble) {
+            LOG.warn("Skip ensemble relocation because the cluster has only {} rack.", numRacks);
+            return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+        }
+        BookieNode prevNode = null;
+        final BookieNode firstNode = provisionalEnsembleNodes.get(startIndex);
+        // use same bookie at first to reduce ledger replication
+        if (!excludeNodes.contains(firstNode) && ensemble.apply(firstNode, ensemble)
+                && ensemble.addNode(firstNode)) {
+            excludeNodes.add(firstNode);
+            prevNode = firstNode;
+        }

Review Comment:
   `firstNode` should be set to the first index of `provisionalEnsembleNodes`. This variable is used in `replaceToAdherePlacementPolicyInternal` for the calculation.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1083,220 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        rwLock.readLock().lock();
+        try {
+            PlacementPolicyAdherence currentPlacementAdherence = isEnsembleAdheringToPlacementPolicy(
+                    currentEnsemble, writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+                return PlacementResult.of(new ArrayList<>(currentEnsemble), currentPlacementAdherence);
+            }
+            for (BookieId bookieId : currentEnsemble) {
+                if (!knownBookies.containsKey(bookieId)) {
+                    excludeBookies.add(bookieId);
+                }
+            }
+            PlacementResult<List<BookieId>> placementResult = PlacementResult.of(Collections.emptyList(),
+                    PlacementPolicyAdherence.FAIL);
+            int minDiffer = Integer.MAX_VALUE;
+            for (int i = 0; i < currentEnsemble.size(); i++) {
+                PlacementResult<List<BookieId>> result = doReplaceToAdherePlacementPolicy(ensembleSize,
+                        writeQuorumSize, ackQuorumSize, excludeBookies, currentEnsemble, i);
+                if (PlacementPolicyAdherence.FAIL == result.getAdheringToPolicy()) {
+                    continue;
+                }
+                int differ = differBetweenBookies(currentEnsemble, result.getResult());
+                if (differ < minDiffer) {
+                    minDiffer = differ;
+                    placementResult = result;
+                    if (minDiffer == 1) {
+                        break;
+                    }
+                }
+            }
+            return placementResult;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private PlacementResult<List<BookieId>> doReplaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble,
+            int startIndex) {
+        final List<BookieNode> provisionalEnsembleNodes = currentEnsemble.stream()
+                .map(this::convertBookieToNode).collect(Collectors.toList());
+        final Set<Node> excludeNodes = convertBookiesToNodes(
+                addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
+        int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+        final RRTopologyAwareCoverageEnsemble ensemble =
+                new RRTopologyAwareCoverageEnsemble(
+                        ensembleSize,
+                        writeQuorumSize,
+                        ackQuorumSize,
+                        RACKNAME_DISTANCE_FROM_LEAVES,
+                        null,
+                        null,
+                        minNumRacksPerWriteQuorumForThisEnsemble);
+        int numRacks = topology.getNumOfRacks();
+        // only one rack or less than minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
+        if (numRacks < 2 || numRacks < minNumRacksPerWriteQuorumForThisEnsemble) {
+            LOG.warn("Skip ensemble relocation because the cluster has only {} rack.", numRacks);
+            return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+        }

Review Comment:
   This check should move to `replaceToAdherePlacementPolicy` and run first, to avoid redundant calls.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java:
##########
@@ -440,6 +440,31 @@ default boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBook
         return true;
     }
 
+    /**
+     * Returns placement result. If the currentEnsemble is not adhering placement policy, returns new ensemble that
+     * adheres placement policy. It should be implemented so as to minify the number of bookies replaced.
+     *
+     * @param ensembleSize
+     *            ensemble size
+     * @param writeQuorumSize
+ *                writeQuorumSize of the ensemble
+     * @param ackQuorumSize
+     *            ackQuorumSize of the ensemble
+     * @param excludeBookies
+     *            bookies that should not be considered as targets
+     * @param currentEnsemble
+     *            current ensemble
+     * @return a placement result
+     */
+    default PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(

Review Comment:
   What about the other TopologyAware classes, such as RegionAware and Zoneaware?
   Will they be implemented in the next PR? Or should we temporarily implement the [old approach](https://github.com/horizonzy/bookkeeper/blob/7669c7ad15952f5ea0c448d4003e5aabcebebd94/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java#L794-L886)?





[GitHub] [bookkeeper] equanz commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
equanz commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939777030


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1083,220 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        rwLock.readLock().lock();
+        try {
+            PlacementPolicyAdherence currentPlacementAdherence = isEnsembleAdheringToPlacementPolicy(
+                    currentEnsemble, writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+                return PlacementResult.of(new ArrayList<>(currentEnsemble), currentPlacementAdherence);
+            }
+            for (BookieId bookieId : currentEnsemble) {
+                if (!knownBookies.containsKey(bookieId)) {
+                    excludeBookies.add(bookieId);
+                }
+            }
+            PlacementResult<List<BookieId>> placementResult = PlacementResult.of(Collections.emptyList(),
+                    PlacementPolicyAdherence.FAIL);
+            int minDiffer = Integer.MAX_VALUE;
+            for (int i = 0; i < currentEnsemble.size(); i++) {
+                PlacementResult<List<BookieId>> result = doReplaceToAdherePlacementPolicy(ensembleSize,
+                        writeQuorumSize, ackQuorumSize, excludeBookies, currentEnsemble, i);
+                if (PlacementPolicyAdherence.FAIL == result.getAdheringToPolicy()) {
+                    continue;
+                }
+                int differ = differBetweenBookies(currentEnsemble, result.getResult());
+                if (differ < minDiffer) {
+                    minDiffer = differ;
+                    placementResult = result;
+                    if (minDiffer == 1) {
+                        break;
+                    }
+                }
+            }
+            return placementResult;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private PlacementResult<List<BookieId>> doReplaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble,
+            int startIndex) {
+        final List<BookieNode> provisionalEnsembleNodes = currentEnsemble.stream()
+                .map(this::convertBookieToNode).collect(Collectors.toList());
+        final Set<Node> excludeNodes = convertBookiesToNodes(
+                addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
+        int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+        final RRTopologyAwareCoverageEnsemble ensemble =
+                new RRTopologyAwareCoverageEnsemble(
+                        ensembleSize,
+                        writeQuorumSize,
+                        ackQuorumSize,
+                        RACKNAME_DISTANCE_FROM_LEAVES,
+                        null,
+                        null,
+                        minNumRacksPerWriteQuorumForThisEnsemble);
+        int numRacks = topology.getNumOfRacks();
+        // only one rack or less than minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
+        if (numRacks < 2 || numRacks < minNumRacksPerWriteQuorumForThisEnsemble) {
+            LOG.warn("Skip ensemble relocation because the cluster has only {} rack.", numRacks);
+            return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+        }

Review Comment:
   This check should move to `replaceToAdherePlacementPolicy` and run first, to avoid redundant calls.
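
A minimal sketch of the suggestion, under stated assumptions: `HoistedRackCheckSketch`, `tryFromIndex`, and the `attempts` counter are hypothetical names used only for illustration, and bookies are plain strings. The rack-count condition depends only on cluster-wide values, so it can be evaluated once before the per-index loop rather than inside every attempt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of hoisting a loop-invariant check.
class HoistedRackCheckSketch {
    // Counts per-index attempts so the effect of the early return is visible.
    static int attempts = 0;

    // Stand-in for the per-start-index attempt; here it simply copies the ensemble.
    static List<String> tryFromIndex(List<String> ensemble, int startIndex) {
        attempts++;
        return new ArrayList<>(ensemble);
    }

    static List<String> replace(List<String> ensemble, int numRacks, int minRacksPerWriteQuorum) {
        // Evaluate the cluster-wide condition once, before any attempt is made.
        if (numRacks < 2 || numRacks < minRacksPerWriteQuorum) {
            return new ArrayList<>();
        }
        for (int i = 0; i < ensemble.size(); i++) {
            List<String> candidate = tryFromIndex(ensemble, i);
            if (!candidate.isEmpty()) {
                return candidate; // first usable candidate wins in this sketch
            }
        }
        return new ArrayList<>();
    }
}
```

With a single rack, `replace` returns an empty list and `attempts` stays at zero, whereas a check placed inside the attempt would run once per ensemble index.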





[GitHub] [bookkeeper] equanz commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
equanz commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1207628808

   And what about the default rack?
   https://github.com/apache/bookkeeper/pull/2931#pullrequestreview-1062016018
   
   If it is being addressed, please let me know where it is.
   




[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939693677


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1083,236 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        rwLock.readLock().lock();
+        try {
+            PlacementPolicyAdherence currentPlacementAdherence = isEnsembleAdheringToPlacementPolicy(
+                    currentEnsemble, writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+                return PlacementResult.of(new ArrayList<>(currentEnsemble), currentPlacementAdherence);
+            }
+            for (BookieId bookieId : currentEnsemble) {
+                if (!knownBookies.containsKey(bookieId)) {
+                    excludeBookies.add(bookieId);
+                }
+            }
+            PlacementResult<List<BookieId>> placementResult = PlacementResult.of(Collections.emptyList(),
+                    PlacementPolicyAdherence.FAIL);
+            int minDiffer = Integer.MAX_VALUE;
+            for (int i = 0; i < currentEnsemble.size(); i++) {
+                PlacementResult<List<BookieId>> result = doReplaceToAdherePlacementPolicy(ensembleSize,
+                        writeQuorumSize, ackQuorumSize, excludeBookies, currentEnsemble, i);
+                if (PlacementPolicyAdherence.FAIL == result.getAdheringToPolicy()) {
+                    continue;
+                }
+                int differ = differBetweenBookies(currentEnsemble, result.getResult());
+                if (differ < minDiffer) {
+                    minDiffer = differ;
+                    placementResult = result;
+                    if (minDiffer == 1) {
+                        break;
+                    }
+                }
+            }
+            return placementResult;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private int differBetweenBookies(List<BookieId> bookiesA, List<BookieId> bookiesB) {

Review Comment:
   agree



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java:
##########
@@ -440,6 +440,31 @@ default boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBook
         return true;
     }
 
+    /**
+     * Returns placement result. If the currentEnsemble is not adhering placement policy, returns new ensemble that
+     * adheres placement policy. It should be implemented so as to minify the number of bookies replaced.
+     *
+     * @param ensembleSize
+     *            ensemble size
+     * @param writeQuorumSize
+ *                writeQuorumSize of the ensemble
+     * @param ackQuorumSize
+     *            ackQuorumSize of the ensemble
+     * @param excludeBookies
+     *            bookies that should not be considered as targets
+     * @param currentEnsemble
+     *            current ensemble
+     * @return a placement result
+     */
+    default PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        throw new UnsupportedOperationException();

Review Comment:
   Yes, that's fine.





[GitHub] [bookkeeper] zymap commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939756121


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java:
##########
@@ -481,7 +506,7 @@ public T getResult() {
             return result;
         }
 
-        public PlacementPolicyAdherence isAdheringToPolicy() {
+        public PlacementPolicyAdherence getAdheringToPolicy() {

Review Comment:
   Why change this? If you need the new name, you can add a new method and keep the original one. We need to preserve compatibility.
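
One way to do that, sketched with hypothetical stand-ins (`PlacementResultSketch` and its nested `Adherence` enum are simplified placeholders, not the real `PlacementResult` and `PlacementPolicyAdherence`): keep the original accessor and let the new one delegate to it, so existing callers continue to compile.

```java
// Hypothetical, simplified stand-in for a result holder with both accessors.
class PlacementResultSketch<T> {
    enum Adherence { FAIL, MEETS_SOFT, MEETS_STRICT }

    private final T result;
    private final Adherence adherence;

    PlacementResultSketch(T result, Adherence adherence) {
        this.result = result;
        this.adherence = adherence;
    }

    public T getResult() {
        return result;
    }

    // Original accessor, kept unchanged for source and binary compatibility.
    public Adherence isAdheringToPolicy() {
        return adherence;
    }

    // New accessor with the getter-style name; it only delegates.
    public Adherence getAdheringToPolicy() {
        return isAdheringToPolicy();
    }
}
```

Both methods return the same value, so callers can migrate to the new name at their own pace.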





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939801881


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java:
##########
@@ -440,6 +440,31 @@ default boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBook
         return true;
     }
 
+    /**
+     * Returns placement result. If the currentEnsemble is not adhering placement policy, returns new ensemble that
+     * adheres placement policy. It should be implemented so as to minify the number of bookies replaced.
+     *
+     * @param ensembleSize
+     *            ensemble size
+     * @param writeQuorumSize
+     *            writeQuorumSize of the ensemble
+     * @param ackQuorumSize
+     *            ackQuorumSize of the ensemble
+     * @param excludeBookies
+     *            bookies that should not be considered as targets
+     * @param currentEnsemble
+     *            current ensemble
+     * @return a placement result
+     */
+    default PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(

Review Comment:
   > And what about the default rack? [#2931 (review)](https://github.com/apache/bookkeeper/pull/2931#pullrequestreview-1062016018)
   > 
   > If it is being addressed, please let me know where it is.
   
   I mean that if we didn't handle the shut-down bookies, they would be treated as default-rack bookies. The default rack differs from the other bookies' racks, so they would not be replaced. Now we add the shut-down bookies to the excluded nodes so that they are replaced.
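
   The idea of treating shut-down bookies separately can be sketched as follows. This is a simplified illustration with plain strings, not the PR's implementation: `InactiveGrouping`, `groupByRack`, and the rack names are hypothetical, and the `/inactive` label is borrowed from the diff discussed in this thread. Ensemble members that are no longer in the known-bookies map are grouped apart instead of being resolved to a default rack.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class InactiveGrouping {
    // Illustrative label for bookies that are no longer known to the cluster.
    static final String INACTIVE = "/inactive";

    // Groups ensemble members by rack; members absent from knownRacks go to INACTIVE.
    static Map<String, List<String>> groupByRack(List<String> ensemble, Map<String, String> knownRacks) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String bookie : ensemble) {
            String rack = knownRacks.getOrDefault(bookie, INACTIVE);
            groups.computeIfAbsent(rack, k -> new ArrayList<>()).add(bookie);
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<String, String> known = new HashMap<>();
        known.put("b1", "/region/r1");
        known.put("b2", "/region/r1");
        // "b3" is not in the known map, so it lands in the INACTIVE group.
        List<String> ensemble = Arrays.asList("b1", "b2", "b3");
        System.out.println(groupByRack(ensemble, known));
    }
}
```

   With the inactive members identified, a caller could replace them first or add them to an exclusion set, as the comment above describes.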





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r937364204


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -788,6 +791,116 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
         }
     }
 
+    @Override
+    public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,
+            int ackQuorumSize, Map<String, byte[]> customMetadata) {
+        rwLock.readLock().lock();
+        try {
+            if (CollectionUtils.isEmpty(ensemble)) {
+                return Collections.emptyMap();
+            }
+            PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensemble,
+                    writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {
+                return Collections.emptyMap();
+            }
+            Map<BookieId, Integer> bookieIndex = new HashMap<>();
+            for (int i = 0; i < ensemble.size(); i++) {
+                bookieIndex.put(ensemble.get(i), i);
+            }
+            Map<BookieId, BookieNode> clone = new HashMap<>(knownBookies);
+
+            Map<String, List<BookieNode>> toPlaceGroup = new HashMap<>();
+            for (BookieId bookieId : ensemble) {
+                //When ReplicationWorker.getUnderreplicatedFragments, the bookie is alive, so the fragment is not
+                // data_loss. When find other rack bookie to replace, the bookie maybe shutdown, so here we should pick
+                // the shutdown bookies. If the bookieId shutdown, put it to inactive. When do replace, we should
+                // replace inactive bookie firstly.
+                BookieNode bookieNode = clone.get(bookieId);
+                if (bookieNode == null) {
+                    List<BookieNode> list = toPlaceGroup.computeIfAbsent(NetworkTopology.INACTIVE,
+                            k -> new ArrayList<>());
+                    list.add(new BookieNode(bookieId, NetworkTopology.INACTIVE));
+                } else {
+                    List<BookieNode> list = toPlaceGroup.computeIfAbsent(bookieNode.getNetworkLocation(),
+                            k -> new ArrayList<>());
+                    list.add(bookieNode);
+                }
+            }
+            for (List<BookieNode> bookieNodes : toPlaceGroup.values()) {
+                Collections.shuffle(bookieNodes);
+            }
+
+            Map<String, List<BookieNode>> knownRackToBookies = clone.values().stream()
+                    .collect(Collectors.groupingBy(NodeBase::getNetworkLocation));
+            HashSet<String> knownRacks = new HashSet<>(knownRackToBookies.keySet());
+
+            Set<BookieId> excludesBookies = new HashSet<>();
+
+            for (String key : toPlaceGroup.keySet()) {
+                List<BookieNode> sameRack = knownRackToBookies.get(key);
+                if (!CollectionUtils.isEmpty(sameRack)) {
+                    excludesBookies.addAll(sameRack.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
+                }
+            }
+
+            Map<Integer, BookieId> targetBookieAddresses = new HashMap<>();
+            boolean placeSucceed = false;
+            while (knownRacks.size() > 0) {
+                BookieNode beReplaceNode = getBeReplaceNode(toPlaceGroup);
+                if (beReplaceNode == null) {
+                    break;
+                }
+                Integer index = bookieIndex.get(beReplaceNode.getAddr());
+                try {
+                    PlacementResult<BookieId> placementResult = replaceBookie(ensemble.size(), writeQuorumSize,
+                            ackQuorumSize, customMetadata, ensemble, beReplaceNode.getAddr(), excludesBookies);
+                    BookieNode replaceNode = clone.get(placementResult.getResult());
+                    String replaceNodeNetwork = replaceNode.getNetworkLocation();
+                    knownRacks.remove(replaceNodeNetwork);
+                    List<BookieNode> nodes = toPlaceGroup.computeIfAbsent(replaceNodeNetwork,
+                            k -> new ArrayList<>());
+                    nodes.add(replaceNode);
+                    targetBookieAddresses.put(index, replaceNode.getAddr());
+                    List<BookieNode> bookieNodes = knownRackToBookies.get(replaceNodeNetwork);
+                    if (!CollectionUtils.isEmpty(bookieNodes)) {
+                        excludesBookies.addAll(
+                                bookieNodes.stream().map(BookieNode::getAddr).collect(Collectors.toSet()));
+                    }
+                } catch (BKException.BKNotEnoughBookiesException e) {
+                    LOG.warn("Didn't find replaced bookie to adhere placement policy.", e);
+                    break;
+                }
+
+                List<BookieId> ensembles = toPlaceGroup.values().stream().flatMap(Collection::stream).map(
+                        BookieNode::getAddr).collect(Collectors.toList());
+                ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensembles,
+                        writeQuorumSize, ackQuorumSize);
+                if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {
+                    placeSucceed = true;
+                    break;
+                }
+            }

Review Comment:
   Yes. Because a bookie in the same rack may have been shut down, we should take the ensemble order into account.





[GitHub] [bookkeeper] horizonzy commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1207672356

   > And what about the default rack? [#2931 (review)](https://github.com/apache/bookkeeper/pull/2931#pullrequestreview-1062016018)
   > 
   > If it is being addressed, please let me know where it is.
   
   I mean that if we didn't handle the shut-down bookies, they would be treated as default-rack bookies. The default rack differs from the other bookies' racks, so they would not be replaced. Now we add the shut-down bookies to the excluded nodes so that they are replaced.




[GitHub] [bookkeeper] eolivelli merged pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
eolivelli merged PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359




[GitHub] [bookkeeper] eolivelli commented on pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#issuecomment-1250645511

   @hangc0276 please ask on dev@
   I agree with you.
   Also 4.15 and 4.16 have some problems that are blocking the adoption.




[GitHub] [bookkeeper] zymap commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r928369579


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java:
##########
@@ -502,10 +570,40 @@ private boolean isLastSegmentOpenAndMissingBookies(LedgerHandle lh) throws BKExc
      */
     private Set<LedgerFragment> getUnderreplicatedFragments(LedgerHandle lh, Long ledgerVerificationPercentage)
             throws InterruptedException {
+        //The data loss fragments is first to repair. If a fragment is data_loss and not_adhering_placement
+        //at the same time, we only fix data_loss in this time. After fix data_loss, the fragment is still
+        //not_adhering_placement, Auditor will mark this ledger again.

Review Comment:
   Why do we handle it separately? Why don't we pick the correct node as soon as the ensemble needs a replacement, and replicate at that point?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java:
##########
@@ -364,6 +364,18 @@ DistributionSchedule.WriteSet reorderReadLACSequence(
     default void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
     }
 
+    /**
+     * Replace some bookie to adhering placement policy. If the all kinds of replacement
+     * didn't adhere placement policy, return empty map.
+     *
+     * @param ensemble
+     * @param writeQuorumSize
+     * @param ackQuorumSize
+     * @return Map: key means ensemble index, value means target replace bookieId.
+     */
+    Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,

Review Comment:
   Can we make it a default implementation so that it won't break user-side implementations? The same way line 364 does it.
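
   A minimal sketch of why a default method avoids breaking user-side implementations. The interface and class names below are hypothetical and much smaller than the real `EnsemblePlacementPolicy`, and returning an empty map from the default is only an assumption for illustration; a default that throws `UnsupportedOperationException` would also keep old implementations compiling.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed-down policy interface; not the real EnsemblePlacementPolicy.
interface PlacementPolicy {
    boolean isAdhering(List<String> ensemble);

    // Added later as a default method: existing implementations keep compiling
    // and simply report that there is nothing to replace.
    default Map<Integer, String> replaceNotAdhering(List<String> ensemble) {
        return Collections.emptyMap();
    }
}

// A user-side implementation written before the new method existed.
class LegacyPolicy implements PlacementPolicy {
    @Override
    public boolean isAdhering(List<String> ensemble) {
        return !ensemble.isEmpty();
    }
}

class DefaultMethodDemo {
    public static void main(String[] args) {
        PlacementPolicy policy = new LegacyPolicy();
        // The legacy class never implemented replaceNotAdhering, yet the call works.
        System.out.println(policy.replaceNotAdhering(Arrays.asList("b1", "b2")).isEmpty());
    }
}
```

   Had the new method been declared abstract, `LegacyPolicy` would stop compiling until its author added an implementation.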



##########
bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java:
##########
@@ -1417,6 +1417,176 @@ public boolean validate() {
         }
     }
 
+    @Test
+    public void testReplaceNotAdheringPlacementPolicyBookie() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 3;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 3;
+        String[] rackLocationNames = new String[numOfRacks];
+        List<BookieId> bookieSocketAddresses = new ArrayList<BookieId>();
+        Map<BookieId, String> bookieRackMap = new HashMap<BookieId, String>();
+        BookieId bookieAddress;
+
+        for (int i = 0; i < numOfRacks; i++) {
+            rackLocationNames[i] = "/default-region/r" + i;
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181).toBookieId();
+                StaticDNSResolver.addNodeToRack("128.0.0." + index, rackLocationNames[i]);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+            }
+        }
+
+        repp.onClusterChanged(new HashSet<BookieId>(bookieSocketAddresses), new HashSet<BookieId>());
+
+        int writeQuorum = 3;
+        int ackQuorum = 3;
+
+        //test three knows bookie
+        List<BookieId> knowsEnsemble = new ArrayList<>();
+        knowsEnsemble.add(BookieId.parse("128.0.0.0:3181"));
+        knowsEnsemble.add(BookieId.parse("128.0.0.1:3181"));
+        knowsEnsemble.add(BookieId.parse("128.0.0.2:3181"));
+
+        PlacementPolicyAdherence placementPolicyAdherence = repp.isEnsembleAdheringToPlacementPolicy(
+                knowsEnsemble, writeQuorum, ackQuorum);
+        assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.FAIL);
+
+        Map<Integer, BookieId> targetBookie =
+                repp.replaceNotAdheringPlacementPolicyBookie(knowsEnsemble, ackQuorum, writeQuorum,
+                        Collections.emptyMap());
+        //should replace two bookie
+        assertEquals(targetBookie.size(), 2);
+
+        for (Map.Entry<Integer, BookieId> entry : targetBookie.entrySet()) {
+            knowsEnsemble.set(entry.getKey(), entry.getValue());
+        }
+
+        placementPolicyAdherence = repp.isEnsembleAdheringToPlacementPolicy(
+                knowsEnsemble, writeQuorum, ackQuorum);
+        assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.MEETS_STRICT);
+
+        //test three unknowns bookie
+        List<BookieId> unknownEnsembles = new ArrayList<>();
+        unknownEnsembles.add(BookieId.parse("128.0.0.100:3181"));
+        unknownEnsembles.add(BookieId.parse("128.0.0.101:3181"));
+        unknownEnsembles.add(BookieId.parse("128.0.0.102:3181"));
+
+        placementPolicyAdherence = repp.isEnsembleAdheringToPlacementPolicy(
+                unknownEnsembles, writeQuorum, ackQuorum);
+        assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.FAIL);
+
+        //should replace three bookie
+        targetBookie = repp.replaceNotAdheringPlacementPolicyBookie(unknownEnsembles, ackQuorum, writeQuorum,
+                Collections.emptyMap());
+        assertEquals(targetBookie.size(), 3);
+
+        for (Map.Entry<Integer, BookieId> entry : targetBookie.entrySet()) {
+            unknownEnsembles.set(entry.getKey(), entry.getValue());
+        }
+
+        placementPolicyAdherence = repp.isEnsembleAdheringToPlacementPolicy(
+                unknownEnsembles, writeQuorum, ackQuorum);
+        assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.MEETS_STRICT);

Review Comment:
   If all ensembles are missing, doesn't that mean the entry cannot be recovered anymore?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java:
##########
@@ -502,10 +570,40 @@ private boolean isLastSegmentOpenAndMissingBookies(LedgerHandle lh) throws BKExc
      */
     private Set<LedgerFragment> getUnderreplicatedFragments(LedgerHandle lh, Long ledgerVerificationPercentage)
             throws InterruptedException {
+        //The data loss fragments is first to repair. If a fragment is data_loss and not_adhering_placement
+        //at the same time, we only fix data_loss in this time. After fix data_loss, the fragment is still
+        //not_adhering_placement, Auditor will mark this ledger again.
+        Set<LedgerFragment> underreplicatedFragments = new HashSet<>();
+
+        Set<LedgerFragment> dataLossFragments = getDataLossFragments(lh, ledgerVerificationPercentage);
+        underreplicatedFragments.addAll(dataLossFragments);
+
+        Set<LedgerFragment> notAdheringFragments = getNeedRepairedPlacementNotAdheringFragments(lh);
+
+        for (LedgerFragment notAdheringFragment : notAdheringFragments) {
+            if (!checkFragmentRepeat(underreplicatedFragments, notAdheringFragment)) {

Review Comment:
   use `!underreplicatedFragments.contains(notAdheringFragment)`?





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r928385603


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java:
##########
@@ -502,10 +570,40 @@ private boolean isLastSegmentOpenAndMissingBookies(LedgerHandle lh) throws BKExc
      */
     private Set<LedgerFragment> getUnderreplicatedFragments(LedgerHandle lh, Long ledgerVerificationPercentage)
             throws InterruptedException {
+        //The data loss fragments is first to repair. If a fragment is data_loss and not_adhering_placement
+        //at the same time, we only fix data_loss in this time. After fix data_loss, the fragment is still
+        //not_adhering_placement, Auditor will mark this ledger again.
+        Set<LedgerFragment> underreplicatedFragments = new HashSet<>();
+
+        Set<LedgerFragment> dataLossFragments = getDataLossFragments(lh, ledgerVerificationPercentage);
+        underreplicatedFragments.addAll(dataLossFragments);
+
+        Set<LedgerFragment> notAdheringFragments = getNeedRepairedPlacementNotAdheringFragments(lh);
+
+        for (LedgerFragment notAdheringFragment : notAdheringFragments) {
+            if (!checkFragmentRepeat(underreplicatedFragments, notAdheringFragment)) {

Review Comment:
   Maybe not. We didn't override the `equals` method of LedgerFragment, and we only compare the first ledgerId and the last ledgerId, so we use `checkFragmentRepeat` to handle this case.
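
   The difference between `Set.contains` and an explicit field comparison can be sketched like this. `Fragment`, `firstId`, `lastId`, and `containsSameRange` are hypothetical names chosen for illustration; the real `LedgerFragment` and `checkFragmentRepeat` may compare different fields. Without an `equals`/`hashCode` override, a `HashSet` falls back to object identity, so two fragments covering the same range are not recognized as duplicates.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical fragment type that does not override equals/hashCode.
final class Fragment {
    final long firstId;
    final long lastId;

    Fragment(long firstId, long lastId) {
        this.firstId = firstId;
        this.lastId = lastId;
    }
}

class FragmentDedup {
    // Returns true if some fragment in the set covers the same first/last range.
    static boolean containsSameRange(Set<Fragment> fragments, Fragment candidate) {
        for (Fragment f : fragments) {
            if (f.firstId == candidate.firstId && f.lastId == candidate.lastId) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Set<Fragment> fragments = new HashSet<>();
        fragments.add(new Fragment(0, 9));
        Fragment sameRange = new Fragment(0, 9);

        // Identity-based lookup: a distinct object is not found.
        System.out.println(fragments.contains(sameRange));
        // Field-based lookup: the matching range is found.
        System.out.println(containsSameRange(fragments, sameRange));
    }
}
```

   Overriding `equals` and `hashCode` on the fragment type would make `contains` work, but it would also change equality for every other user of the class, which is one reason to prefer a local helper.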



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java:
##########
@@ -364,6 +364,18 @@ DistributionSchedule.WriteSet reorderReadLACSequence(
     default void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
     }
 
+    /**
+     * Replace some bookie to adhering placement policy. If the all kinds of replacement
+     * didn't adhere placement policy, return empty map.
+     *
+     * @param ensemble
+     * @param writeQuorumSize
+     * @param ackQuorumSize
+     * @return Map: key means ensemble index, value means target replace bookieId.
+     */
+    Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,

Review Comment:
   That's fine.



##########
bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java:
##########
@@ -1417,6 +1417,176 @@ public boolean validate() {
         }
     }
 
+    @Test
+    public void testReplaceNotAdheringPlacementPolicyBookie() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 3;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 3;
+        String[] rackLocationNames = new String[numOfRacks];
+        List<BookieId> bookieSocketAddresses = new ArrayList<BookieId>();
+        Map<BookieId, String> bookieRackMap = new HashMap<BookieId, String>();
+        BookieId bookieAddress;
+
+        for (int i = 0; i < numOfRacks; i++) {
+            rackLocationNames[i] = "/default-region/r" + i;
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181).toBookieId();
+                StaticDNSResolver.addNodeToRack("128.0.0." + index, rackLocationNames[i]);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+            }
+        }
+
+        repp.onClusterChanged(new HashSet<BookieId>(bookieSocketAddresses), new HashSet<BookieId>());
+
+        int writeQuorum = 3;
+        int ackQuorum = 3;
+
+        //test three knows bookie
+        List<BookieId> knowsEnsemble = new ArrayList<>();
+        knowsEnsemble.add(BookieId.parse("128.0.0.0:3181"));
+        knowsEnsemble.add(BookieId.parse("128.0.0.1:3181"));
+        knowsEnsemble.add(BookieId.parse("128.0.0.2:3181"));
+
+        PlacementPolicyAdherence placementPolicyAdherence = repp.isEnsembleAdheringToPlacementPolicy(
+                knowsEnsemble, writeQuorum, ackQuorum);
+        assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.FAIL);
+
+        Map<Integer, BookieId> targetBookie =
+                repp.replaceNotAdheringPlacementPolicyBookie(knowsEnsemble, ackQuorum, writeQuorum,
+                        Collections.emptyMap());
+        //should replace two bookie
+        assertEquals(targetBookie.size(), 2);
+
+        for (Map.Entry<Integer, BookieId> entry : targetBookie.entrySet()) {
+            knowsEnsemble.set(entry.getKey(), entry.getValue());
+        }
+
+        placementPolicyAdherence = repp.isEnsembleAdheringToPlacementPolicy(
+                knowsEnsemble, writeQuorum, ackQuorum);
+        assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.MEETS_STRICT);
+
+        //test three unknowns bookie
+        List<BookieId> unknownEnsembles = new ArrayList<>();
+        unknownEnsembles.add(BookieId.parse("128.0.0.100:3181"));
+        unknownEnsembles.add(BookieId.parse("128.0.0.101:3181"));
+        unknownEnsembles.add(BookieId.parse("128.0.0.102:3181"));
+
+        placementPolicyAdherence = repp.isEnsembleAdheringToPlacementPolicy(
+                unknownEnsembles, writeQuorum, ackQuorum);
+        assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.FAIL);
+
+        //should replace three bookie
+        targetBookie = repp.replaceNotAdheringPlacementPolicyBookie(unknownEnsembles, ackQuorum, writeQuorum,
+                Collections.emptyMap());
+        assertEquals(targetBookie.size(), 3);
+
+        for (Map.Entry<Integer, BookieId> entry : targetBookie.entrySet()) {
+            unknownEnsembles.set(entry.getKey(), entry.getValue());
+        }
+
+        placementPolicyAdherence = repp.isEnsembleAdheringToPlacementPolicy(
+                unknownEnsembles, writeQuorum, ackQuorum);
+        assertEquals(placementPolicyAdherence, PlacementPolicyAdherence.MEETS_STRICT);

Review Comment:
   What does `If all ensembles are missing` mean?





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r938691945


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1084,237 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(

Review Comment:
   @equanz I have cherry-picked your code and enhanced it. Could you help check it? Thanks a lot.
   I will add some test cases to cover it.
   



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1084,237 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(

Review Comment:
   @equanz I have cherry-picked your code and enhanced it. Could you help check it? Thanks a lot.
   I will add some test cases to cover it later.
   





[GitHub] [bookkeeper] equanz commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
equanz commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939797743


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java:
##########
@@ -29,6 +29,7 @@ public interface NetworkTopology {
     String DEFAULT_RACK = "/default-rack";
     String DEFAULT_ZONE = "/default-zone";
     String DEFAULT_UPGRADEDOMAIN = "/default-upgradedomain";
+    String INACTIVE = "/inactive";

Review Comment:
   [nits]
   Unused definition?





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r914468095


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -790,6 +794,113 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
         }
     }
 
+    @Override
+    public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,
+            int ackQuorumSize) {
+        if (CollectionUtils.isEmpty(ensemble)) {
+            return Collections.emptyMap();
+        }
+        PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensemble,
+                writeQuorumSize, ackQuorumSize);
+        if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {

Review Comment:
   No need to do that. `MEETS_SOFT` means the number of network locations in the ensemble is more than `minNumZonesPerWriteQuorum`.



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -790,6 +794,113 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
         }
     }
 
+    @Override
+    public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,
+            int ackQuorumSize) {
+        if (CollectionUtils.isEmpty(ensemble)) {
+            return Collections.emptyMap();
+        }
+        PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensemble,
+                writeQuorumSize, ackQuorumSize);
+        if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {
+            return Collections.emptyMap();
+        }
+        Map<BookieId, Integer> bookieIndex = new HashMap<>();
+        for (int i = 0; i < ensemble.size(); i++) {
+            bookieIndex.put(ensemble.get(i), i);
+        }
+
+        Map<BookieId, BookieNode> clone = new HashMap<>(knownBookies);
+
+        Map<String, List<BookieNode>> toPlaceGroup = new HashMap<>();
+        for (BookieId bookieId : ensemble) {
+            //If the bookieId shutdown, put it to inactive.
+            BookieNode bookieNode = clone.get(bookieId);

Review Comment:
   In theory, if the fragment is DATA_LOSS, this method won't be invoked. The data loss is repaired first.





[GitHub] [bookkeeper] horizonzy commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r914468095


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java:
##########
@@ -790,6 +794,113 @@ public void updateBookieInfo(Map<BookieId, BookieInfo> bookieInfoMap) {
         }
     }
 
+    @Override
+    public Map<Integer, BookieId> replaceNotAdheringPlacementPolicyBookie(List<BookieId> ensemble, int writeQuorumSize,
+            int ackQuorumSize) {
+        if (CollectionUtils.isEmpty(ensemble)) {
+            return Collections.emptyMap();
+        }
+        PlacementPolicyAdherence ensembleAdheringToPlacementPolicy = isEnsembleAdheringToPlacementPolicy(ensemble,
+                writeQuorumSize, ackQuorumSize);
+        if (PlacementPolicyAdherence.FAIL != ensembleAdheringToPlacementPolicy) {

Review Comment:
   No need to do that. `MEETS_SOFT` means the number of network locations in the ensemble is more than `minNumZonesPerWriteQuorum`, but one zone holds two bookies in the same `upgradeDomain`.


