You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/08/23 16:47:14 UTC

[GitHub] eolivelli commented on a change in pull request #1621: Recovery uses immutable metadata

eolivelli commented on a change in pull request #1621: Recovery uses immutable metadata
URL: https://github.com/apache/bookkeeper/pull/1621#discussion_r212368625
 
 

 ##########
 File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
 ##########
 @@ -207,4 +206,190 @@ public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
             }
         }, ctx);
     }
+    /**
+     * Build a copy of {@code oldEnsemble} in which failed bookies are swapped for replacements
+     * obtained from the bookie watcher.
+     *
+     * <p>An entry of {@code failedBookies} is skipped when the bookie currently at that index no
+     * longer equals the failed address. If the watcher throws after at least one slot has already
+     * been replaced, the loop stops and the partially updated ensemble is returned.
+     *
+     * @param metadata ledger metadata supplying the ensemble size, quorum sizes and custom metadata
+     * @param oldEnsemble the ensemble to start from; it is copied and the copy is what gets updated
+     * @param failedBookies failed bookie addresses keyed by their index in the ensemble
+     * @return the new ensemble, with the same number of slots as {@code oldEnsemble}
+     * @throws BKException.BKNotEnoughBookiesException if the watcher cannot supply a bookie before
+     *         any slot has been replaced
+     */
+    List<BookieSocketAddress> replaceBookiesInEnsemble(LedgerMetadata metadata,
+                                                       List<BookieSocketAddress> oldEnsemble,
+                                                       Map<Integer, BookieSocketAddress> failedBookies)
+            throws BKException.BKNotEnoughBookiesException {
+        List<BookieSocketAddress> newEnsemble = new ArrayList<BookieSocketAddress>(oldEnsemble);
+
+        int ensembleSize = metadata.getEnsembleSize();
+        int writeQ = metadata.getWriteQuorumSize();
+        int ackQ = metadata.getAckQuorumSize();
+        Map<String, byte[]> customMetadata = metadata.getCustomMetadata();
+        // every failed bookie goes into the exclusion set handed to replaceBookie
+        Set<BookieSocketAddress> exclude = new HashSet<BookieSocketAddress>(failedBookies.values());
+
+        int replaced = 0; // number of slots successfully replaced so far
+        for (Map.Entry<Integer, BookieSocketAddress> entry : failedBookies.entrySet()) {
+            int idx = entry.getKey();
+            BookieSocketAddress addr = entry.getValue();
+            LOG.debug("[EnsembleChange-L{}] replacing bookie: {} index: {}", getId(), addr, idx);
+
+            if (!newEnsemble.get(idx).equals(addr)) { // slot no longer holds the failed bookie
+                LOG.debug("[EnsembleChange-L{}] Not changing failed bookie {} at index {}, already changed to {}",
+                          getId(), addr, idx, newEnsemble.get(idx));
+                continue;
+            }
+            try {
+                BookieSocketAddress newBookie = clientCtx.getBookieWatcher().replaceBookie(
+                        ensembleSize, writeQ, ackQ, customMetadata, newEnsemble, idx, exclude);
+                newEnsemble.set(idx, newBookie);
+
+                replaced++;
+            } catch (BKException.BKNotEnoughBookiesException e) {
+                // if there is no bookie replaced, we throw not enough bookie exception
+                if (replaced <= 0) {
+                    throw e;
+                } else {
+                    break; // keep the replacements made so far
+                }
+            }
+        }
+        return newEnsemble;
+    }
+    /**
+     * Return the indices at which two ensembles hold different bookies.
+     *
+     * <p>A size mismatch is rejected by {@code checkArgument} (presumably Guava's
+     * {@code Preconditions.checkArgument}, i.e. an {@code IllegalArgumentException}; confirm
+     * against the file's imports).
+     *
+     * @param e1 first ensemble
+     * @param e2 second ensemble; must have the same size as {@code e1}
+     * @return the positions {@code i} where {@code e1.get(i)} does not equal {@code e2.get(i)}
+     */
+    private static Set<Integer> diffEnsemble(List<BookieSocketAddress> e1,
+                                             List<BookieSocketAddress> e2) {
+        checkArgument(e1.size() == e2.size(), "Ensembles must be of same size");
+        Set<Integer> diff = new HashSet<Integer>();
+        for (int i = 0; i < e1.size(); i++) {
+            if (!e1.get(i).equals(e2.get(i))) {
+                diff.add(i);
+            }
+        }
+        return diff;
+    }
+    /**
+     * React to bookie failures by building a replacement ensemble from the current one.
+     *
+     * <p>When at least one slot was replaced, the new ensemble is recorded in
+     * {@code newEnsemblesFromRecovery} under {@code getLastAddConfirmed() + 1} and
+     * {@code unsetSuccessAndSendWriteRequest} is invoked with the new ensemble and the replaced
+     * indices. When no replacement bookie is available, the error is logged and
+     * {@code handleUnrecoverableErrorDuringAdd} is called with the exception's code.
+     *
+     * @param failedBookies failed bookie addresses keyed by their index in the ensemble
+     */
+    @Override
+    void handleBookieFailure(final Map<Integer, BookieSocketAddress> failedBookies) {
+        blockAddCompletions.incrementAndGet();
+
+        // handleBookieFailure should always run in the ordered executor thread for this
+        // ledger, so this synchronized should be unnecessary, but putting it here now
+        // just in case (can be removed when we validate threads)
+        synchronized (this) {
+            long lac = getLastAddConfirmed();
+            LedgerMetadata metadata = getLedgerMetadata();
+            List<BookieSocketAddress> currentEnsemble = getCurrentEnsemble();
+            try {
+                List<BookieSocketAddress> newEnsemble = replaceBookiesInEnsemble(metadata, currentEnsemble,
+                                                                                 failedBookies);
+                /*
+                 * NOTE(review): the decrementAndGet() below is skipped when replaceBookiesInEnsemble
+                 * throws, so blockAddCompletions stays incremented on the error path. Confirm that
+                 * this is intended.
+                 */
+                Set<Integer> replaced = diffEnsemble(currentEnsemble, newEnsemble);
+                blockAddCompletions.decrementAndGet();
+                if (!replaced.isEmpty()) {
+                    newEnsemblesFromRecovery.put(lac + 1, newEnsemble);
+                    unsetSuccessAndSendWriteRequest(newEnsemble, replaced);
+                }
+            } catch (BKException.BKNotEnoughBookiesException e) {
+                LOG.error("Could not get additional bookie to remake ensemble, closing ledger: {}", ledgerId);
+
+                handleUnrecoverableErrorDuringAdd(e.getCode());
+                return;
+            }
+        }
+    }
+    /**
+     * Handle an unrecoverable add error by passing the code on to {@code errorOutPendingAdds}.
+     *
+     * @param rc the error return code
+     */
+    @Override
+    void handleUnrecoverableErrorDuringAdd(int rc) {
+        errorOutPendingAdds(rc);
+    }
+    /**
+     * Recover the ledger with no read entry listener and without forcing recovery.
+     *
+     * @param finalCb
+     *          callback after recovery is done.
+     */
+    void recover(GenericCallback<Void> finalCb) {
+        recover(finalCb, null, false);
+    }
+
+    /**
+     * Recover the ledger.
+     *
+     * <p>The steps are chained asynchronously. A {@code MetadataUpdateLoop} is run whose transform
+     * rebuilds the metadata with the in-recovery state, guarded by the needs-update predicate
+     * chosen below. Then, unless the resulting metadata is closed, a {@code LedgerRecoveryOp} is
+     * initiated with the given listener. Finally {@link #closeRecovered()} is invoked.
+     *
+     * <p>{@code finalCb} is wrapped in a {@code TimedGenericCallback} built with the recover-op
+     * stats logger. It is completed with {@code BKException.Code.OK} when the chain succeeds, and
+     * otherwise with the code returned by
+     * {@code getExceptionCode(ex, BKException.Code.UnexpectedConditionException)}.
+     *
+     * @param finalCb
+     *          callback after recovery is done.
+     * @param listener
+     *          read entry listener on recovery reads.
+     * @param forceRecovery
+     *          force the recovery procedure even if the ledger metadata shows the ledger is closed.
+     */
+    void recover(GenericCallback<Void> finalCb,
+                 final @VisibleForTesting ReadEntryListener listener,
+                 final boolean forceRecovery) {
+        final GenericCallback<Void> cb = new TimedGenericCallback<Void>(
+            finalCb,
+            BKException.Code.OK,
+            clientCtx.getClientStats().getRecoverOpLogger());
+        // default: the metadata needs the update only if the ledger is neither closed nor in recovery
+        MetadataUpdateLoop.NeedsUpdatePredicate needsUpdate =
+            (metadata) -> !(metadata.isClosed() || metadata.isInRecovery());
+        if (forceRecovery) {
+            // in the force recovery case, we want to update the metadata
+            // to IN_RECOVERY, even if the ledger is already closed
+            needsUpdate = (metadata) -> !metadata.isInRecovery();
+        }
+        new MetadataUpdateLoop(
+                clientCtx.getLedgerManager(), getId(),
+                this::getLedgerMetadata,
+                needsUpdate,
+                (metadata) -> LedgerMetadataBuilder.from(metadata).withInRecoveryState().build(),
+                this::setLedgerMetadata)
+            .run()
+            .thenCompose((metadata) -> {
+                    if (metadata.isClosed()) {
+                        return CompletableFuture.<Void>completedFuture(null);
+                    } else {
+                        return new LedgerRecoveryOp(ReadOnlyLedgerHandle.this, clientCtx)
+                            .setEntryListener(listener)
+                            .initiate();
+                    }
+            })
+            .thenCompose((ignore) -> closeRecovered())
+            .whenComplete((ignore, ex) -> {
+                    if (ex != null) {
+                        cb.operationComplete(getExceptionCode(ex, BKException.Code.UnexpectedConditionException), null);
+                    } else {
+                        cb.operationComplete(BKException.Code.OK, null);
+                    }
+            });
+    }
+
+    CompletableFuture<LedgerMetadata> closeRecovered() {
+        long lac, len;
+        synchronized (this) {
+            lac = lastAddConfirmed;
+            len = length;
+        }
+        LOG.info("Closing recovered ledger {} at entry {}", getId(), lac);
+        CompletableFuture<LedgerMetadata> f = new MetadataUpdateLoop(
+                clientCtx.getLedgerManager(), getId(),
+                this::getLedgerMetadata,
+                (metadata) -> metadata.isInRecovery(),
+                (metadata) -> {
+                    LedgerMetadataBuilder builder = LedgerMetadataBuilder.from(metadata);
+                    Optional<Long> lastEnsembleKey = metadata.getLastEnsembleKey();
+                    checkState(lastEnsembleKey.isPresent(),
 
 Review comment:
   How will an error from this assertion be handled by the MetadataUpdateLoop?
   Will the resulting CompletableFuture fail?
   Do we have test cases for this?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services