You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/08/23 07:51:55 UTC

[GitHub] ivankelly closed pull request #1587: UpdateLedgerOp uses MetadataUpdateLoop

ivankelly closed pull request #1587: UpdateLedgerOp uses MetadataUpdateLoop
URL: https://github.com/apache/bookkeeper/pull/1587
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a forked repository, GitHub will not
display its diff after the merge, so the diff is reproduced below:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
index 76a7088303..8c37bd5391 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
@@ -23,10 +23,10 @@
 import com.google.common.collect.ImmutableMap;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.TreeMap;
 
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -41,7 +41,7 @@
     private LedgerMetadataFormat.State state = LedgerMetadataFormat.State.OPEN;
     private Optional<Long> lastEntryId = Optional.empty();
 
-    private Map<Long, List<BookieSocketAddress>> ensembles = new HashMap<>();
+    private TreeMap<Long, List<BookieSocketAddress>> ensembles = new TreeMap<>();
 
     private DigestType digestType = DigestType.CRC32C;
     private Optional<byte[]> password = Optional.empty();
@@ -91,14 +91,26 @@ LedgerMetadataBuilder withEnsembleSize(int ensembleSize) {
         return this;
     }
 
-    LedgerMetadataBuilder withEnsembleEntry(long firstEntry, List<BookieSocketAddress> ensemble) {
+    LedgerMetadataBuilder newEnsembleEntry(long firstEntry, List<BookieSocketAddress> ensemble) {
         checkArgument(ensemble.size() == ensembleSize,
                       "Size of passed in ensemble must match the ensembleSize of the builder");
+        checkArgument(ensembles.isEmpty() || firstEntry > ensembles.lastKey(),
+                      "New entry must have a first entry greater than any existing ensemble key");
+        ensembles.put(firstEntry, ensemble);
+        return this;
+    }
 
+    LedgerMetadataBuilder replaceEnsembleEntry(long firstEntry, List<BookieSocketAddress> ensemble) {
+        checkArgument(ensemble.size() == ensembleSize,
+                      "Size of passed in ensemble must match the ensembleSize of the builder");
+        checkArgument(ensembles.containsKey(firstEntry),
+                      "Ensemble must replace an existing ensemble in the ensemble map");
         ensembles.put(firstEntry, ensemble);
         return this;
     }
 
+
+
     LedgerMetadataBuilder closingAtEntry(long lastEntryId) {
         this.lastEntryId = Optional.of(lastEntryId);
         this.state = LedgerMetadataFormat.State.CLOSED;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
index b55e2d01ae..befd4f3724 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
@@ -18,33 +18,26 @@
 
 package org.apache.bookkeeper.client;
 
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.RateLimiter;
-import com.google.common.util.concurrent.SettableFuture;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.bookie.BookieShell.UpdateLedgerNotifier;
+import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallbackFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,11 +47,11 @@
 public class UpdateLedgerOp {
 
     private static final Logger LOG = LoggerFactory.getLogger(UpdateLedgerOp.class);
-    private final BookKeeper bkc;
+    private final LedgerManager lm;
     private final BookKeeperAdmin admin;
 
     public UpdateLedgerOp(final BookKeeper bkc, final BookKeeperAdmin admin) {
-        this.bkc = bkc;
+        this.lm = bkc.getLedgerManager();
         this.admin = admin;
     }
 
@@ -81,174 +74,100 @@ public UpdateLedgerOp(final BookKeeper bkc, final BookKeeperAdmin admin) {
      *             metadata
      */
     public void updateBookieIdInLedgers(final BookieSocketAddress oldBookieId, final BookieSocketAddress newBookieId,
-            final int rate, final int limit, final UpdateLedgerNotifier progressable) throws IOException {
+                                        final int rate, final int limit, final UpdateLedgerNotifier progressable)
+            throws IOException, InterruptedException {
 
-        final ExecutorService executor = Executors
-                .newSingleThreadExecutor(new DefaultThreadFactory("UpdateLedgerThread", true));
         final AtomicInteger issuedLedgerCnt = new AtomicInteger();
         final AtomicInteger updatedLedgerCnt = new AtomicInteger();
-        final Future<?> updateBookieCb = executor.submit(new Runnable() {
-
-            @Override
-            public void run() {
-                updateLedgers(oldBookieId, newBookieId, rate, limit, progressable);
-            }
-
-            private void updateLedgers(final BookieSocketAddress oldBookieId, final BookieSocketAddress newBookieId,
-                    final int rate, final int limit, final UpdateLedgerNotifier progressable) {
-                try {
-                    final AtomicBoolean stop = new AtomicBoolean(false);
-                    final Set<Long> outstandings = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
-                    final RateLimiter throttler = RateLimiter.create(rate);
-                    final Iterator<Long> ledgerItr = admin.listLedgers().iterator();
-                    final CountDownLatch syncObj = new CountDownLatch(1);
-
-                    // iterate through all the ledgers
-                    while (ledgerItr.hasNext() && !stop.get()) {
-                        // throttler to control updates per second
-                        throttler.acquire();
-
-                        final Long lId = ledgerItr.next();
-                        final ReadLedgerMetadataCb readCb = new ReadLedgerMetadataCb(bkc, lId, oldBookieId,
-                                newBookieId);
-                        outstandings.add(lId);
-
-                        FutureCallback<Void> updateLedgerCb = new UpdateLedgerCb(lId, stop, issuedLedgerCnt,
-                                updatedLedgerCnt, outstandings, syncObj, progressable);
-                        Futures.addCallback(readCb.getFutureListener(), updateLedgerCb);
-
-                        issuedLedgerCnt.incrementAndGet();
-                        if (limit != Integer.MIN_VALUE && issuedLedgerCnt.get() >= limit || !ledgerItr.hasNext()) {
-                            stop.set(true);
+        final CompletableFuture<Void> finalPromise = new CompletableFuture<>();
+        final Set<CompletableFuture<?>> outstanding =
+            Collections.newSetFromMap(new ConcurrentHashMap<CompletableFuture<?>, Boolean>());
+        final RateLimiter throttler = RateLimiter.create(rate);
+        final Iterator<Long> ledgerItr = admin.listLedgers().iterator();
+
+        // iterate through all the ledgers
+        while (ledgerItr.hasNext() && !finalPromise.isDone()
+               && (limit == Integer.MIN_VALUE || issuedLedgerCnt.get() < limit)) {
+            // throttler to control updates per second
+            throttler.acquire();
+
+            final long ledgerId = ledgerItr.next();
+            issuedLedgerCnt.incrementAndGet();
+
+            GenericCallbackFuture<LedgerMetadata> readPromise = new GenericCallbackFuture<>();
+            lm.readLedgerMetadata(ledgerId, readPromise);
+            CompletableFuture<LedgerMetadata> writePromise = readPromise.thenCompose((readMetadata) -> {
+                    AtomicReference<LedgerMetadata> ref = new AtomicReference<>(readMetadata);
+                    return new MetadataUpdateLoop(
+                            lm, ledgerId,
+                            ref::get,
+                            (metadata) -> {
+                                return metadata.getEnsembles().values().stream()
+                                    .flatMap(Collection::stream)
+                                    .filter(b -> b.equals(oldBookieId))
+                                    .count() > 0;
+                            },
+                            (metadata) -> {
+                                return replaceBookieInEnsembles(metadata, oldBookieId, newBookieId);
+                            },
+                            ref::compareAndSet).run();
+                });
+
+            outstanding.add(writePromise);
+            writePromise.whenComplete((metadata, ex) -> {
+                        if (ex != null
+                            && !(ex instanceof BKException.BKNoSuchLedgerExistsException)) {
+                            String error = String.format("Failed to update ledger metadata %s, replacing %s with %s",
+                                                         ledgerId, oldBookieId, newBookieId);
+                            LOG.error(error, ex);
+                            finalPromise.completeExceptionally(new IOException(error, ex));
+                        } else {
+                            LOG.info("Updated ledger {} metadata, replacing {} with {}",
+                                     ledgerId, oldBookieId, newBookieId);
+
+                            updatedLedgerCnt.incrementAndGet();
+                            progressable.progress(updatedLedgerCnt.get(), issuedLedgerCnt.get());
                         }
-                        bkc.getLedgerManager().readLedgerMetadata(lId, readCb);
-                    }
-                    // waiting till all the issued ledgers are finished
-                    syncObj.await();
-                } catch (IOException ioe) {
-                    LOG.error("Exception while updating ledger", ioe);
-                    throw new RuntimeException("Exception while updating ledger", ioe.getCause());
-                } catch (InterruptedException ie) {
-                    LOG.error("Exception while updating ledger metadata", ie);
-                    Thread.currentThread().interrupt();
-                    throw new RuntimeException("Exception while updating ledger", ie.getCause());
-                }
-            }
-        });
-        try {
-            // Wait to finish the issued ledgers.
-            updateBookieCb.get();
-        } catch (ExecutionException ee) {
-            throw new IOException("Exception while updating ledger", ee);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            throw new IOException("Exception while updating ledger", ie);
-        } finally {
-            executor.shutdown();
-        }
-    }
-
-    private static final class UpdateLedgerCb implements FutureCallback<Void> {
-        final long ledgerId;
-        final AtomicBoolean stop;
-        final AtomicInteger issuedLedgerCnt;
-        final AtomicInteger updatedLedgerCnt;
-        final Set<Long> outstandings;
-        final CountDownLatch syncObj;
-        final UpdateLedgerNotifier progressable;
-
-        public UpdateLedgerCb(long ledgerId, AtomicBoolean stop, AtomicInteger issuedLedgerCnt,
-                AtomicInteger updatedLedgerCnt, Set<Long> outstandings, CountDownLatch syncObj,
-                UpdateLedgerNotifier progressable) {
-            this.ledgerId = ledgerId;
-            this.stop = stop;
-            this.issuedLedgerCnt = issuedLedgerCnt;
-            this.updatedLedgerCnt = updatedLedgerCnt;
-            this.outstandings = outstandings;
-            this.syncObj = syncObj;
-            this.progressable = progressable;
-        }
-
-        @Override
-        public void onFailure(Throwable th) {
-            LOG.error("Error updating ledger {}", ledgerId, th);
-            stop.set(true);
-            finishUpdateLedger();
+                        outstanding.remove(writePromise);
+                    });
         }
 
-        @Override
-        public void onSuccess(Void obj) {
-            updatedLedgerCnt.incrementAndGet();
-            // may print progress
-            progressable.progress(updatedLedgerCnt.get(), issuedLedgerCnt.get());
-            finishUpdateLedger();
-        }
+        CompletableFuture.allOf(outstanding.stream().toArray(CompletableFuture[]::new))
+            .whenComplete((res, ex) -> {
+                    if (ex != null) {
+                        finalPromise.completeExceptionally(ex);
+                    } else {
+                        finalPromise.complete(null);
+                    }
+                });
 
-        private void finishUpdateLedger() {
-            outstandings.remove(ledgerId);
-            if (outstandings.isEmpty() && stop.get()) {
-                LOG.info("Total number of ledgers issued={} updated={}", issuedLedgerCnt.get(), updatedLedgerCnt.get());
-                syncObj.countDown();
+        try {
+            finalPromise.get();
+            LOG.info("Total number of ledgers issued={} updated={}",
+                     issuedLedgerCnt.get(), updatedLedgerCnt.get());
+        } catch (ExecutionException e) {
+            String error = String.format("Error waiting for ledger metadata updates to complete (replacing %s with %s)",
+                                         oldBookieId, newBookieId);
+            LOG.info(error, e);
+            if (e.getCause() instanceof IOException) {
+                throw (IOException) e.getCause();
+            } else {
+                throw new IOException(error, e);
             }
         }
     }
 
-    private static final class ReadLedgerMetadataCb implements GenericCallback<LedgerMetadata> {
-        final BookKeeper bkc;
-        final Long ledgerId;
-        final BookieSocketAddress curBookieAddr;
-        final BookieSocketAddress toBookieAddr;
-        SettableFuture<Void> future = SettableFuture.create();
-        public ReadLedgerMetadataCb(BookKeeper bkc, Long ledgerId, BookieSocketAddress curBookieAddr,
-                BookieSocketAddress toBookieAddr) {
-            this.bkc = bkc;
-            this.ledgerId = ledgerId;
-            this.curBookieAddr = curBookieAddr;
-            this.toBookieAddr = toBookieAddr;
-        }
-
-        ListenableFuture<Void> getFutureListener() {
-            return future;
+    private static LedgerMetadata replaceBookieInEnsembles(LedgerMetadata metadata,
+                                                           BookieSocketAddress oldBookieId,
+                                                           BookieSocketAddress newBookieId) {
+        LedgerMetadataBuilder builder = LedgerMetadataBuilder.from(metadata);
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : metadata.getEnsembles().entrySet()) {
+            List<BookieSocketAddress> newEnsemble = e.getValue().stream()
+                .map(b -> b.equals(oldBookieId) ? newBookieId : b)
+                .collect(Collectors.toList());
+            builder.replaceEnsembleEntry(e.getKey(), newEnsemble);
         }
 
-        @Override
-        public void operationComplete(int rc, LedgerMetadata metadata) {
-            if (BKException.Code.NoSuchLedgerExistsException == rc) {
-                future.set(null);
-                return; // this is OK
-            } else if (BKException.Code.OK != rc) {
-                // open ledger failed.
-                LOG.error("Get ledger metadata {} failed: {}", ledgerId, BKException.codeLogger(rc));
-                future.setException(BKException.create(rc));
-                return;
-            }
-            boolean updateEnsemble = false;
-            for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : metadata.getEnsembles().entrySet()) {
-                List<BookieSocketAddress> newEnsemble = new ArrayList<>(e.getValue());
-                int index = newEnsemble.indexOf(curBookieAddr);
-                if (-1 != index) {
-                    newEnsemble.set(index, toBookieAddr);
-                    metadata.updateEnsemble(e.getKey(), newEnsemble);
-                    updateEnsemble = true;
-                }
-            }
-            if (!updateEnsemble) {
-                future.set(null);
-                return; // ledger doesn't contains the given curBookieId
-            }
-            final GenericCallback<LedgerMetadata> writeCb = new GenericCallback<LedgerMetadata>() {
-                @Override
-                public void operationComplete(int rc, LedgerMetadata result) {
-                    if (rc != BKException.Code.OK) {
-                        // metadata update failed
-                        LOG.error("Ledger {} metadata update failed. Error code {}", ledgerId, rc);
-                        future.setException(BKException.create(rc));
-                        return;
-                    }
-                    future.set(null);
-                }
-            };
-            bkc.getLedgerManager().writeLedgerMetadata(ledgerId, metadata, writeCb);
-        }
+        return builder.build();
     }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
index cb1dbd2e96..5ed75ce977 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
@@ -66,12 +66,12 @@
     public void testBasicUpdate() throws Exception {
         try (LedgerManager lm = new MockLedgerManager()) {
             LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(5)
-                .withEnsembleEntry(0L, Lists.newArrayList(
-                                           new BookieSocketAddress("0.0.0.0:3181"),
-                                           new BookieSocketAddress("0.0.0.1:3181"),
-                                           new BookieSocketAddress("0.0.0.2:3181"),
-                                           new BookieSocketAddress("0.0.0.3:3181"),
-                                           new BookieSocketAddress("0.0.0.4:3181"))).build();
+                .newEnsembleEntry(0L, Lists.newArrayList(
+                                          new BookieSocketAddress("0.0.0.0:3181"),
+                                          new BookieSocketAddress("0.0.0.1:3181"),
+                                          new BookieSocketAddress("0.0.0.2:3181"),
+                                          new BookieSocketAddress("0.0.0.3:3181"),
+                                          new BookieSocketAddress("0.0.0.4:3181"))).build();
             GenericCallbackFuture<LedgerMetadata> promise = new GenericCallbackFuture<>();
             long ledgerId = 1234L;
             lm.createLedgerMetadata(ledgerId, initMeta, promise);
@@ -88,7 +88,7 @@ public void testBasicUpdate() throws Exception {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(0, newAddress);
-                        return LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, ensemble).build();
+                        return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
                     },
                     reference::compareAndSet);
             loop.run().get();
@@ -114,7 +114,7 @@ public void testConflictOnWrite() throws Exception {
             BookieSocketAddress b3 = new BookieSocketAddress("0.0.0.3:3181");
 
             LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2)
-                .withEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
+                .newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
             GenericCallbackFuture<LedgerMetadata> promise = new GenericCallbackFuture<>();
             lm.createLedgerMetadata(ledgerId, initMeta, promise);
             LedgerMetadata writtenMetadata = promise.get();
@@ -128,7 +128,7 @@ public void testConflictOnWrite() throws Exception {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(0, b2);
-                        return LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, ensemble).build();
+                        return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
                     },
                     reference1::compareAndSet).run();
 
@@ -141,7 +141,7 @@ public void testConflictOnWrite() throws Exception {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(1, b3);
-                        return LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, ensemble).build();
+                        return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
                     },
                     reference2::compareAndSet).run();
 
@@ -181,7 +181,7 @@ public void testConflictOnWriteBothWritingSame() throws Exception {
             BookieSocketAddress b2 = new BookieSocketAddress("0.0.0.2:3181");
 
             LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2)
-                .withEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
+                .newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
             GenericCallbackFuture<LedgerMetadata> promise = new GenericCallbackFuture<>();
             lm.createLedgerMetadata(ledgerId, initMeta, promise);
             LedgerMetadata writtenMetadata = promise.get();
@@ -196,7 +196,7 @@ public void testConflictOnWriteBothWritingSame() throws Exception {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(0, b2);
-                        return LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, ensemble).build();
+                        return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
                     },
                     reference::compareAndSet).run();
             CompletableFuture<LedgerMetadata> loop2 = new MetadataUpdateLoop(
@@ -207,7 +207,7 @@ public void testConflictOnWriteBothWritingSame() throws Exception {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(0, b2);
-                        return LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, ensemble).build();
+                        return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
                     },
                     reference::compareAndSet).run();
 
@@ -237,7 +237,7 @@ public void testConflictOnLocalUpdate() throws Exception {
             BookieSocketAddress b3 = new BookieSocketAddress("0.0.0.3:3181");
 
             LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2)
-                .withEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
+                .newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
             GenericCallbackFuture<LedgerMetadata> promise = new GenericCallbackFuture<>();
             lm.createLedgerMetadata(ledgerId, initMeta, promise);
             LedgerMetadata writtenMetadata = promise.get();
@@ -252,7 +252,7 @@ public void testConflictOnLocalUpdate() throws Exception {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(0, b2);
-                        return LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, ensemble).build();
+                        return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
                     },
                     reference::compareAndSet).run();
 
