Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2022/08/07 14:07:16 UTC

[GitHub] [bookkeeper] eolivelli commented on a diff in pull request #3359: Feature: auto recover support repaired not adhering placement ledger

eolivelli commented on code in PR #3359:
URL: https://github.com/apache/bookkeeper/pull/3359#discussion_r939673475


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1083,236 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        rwLock.readLock().lock();
+        try {
+            PlacementPolicyAdherence currentPlacementAdherence = isEnsembleAdheringToPlacementPolicy(
+                    currentEnsemble, writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+                return PlacementResult.of(new ArrayList<>(currentEnsemble), currentPlacementAdherence);
+            }
+            for (BookieId bookieId : currentEnsemble) {
+                if (!knownBookies.containsKey(bookieId)) {
+                    excludeBookies.add(bookieId);
+                }
+            }
+            PlacementResult<List<BookieId>> placementResult = PlacementResult.of(Collections.emptyList(),
+                    PlacementPolicyAdherence.FAIL);
+            int minDiffer = Integer.MAX_VALUE;
+            for (int i = 0; i < currentEnsemble.size(); i++) {
+                PlacementResult<List<BookieId>> result = doReplaceToAdherePlacementPolicy(ensembleSize,
+                        writeQuorumSize, ackQuorumSize, excludeBookies, currentEnsemble, i);
+                if (PlacementPolicyAdherence.FAIL == result.getAdheringToPolicy()) {
+                    continue;
+                }
+                int differ = differBetweenBookies(currentEnsemble, result.getResult());
+                if (differ < minDiffer) {
+                    minDiffer = differ;
+                    placementResult = result;
+                    if (minDiffer == 1) {
+                        break;
+                    }
+                }
+            }
+            return placementResult;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private int differBetweenBookies(List<BookieId> bookiesA, List<BookieId> bookiesB) {

Review Comment:
   Nit: could this method be static?
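
A minimal sketch of the suggestion, assuming the helper reads nothing but its two arguments (which is what the diff shows). `String` stands in for `BookieId`, and the class name `EnsembleDiff` is hypothetical, so the snippet compiles without BookKeeper on the classpath. In the PR the method would stay `private`; it is left non-private here only so the demo can call it.

```java
import java.util.Arrays;
import java.util.List;

class EnsembleDiff {
    // Static because the result depends only on the two lists passed in.
    // Counts the positions at which the two ensembles hold different bookies.
    public static <T> int differBetweenBookies(List<T> bookiesA, List<T> bookiesB) {
        if (bookiesA == null || bookiesB == null || bookiesA.isEmpty() || bookiesB.isEmpty()) {
            return Integer.MAX_VALUE;
        }
        if (bookiesA.size() != bookiesB.size()) {
            return Integer.MAX_VALUE;
        }
        int differ = 0;
        for (int i = 0; i < bookiesA.size(); i++) {
            if (!bookiesA.get(i).equals(bookiesB.get(i))) {
                differ++;
            }
        }
        return differ;
    }

    public static void main(String[] args) {
        // Hypothetical bookie addresses; only the second position differs.
        List<String> current = Arrays.asList("bk1:3181", "bk2:3181", "bk3:3181");
        List<String> candidate = Arrays.asList("bk1:3181", "bk4:3181", "bk3:3181");
        System.out.println(differBetweenBookies(current, candidate)); // prints 1
    }
}
```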



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java:
##########
@@ -1079,4 +1083,236 @@ public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBooki
         }
         return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
     }
+
+    @Override
+    public PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        rwLock.readLock().lock();
+        try {
+            PlacementPolicyAdherence currentPlacementAdherence = isEnsembleAdheringToPlacementPolicy(
+                    currentEnsemble, writeQuorumSize, ackQuorumSize);
+            if (PlacementPolicyAdherence.FAIL != currentPlacementAdherence) {
+                return PlacementResult.of(new ArrayList<>(currentEnsemble), currentPlacementAdherence);
+            }
+            for (BookieId bookieId : currentEnsemble) {
+                if (!knownBookies.containsKey(bookieId)) {
+                    excludeBookies.add(bookieId);
+                }
+            }
+            PlacementResult<List<BookieId>> placementResult = PlacementResult.of(Collections.emptyList(),
+                    PlacementPolicyAdherence.FAIL);
+            int minDiffer = Integer.MAX_VALUE;
+            for (int i = 0; i < currentEnsemble.size(); i++) {
+                PlacementResult<List<BookieId>> result = doReplaceToAdherePlacementPolicy(ensembleSize,
+                        writeQuorumSize, ackQuorumSize, excludeBookies, currentEnsemble, i);
+                if (PlacementPolicyAdherence.FAIL == result.getAdheringToPolicy()) {
+                    continue;
+                }
+                int differ = differBetweenBookies(currentEnsemble, result.getResult());
+                if (differ < minDiffer) {
+                    minDiffer = differ;
+                    placementResult = result;
+                    if (minDiffer == 1) {
+                        break;
+                    }
+                }
+            }
+            return placementResult;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    private int differBetweenBookies(List<BookieId> bookiesA, List<BookieId> bookiesB) {
+        if (CollectionUtils.isEmpty(bookiesA) || CollectionUtils.isEmpty(bookiesB)) {
+            return Integer.MAX_VALUE;
+        }
+        if (bookiesA.size() != bookiesB.size()) {
+            return Integer.MAX_VALUE;
+        }
+        int differ = 0;
+        for (int i = 0; i < bookiesA.size(); i++) {
+            if (!bookiesA.get(i).equals(bookiesB.get(i))) {
+                differ++;
+            }
+        }
+        return differ;
+    }
+
+    private PlacementResult<List<BookieId>> doReplaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble,
+            int startIndex) {
+        final List<BookieNode> provisionalEnsembleNodes = currentEnsemble.stream()
+                .map(this::convertBookieToNode).collect(Collectors.toList());
+        final Set<Node> excludeNodes = convertBookiesToNodes(
+                addDefaultRackBookiesIfMinNumRacksIsEnforced(excludeBookies));
+        int minNumRacksPerWriteQuorumForThisEnsemble = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+        final RRTopologyAwareCoverageEnsemble ensemble =
+                new RRTopologyAwareCoverageEnsemble(
+                        ensembleSize,
+                        writeQuorumSize,
+                        ackQuorumSize,
+                        RACKNAME_DISTANCE_FROM_LEAVES,
+                        null,
+                        null,
+                        minNumRacksPerWriteQuorumForThisEnsemble);
+        int numRacks = topology.getNumOfRacks();
+        // only one rack or less than minNumRacksPerWriteQuorumForThisEnsemble, stop calculation to skip relocation
+        if (numRacks < 2 || numRacks < minNumRacksPerWriteQuorumForThisEnsemble) {
+            LOG.warn("Skip ensemble relocation because the cluster has only {} rack.", numRacks);
+            return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+        }
+        BookieNode prevNode = null;
+        final BookieNode firstNode = provisionalEnsembleNodes.get(startIndex);
+        // use same bookie at first to reduce ledger replication
+        if (!excludeNodes.contains(firstNode) && ensemble.apply(firstNode, ensemble)
+                && ensemble.addNode(firstNode)) {
+            excludeNodes.add(firstNode);
+            prevNode = firstNode;
+        }
+        for (int i = prevNode == null ? 0 : 1; i < ensembleSize; i++) {
+            int index = (startIndex + i) % ensembleSize;
+            final String curRack;
+            if (null == prevNode) {
+                if ((null == localNode) || defaultRack.equals(localNode.getNetworkLocation())) {
+                    curRack = NodeBase.ROOT;
+                } else {
+                    curRack = localNode.getNetworkLocation();
+                }
+            } else {
+                curRack = "~" + prevNode.getNetworkLocation();
+            }
+            try {
+                prevNode = replaceToAdherePlacementPolicyInternal(
+                        curRack, excludeNodes, ensemble, ensemble,
+                        provisionalEnsembleNodes, index, ensembleSize, minNumRacksPerWriteQuorumForThisEnsemble);
+                // got a good candidate
+                if (ensemble.addNode(prevNode)) {
+                    // add the candidate to exclude set
+                    excludeNodes.add(prevNode);
+                } else {
+                    throw new BKNotEnoughBookiesException();
+                }
+                // replace to newer node
+                provisionalEnsembleNodes.set(index, prevNode);
+            } catch (BKNotEnoughBookiesException e) {
+                LOG.warn("Skip ensemble relocation because the cluster has not enough bookies.");
+                return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+            }
+        }
+        List<BookieId> bookieList = ensemble.toList();
+        if (ensembleSize != bookieList.size()) {
+            LOG.warn("Not enough {} bookies are available to form an ensemble : {}.",
+                    ensembleSize, bookieList);
+            return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+        }
+        PlacementPolicyAdherence placementPolicyAdherence = isEnsembleAdheringToPlacementPolicy(bookieList,
+                writeQuorumSize, ackQuorumSize);
+        if (PlacementPolicyAdherence.FAIL == placementPolicyAdherence) {
+            return PlacementResult.of(Collections.emptyList(), PlacementPolicyAdherence.FAIL);
+        }
+        return PlacementResult.of(revertBookieListByIndex(bookieList, startIndex), placementPolicyAdherence);
+    }
+
+    private List<BookieId> revertBookieListByIndex(List<BookieId> bookies, int startIndex) {
+        BookieId[] bookieIds = new BookieId[bookies.size()];
+        for (int i = 0; i < bookies.size(); i++) {
+            if (startIndex == bookies.size()) {
+                startIndex = 0;
+            }
+            bookieIds[startIndex++] = bookies.get(i);
+        }
+        return Lists.newArrayList(bookieIds);
+    }
+
+    private BookieNode replaceToAdherePlacementPolicyInternal(
+            String netPath, Set<Node> excludeBookies, Predicate<BookieNode> predicate,
+            Ensemble<BookieNode> ensemble, List<BookieNode> provisionalEnsembleNodes, int ensembleIndex,
+            int ensembleSize, int minNumRacksPerWriteQuorumForThisEnsemble) throws BKNotEnoughBookiesException {
+        final BookieNode currentNode = provisionalEnsembleNodes.get(ensembleIndex);
+        // if the current bookie could be applied to the ensemble, apply it to minify the number of bookies replaced
+        if (!excludeBookies.contains(currentNode) && predicate.apply(currentNode, ensemble)) {
+            return currentNode;
+        }
+        final List<Pair<String, List<BookieNode>>> conditionList = new ArrayList<>();
+        final Set<String> preExcludeRacks = new HashSet<>();
+        final Set<String> postExcludeRacks = new HashSet<>();
+        for (int i = 0; i < minNumRacksPerWriteQuorumForThisEnsemble - 1; i++) {
+            preExcludeRacks.add(provisionalEnsembleNodes.get(Math.floorMod((ensembleIndex - i - 1), ensembleSize))
+                    .getNetworkLocation());
+            postExcludeRacks.add(provisionalEnsembleNodes.get(Math.floorMod((ensembleIndex + i + 1), ensembleSize))
+                    .getNetworkLocation());
+        }
+        // adhere minNumRacksPerWriteQuorum by preExcludeRacks
+        // avoid additional replace from write quorum candidates by preExcludeRacks and postExcludeRacks
+        // avoid to use first candidate bookies for election by provisionalEnsembleNodes
+        conditionList.add(Pair.of(
+                "~" + String.join(",",
+                        Stream.concat(preExcludeRacks.stream(), postExcludeRacks.stream()).collect(Collectors.toSet())),
+                provisionalEnsembleNodes
+        ));
+        // avoid to use same rack between previous index by netPath
+        // avoid to use first candidate bookies for election by provisionalEnsembleNodes
+        conditionList.add(Pair.of(netPath, provisionalEnsembleNodes));
+        // avoid to use same rack between previous index by netPath
+        conditionList.add(Pair.of(netPath, Collections.emptyList()));
+
+        for (Pair<String, List<BookieNode>> condition : conditionList) {
+            WeightedRandomSelection<BookieNode> wRSelection = null;
+
+            final List<Node> leaves = new ArrayList<>(topology.getLeaves(condition.getLeft()));
+            if (!isWeighted) {
+                Collections.shuffle(leaves);
+            } else {
+                if (CollectionUtils.subtract(leaves, excludeBookies).size() < 1) {
+                    throw new BKNotEnoughBookiesException();
+                }
+                wRSelection = prepareForWeightedSelection(leaves);
+                if (wRSelection == null) {
+                    throw new BKNotEnoughBookiesException();
+                }
+            }
+
+            final Iterator<Node> it = leaves.iterator();
+            final Set<Node> bookiesSeenSoFar = new HashSet<>();
+            while (true) {
+                Node n;
+                if (isWeighted) {
+                    if (bookiesSeenSoFar.size() == leaves.size()) {
+                        // Don't loop infinitely.
+                        break;
+                    }
+                    n = wRSelection.getNextRandom();
+                    bookiesSeenSoFar.add(n);
+                } else {
+                    if (it.hasNext()) {
+                        n = it.next();
+                    } else {
+                        break;
+                    }
+                }
+                if (excludeBookies.contains(n)) {
+                    continue;
+                }
+                if (!(n instanceof BookieNode) || !predicate.apply((BookieNode) n, ensemble)) {
+                    continue;
+                }
+                // additional excludeBookies
+                if (condition.getRight().contains(n)) {
+                    continue;
+                }
+                BookieNode bn = (BookieNode) n;
+                return bn;
+            }
+        }
+        throw new BKNotEnoughBookiesException();

Review Comment:
   Can we log something if we reach this point?
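
A rough sketch of what such a log line could look like, not the PR's actual code. It assumes a much simplified selection loop: `selectReplacement`, `ReplacementSelection`, and `NotEnoughBookiesException` are hypothetical stand-ins (the latter for `BKNotEnoughBookiesException`), and `java.util.logging` replaces the class's real `LOG` so the snippet is self-contained.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;

class ReplacementSelection {
    private static final Logger LOG = Logger.getLogger(ReplacementSelection.class.getName());

    /** Hypothetical stand-in for BKNotEnoughBookiesException. */
    static class NotEnoughBookiesException extends Exception {
        NotEnoughBookiesException(String message) {
            super(message);
        }
    }

    // Returns the first candidate that is not excluded. If none qualifies,
    // logs the context of the failure before throwing.
    public static String selectReplacement(String netPath, List<String> candidates, Set<String> excluded)
            throws NotEnoughBookiesException {
        for (String candidate : candidates) {
            if (!excluded.contains(candidate)) {
                return candidate;
            }
        }
        String message = String.format(
                "No replacement bookie found: netPath=%s, candidates=%d, excluded=%d",
                netPath, candidates.size(), excluded.size());
        LOG.warning(message);
        throw new NotEnoughBookiesException(message);
    }

    public static void main(String[] args) {
        Set<String> excluded = new HashSet<>(Arrays.asList("bk1", "bk2"));
        try {
            selectReplacement("~/rack1", Arrays.asList("bk1", "bk2"), excluded);
        } catch (NotEnoughBookiesException e) {
            System.out.println("aborted: " + e.getMessage());
        }
    }
}
```

Logging the rack expression and the size of the exclude set at this point would make it easier to tell an undersized cluster from an overly strict exclusion list.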
   



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java:
##########
@@ -440,6 +440,31 @@ default boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBook
         return true;
     }
 
+    /**
+     * Returns placement result. If the currentEnsemble is not adhering placement policy, returns new ensemble that
+     * adheres placement policy. It should be implemented so as to minify the number of bookies replaced.
+     *
+     * @param ensembleSize
+     *            ensemble size
+     * @param writeQuorumSize
+     *            writeQuorumSize of the ensemble
+     * @param ackQuorumSize
+     *            ackQuorumSize of the ensemble
+     * @param excludeBookies
+     *            bookies that should not be considered as targets
+     * @param currentEnsemble
+     *            current ensemble
+     * @return a placement result
+     */
+    default PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieId> excludeBookies,
+            List<BookieId> currentEnsemble) {
+        throw new UnsupportedOperationException();

Review Comment:
   What happens if you don't override this method?
   
   Are we handling this exception in the code that calls this method?
   
   My understanding is that we cannot provide a good default implementation.
   
   In the calling code we could catch this exception, log something, and abort the operation gracefully.
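
A minimal sketch of that idea, under stated assumptions: `Policy` is a hypothetical, heavily simplified stand-in for `EnsemblePlacementPolicy` (one argument instead of five, `String` instead of `BookieId`), and `tryRepair` and `PlacementRepair` are made-up names for the caller. `java.util.logging` is used only to keep the snippet self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.logging.Logger;

class PlacementRepair {
    private static final Logger LOG = Logger.getLogger(PlacementRepair.class.getName());

    /** Hypothetical, simplified stand-in for EnsemblePlacementPolicy. */
    interface Policy {
        default List<String> replaceToAdherePlacementPolicy(List<String> currentEnsemble) {
            throw new UnsupportedOperationException();
        }
    }

    // Caller side: a policy that does not override the default method makes
    // the repair a logged no-op instead of an unhandled failure.
    public static Optional<List<String>> tryRepair(Policy policy, List<String> currentEnsemble) {
        try {
            return Optional.of(policy.replaceToAdherePlacementPolicy(currentEnsemble));
        } catch (UnsupportedOperationException e) {
            LOG.warning("Placement policy " + policy.getClass().getName()
                    + " does not support replaceToAdherePlacementPolicy; skipping repair");
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Policy unsupported = new Policy() { };
        // The default method throws, so no repaired ensemble is returned.
        System.out.println(tryRepair(unsupported, Arrays.asList("bk1", "bk2")).isPresent()); // prints false
    }
}
```

Returning an empty `Optional` is just one way to signal "nothing to do"; the real caller may prefer to skip the ledger and continue with the next one.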


