You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/08/21 13:38:45 UTC

[GitHub] ivankelly commented on a change in pull request #1587: UpdateLedgerOp uses MetadataUpdateLoop

ivankelly commented on a change in pull request #1587: UpdateLedgerOp uses MetadataUpdateLoop
URL: https://github.com/apache/bookkeeper/pull/1587#discussion_r211608073
 
 

 ##########
 File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
 ##########
 @@ -81,174 +87,80 @@ public UpdateLedgerOp(final BookKeeper bkc, final BookKeeperAdmin admin) {
      *             metadata
      */
     public void updateBookieIdInLedgers(final BookieSocketAddress oldBookieId, final BookieSocketAddress newBookieId,
-            final int rate, final int limit, final UpdateLedgerNotifier progressable) throws IOException {
+                                        final int rate, final int limit, final UpdateLedgerNotifier progressable)
+            throws IOException, InterruptedException {
 
-        final ExecutorService executor = Executors
-                .newSingleThreadExecutor(new DefaultThreadFactory("UpdateLedgerThread", true));
         final AtomicInteger issuedLedgerCnt = new AtomicInteger();
         final AtomicInteger updatedLedgerCnt = new AtomicInteger();
-        final Future<?> updateBookieCb = executor.submit(new Runnable() {
-
-            @Override
-            public void run() {
-                updateLedgers(oldBookieId, newBookieId, rate, limit, progressable);
-            }
-
-            private void updateLedgers(final BookieSocketAddress oldBookieId, final BookieSocketAddress newBookieId,
-                    final int rate, final int limit, final UpdateLedgerNotifier progressable) {
-                try {
-                    final AtomicBoolean stop = new AtomicBoolean(false);
-                    final Set<Long> outstandings = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
-                    final RateLimiter throttler = RateLimiter.create(rate);
-                    final Iterator<Long> ledgerItr = admin.listLedgers().iterator();
-                    final CountDownLatch syncObj = new CountDownLatch(1);
-
-                    // iterate through all the ledgers
-                    while (ledgerItr.hasNext() && !stop.get()) {
-                        // throttler to control updates per second
-                        throttler.acquire();
-
-                        final Long lId = ledgerItr.next();
-                        final ReadLedgerMetadataCb readCb = new ReadLedgerMetadataCb(bkc, lId, oldBookieId,
-                                newBookieId);
-                        outstandings.add(lId);
-
-                        FutureCallback<Void> updateLedgerCb = new UpdateLedgerCb(lId, stop, issuedLedgerCnt,
-                                updatedLedgerCnt, outstandings, syncObj, progressable);
-                        Futures.addCallback(readCb.getFutureListener(), updateLedgerCb);
-
-                        issuedLedgerCnt.incrementAndGet();
-                        if (limit != Integer.MIN_VALUE && issuedLedgerCnt.get() >= limit || !ledgerItr.hasNext()) {
-                            stop.set(true);
+        final AtomicBoolean errorOccurred = new AtomicBoolean(false);
+        final Set<CompletableFuture<?>> outstanding =
+            Collections.newSetFromMap(new ConcurrentHashMap<CompletableFuture<?>, Boolean>());
+        final RateLimiter throttler = RateLimiter.create(rate);
+        final Iterator<Long> ledgerItr = admin.listLedgers().iterator();
+
+        // iterate through all the ledgers
+        while (ledgerItr.hasNext() && !errorOccurred.get()
+               && (limit == Integer.MIN_VALUE || issuedLedgerCnt.get() < limit)) {
+            // throttler to control updates per second
+            throttler.acquire();
+
+            final long ledgerId = ledgerItr.next();
+            issuedLedgerCnt.incrementAndGet();
+
+            GenericCallbackFuture<LedgerMetadata> promise = new GenericCallbackFuture<>();
+            lm.readLedgerMetadata(ledgerId, promise);
+            promise.thenCompose((readMetadata) -> {
+                    AtomicReference<LedgerMetadata> ref = new AtomicReference<>(readMetadata);
+                    return new MetadataUpdateLoop(
+                            lm, ledgerId,
+                            ref::get,
+                            (metadata) -> {
+                                return metadata.getEnsembles().values().stream()
+                                    .flatMap(Collection::stream)
+                                    .filter(b -> b.equals(oldBookieId))
+                                    .count() > 0;
+                            },
+                            (metadata) -> {
+                                return replaceBookieInEnsembles(metadata, oldBookieId, newBookieId);
+                            },
+                            ref::compareAndSet).run();
+                }).whenComplete((metadata, ex) -> {
+                        LOG.info("Update of {} finished", ledgerId);
+                        if (ex != null
+                            && !(ex instanceof BKException.BKNoSuchLedgerExistsException)) {
+                            LOG.error("Updating ledger metadata {} failed", ledgerId, ex);
+                            errorOccurred.set(true);
+                        } else {
+                            updatedLedgerCnt.incrementAndGet();
+                            progressable.progress(updatedLedgerCnt.get(), issuedLedgerCnt.get());
                         }
-                        bkc.getLedgerManager().readLedgerMetadata(lId, readCb);
-                    }
-                    // waiting till all the issued ledgers are finished
-                    syncObj.await();
-                } catch (IOException ioe) {
-                    LOG.error("Exception while updating ledger", ioe);
-                    throw new RuntimeException("Exception while updating ledger", ioe.getCause());
-                } catch (InterruptedException ie) {
-                    LOG.error("Exception while updating ledger metadata", ie);
-                    Thread.currentThread().interrupt();
-                    throw new RuntimeException("Exception while updating ledger", ie.getCause());
-                }
-            }
-        });
-        try {
-            // Wait to finish the issued ledgers.
-            updateBookieCb.get();
-        } catch (ExecutionException ee) {
-            throw new IOException("Exception while updating ledger", ee);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            throw new IOException("Exception while updating ledger", ie);
-        } finally {
-            executor.shutdown();
-        }
-    }
+                    });
 
-    private static final class UpdateLedgerCb implements FutureCallback<Void> {
-        final long ledgerId;
-        final AtomicBoolean stop;
-        final AtomicInteger issuedLedgerCnt;
-        final AtomicInteger updatedLedgerCnt;
-        final Set<Long> outstandings;
-        final CountDownLatch syncObj;
-        final UpdateLedgerNotifier progressable;
-
-        public UpdateLedgerCb(long ledgerId, AtomicBoolean stop, AtomicInteger issuedLedgerCnt,
-                AtomicInteger updatedLedgerCnt, Set<Long> outstandings, CountDownLatch syncObj,
-                UpdateLedgerNotifier progressable) {
-            this.ledgerId = ledgerId;
-            this.stop = stop;
-            this.issuedLedgerCnt = issuedLedgerCnt;
-            this.updatedLedgerCnt = updatedLedgerCnt;
-            this.outstandings = outstandings;
-            this.syncObj = syncObj;
-            this.progressable = progressable;
+            outstanding.add(promise);
+            promise.whenComplete((metadata, ex) -> outstanding.remove(promise));
         }
 
-        @Override
-        public void onFailure(Throwable th) {
-            LOG.error("Error updating ledger {}", ledgerId, th);
-            stop.set(true);
-            finishUpdateLedger();
-        }
-
-        @Override
-        public void onSuccess(Void obj) {
-            updatedLedgerCnt.incrementAndGet();
-            // may print progress
-            progressable.progress(updatedLedgerCnt.get(), issuedLedgerCnt.get());
-            finishUpdateLedger();
-        }
-
-        private void finishUpdateLedger() {
-            outstandings.remove(ledgerId);
-            if (outstandings.isEmpty() && stop.get()) {
-                LOG.info("Total number of ledgers issued={} updated={}", issuedLedgerCnt.get(), updatedLedgerCnt.get());
-                syncObj.countDown();
-            }
+        try {
+            CompletableFuture.allOf(outstanding.stream().toArray(CompletableFuture[]::new)).get();
+            LOG.info("Total number of ledgers issued={} updated={}",
+                     issuedLedgerCnt.get(), updatedLedgerCnt.get());
+        } catch (ExecutionException e) {
+            LOG.info("Error in execution", e);
+            throw new IOException("Error executing update", e);
         }
     }
 
-    private static final class ReadLedgerMetadataCb implements GenericCallback<LedgerMetadata> {
-        final BookKeeper bkc;
-        final Long ledgerId;
-        final BookieSocketAddress curBookieAddr;
-        final BookieSocketAddress toBookieAddr;
-        SettableFuture<Void> future = SettableFuture.create();
-        public ReadLedgerMetadataCb(BookKeeper bkc, Long ledgerId, BookieSocketAddress curBookieAddr,
-                BookieSocketAddress toBookieAddr) {
-            this.bkc = bkc;
-            this.ledgerId = ledgerId;
-            this.curBookieAddr = curBookieAddr;
-            this.toBookieAddr = toBookieAddr;
+    private static LedgerMetadata replaceBookieInEnsembles(LedgerMetadata metadata,
+                                                           BookieSocketAddress oldBookieId,
+                                                           BookieSocketAddress newBookieId) {
+        LedgerMetadataBuilder builder = LedgerMetadataBuilder.from(metadata);
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : metadata.getEnsembles().entrySet()) {
+            List<BookieSocketAddress> newEnsemble = e.getValue().stream()
+                .map(b -> b.equals(oldBookieId) ? newBookieId : b)
+                .collect(Collectors.toList());
+            builder.withEnsembleEntry(e.getKey(), newEnsemble);
 
 Review comment:
   I will change withEnsembleEntry into two separate methods, newEnsembleEntry and replaceEnsembleEntry, each with its own validation.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services