@@ -265,7 +265,7 @@ public void testConflictOnLocalUpdate() throws Exception {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(1, b3);
-                        return LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, ensemble).build();
+                        return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
                     },
                     reference::compareAndSet).run();
             Assert.assertEquals(loop2.get(), reference.get());
@@ -305,7 +305,7 @@ public void testHammer() throws Exception {
                 .collect(Collectors.toList());
 
             LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(ensembleSize)
-                .withEnsembleEntry(0L, initialEnsemble).build();
+                .newEnsembleEntry(0L, initialEnsemble).build();
             GenericCallbackFuture<LedgerMetadata> promise = new GenericCallbackFuture<>();
             lm.createLedgerMetadata(ledgerId, initMeta, promise);
             LedgerMetadata writtenMetadata = promise.get();
@@ -325,7 +325,7 @@ public void testHammer() throws Exception {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(i, replacementBookies.get(i));
-                        return LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, ensemble).build();
+                        return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
                     },
                     reference::compareAndSet).run())
                 .collect(Collectors.toList());
@@ -350,7 +350,7 @@ public void testNewestValueCannotBeUsedAfterReadBack() throws Exception {
             BookieSocketAddress b1 = new BookieSocketAddress("0.0.0.1:3181");
 
             LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(1)
-                .withEnsembleEntry(0L, Lists.newArrayList(b0)).build();
+                .newEnsembleEntry(0L, Lists.newArrayList(b0)).build();
             GenericCallbackFuture<LedgerMetadata> promise = new GenericCallbackFuture<>();
             lm.createLedgerMetadata(ledgerId, initMeta, promise);
             LedgerMetadata writtenMetadata = promise.get();
@@ -377,7 +377,7 @@ public void testNewestValueCannotBeUsedAfterReadBack() throws Exception {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(0, b1);
-                        return LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, ensemble).build();
+                        return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
                     },
                     reference::compareAndSet).run();
             lm.releaseWrites();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services