You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by iv...@apache.org on 2018/08/22 12:14:47 UTC

[bookkeeper] branch master updated: Get currentEnsemble from ledger rather than directly from metadata

This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 77b7fb8  Get currentEnsemble from ledger rather than directly from metadata
77b7fb8 is described below

commit 77b7fb870ef0fc2743dbe444c768c6c63bbe6875
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed Aug 22 14:14:29 2018 +0200

    Get currentEnsemble from ledger rather than directly from metadata
    
    This will allow us to override the currentEnsemble being used during
    recovery without modifying the metadata itself.
    
    Master issue: #281
    
    Author: Ivan Kelly <iv...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1586 from ivankelly/current-ensemble
---
 .../bookkeeper/client/ExplicitLacFlushPolicy.java  |  2 +-
 .../apache/bookkeeper/client/ForceLedgerOp.java    |  5 +-
 .../org/apache/bookkeeper/client/LedgerHandle.java | 56 +++++++++++++++-------
 .../apache/bookkeeper/client/LedgerHandleAdv.java  |  2 +-
 .../apache/bookkeeper/client/LedgerMetadata.java   |  6 ++-
 .../apache/bookkeeper/client/LedgerRecoveryOp.java |  2 +-
 .../org/apache/bookkeeper/client/PendingAddOp.java | 21 +++++---
 .../apache/bookkeeper/client/PendingReadLacOp.java | 16 ++++---
 .../bookkeeper/client/PendingWriteLacOp.java       |  8 +++-
 .../client/ReadLastConfirmedAndEntryOp.java        |  8 +++-
 .../bookkeeper/client/ReadLastConfirmedOp.java     | 18 +++----
 .../bookkeeper/client/ReadOnlyLedgerHandle.java    |  2 +-
 .../bookkeeper/client/TryReadLastConfirmedOp.java  | 13 +++--
 .../bookkeeper/client/BookieWriteLedgerTest.java   |  4 +-
 .../apache/bookkeeper/client/LedgerCloseTest.java  |  4 +-
 .../bookkeeper/client/LedgerRecoveryTest.java      | 16 +++----
 .../client/ParallelLedgerRecoveryTest.java         | 45 +++++++++--------
 .../apache/bookkeeper/client/PendingAddOpTest.java |  2 +-
 .../client/ReadLastConfirmedAndEntryOpTest.java    |  3 +-
 .../apache/bookkeeper/client/SlowBookieTest.java   |  2 +-
 .../client/TestAddEntryQuorumTimeout.java          |  6 +--
 .../bookkeeper/client/TestDelayEnsembleChange.java | 30 ++++++------
 .../client/TestDisableEnsembleChange.java          |  2 +-
 .../bookkeeper/client/TestPendingReadLacOp.java    | 15 +++---
 .../client/TestReadLastConfirmedLongPoll.java      |  2 +-
 .../client/TestTryReadLastConfirmed.java           |  4 +-
 .../bookkeeper/client/TestWatchEnsembleChange.java |  2 +-
 .../org/apache/bookkeeper/client/LedgerReader.java |  2 +-
 28 files changed, 178 insertions(+), 120 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
index e2b3801..c9e6def 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java
@@ -134,7 +134,7 @@ interface ExplicitLacFlushPolicy {
          */
         void asyncExplicitLacFlush(final long explicitLac) {
             final LastAddConfirmedCallback cb = LastAddConfirmedCallback.INSTANCE;
-            final PendingWriteLacOp op = new PendingWriteLacOp(lh, clientCtx, cb, null);
+            final PendingWriteLacOp op = new PendingWriteLacOp(lh, clientCtx, lh.getCurrentEnsemble(), cb, null);
             op.setLac(explicitLac);
             try {
                 if (LOG.isDebugEnabled()) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
index f87bfe3..2d785ab 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
@@ -40,7 +40,7 @@ class ForceLedgerOp extends SafeRunnable implements ForceLedgerCallback {
     boolean completed = false;
     boolean errored = false;
     int lastSeenError = BKException.Code.WriteException;
-    List<BookieSocketAddress> currentEnsemble;
+    final List<BookieSocketAddress> currentEnsemble;
 
     long currentNonDurableLastAddConfirmed = LedgerHandle.INVALID_ENTRY_ID;
 
@@ -48,10 +48,12 @@ class ForceLedgerOp extends SafeRunnable implements ForceLedgerCallback {
     final BookieClient bookieClient;
 
     ForceLedgerOp(LedgerHandle lh, BookieClient bookieClient,
+                  List<BookieSocketAddress> ensemble,
                   CompletableFuture<Void> cb) {
         this.lh = lh;
         this.bookieClient = bookieClient;
         this.cb = cb;
+        this.currentEnsemble = ensemble;
     }
 
     void sendForceLedgerRequest(int bookieIndex) {
@@ -73,7 +75,6 @@ class ForceLedgerOp extends SafeRunnable implements ForceLedgerCallback {
             LOG.debug("force {} clientNonDurableLac {}", lh.ledgerId, currentNonDurableLastAddConfirmed);
         }
         // we need to send the request to every bookie in the ensamble
-        this.currentEnsemble = lh.getLedgerMetadata().currentEnsemble;
         this.ackSet = lh.distributionSchedule.getEnsembleAckSet();
 
         DistributionSchedule.WriteSet writeSet = lh.getDistributionSchedule()
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 4f65ab3..3e9f1c2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -1052,7 +1052,7 @@ public class LedgerHandle implements WriteHandle {
     }
 
     public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object ctx) {
-        PendingAddOp op = PendingAddOp.create(this, clientCtx, data, writeFlags, cb, ctx);
+        PendingAddOp op = PendingAddOp.create(this, clientCtx, getCurrentEnsemble(), data, writeFlags, cb, ctx);
         doAsyncAddEntry(op);
     }
 
@@ -1135,7 +1135,7 @@ public class LedgerHandle implements WriteHandle {
     @Override
     public CompletableFuture<Void> force() {
         CompletableFuture<Void> result = new CompletableFuture<>();
-        ForceLedgerOp op = new ForceLedgerOp(this, clientCtx.getBookieClient(), result);
+        ForceLedgerOp op = new ForceLedgerOp(this, clientCtx.getBookieClient(), getCurrentEnsemble(), result);
         boolean wasClosed = false;
         synchronized (this) {
             // synchronized on this to ensure that
@@ -1202,7 +1202,7 @@ public class LedgerHandle implements WriteHandle {
      */
     void asyncRecoveryAddEntry(final byte[] data, final int offset, final int length,
                                final AddCallback cb, final Object ctx) {
-        PendingAddOp op = PendingAddOp.create(this, clientCtx,
+        PendingAddOp op = PendingAddOp.create(this, clientCtx, getCurrentEnsemble(),
                                               Unpooled.wrappedBuffer(data, offset, length),
                                               writeFlags, cb, ctx)
                 .enableRecoveryAdd();
@@ -1219,8 +1219,9 @@ public class LedgerHandle implements WriteHandle {
         final int requiredWritable = sz - allowedNonWritableCount;
 
         int nonWritableCount = 0;
+        List<BookieSocketAddress> currentEnsemble = getCurrentEnsemble();
         for (int i = 0; i < sz; i++) {
-            if (!clientCtx.getBookieClient().isWritable(getLedgerMetadata().currentEnsemble.get(i), key)) {
+            if (!clientCtx.getBookieClient().isWritable(currentEnsemble.get(i), key)) {
                 nonWritableCount++;
                 if (nonWritableCount >= allowedNonWritableCount) {
                     return false;
@@ -1403,7 +1404,7 @@ public class LedgerHandle implements WriteHandle {
                 }
             };
 
-        new ReadLastConfirmedOp(this, clientCtx.getBookieClient(), innercb).initiate();
+        new ReadLastConfirmedOp(this, clientCtx.getBookieClient(), getCurrentEnsemble(), innercb).initiate();
     }
 
     /**
@@ -1449,7 +1450,8 @@ public class LedgerHandle implements WriteHandle {
                 }
             }
         };
-        new TryReadLastConfirmedOp(this, clientCtx.getBookieClient(), innercb, getLastAddConfirmed()).initiate();
+        new TryReadLastConfirmedOp(this, clientCtx.getBookieClient(), getCurrentEnsemble(),
+                                   innercb, getLastAddConfirmed()).initiate();
     }
 
     /**
@@ -1558,7 +1560,7 @@ public class LedgerHandle implements WriteHandle {
                 }
             }
         };
-        new ReadLastConfirmedAndEntryOp(this, clientCtx, innercb, entryId - 1, timeOutInMillis)
+        new ReadLastConfirmedAndEntryOp(this, clientCtx, getCurrentEnsemble(), innercb, entryId - 1, timeOutInMillis)
             .parallelRead(parallel)
             .initiate();
     }
@@ -1701,7 +1703,7 @@ public class LedgerHandle implements WriteHandle {
                 }
             }
         };
-        new PendingReadLacOp(this, clientCtx.getBookieClient(), innercb).initiate();
+        new PendingReadLacOp(this, clientCtx.getBookieClient(), getCurrentEnsemble(), innercb).initiate();
     }
 
     /*
@@ -1822,7 +1824,7 @@ public class LedgerHandle implements WriteHandle {
         final HashSet<Integer> replacedBookies = new HashSet<Integer>();
         final LedgerMetadata metadata = getLedgerMetadata();
         synchronized (metadata) {
-            newEnsemble.addAll(metadata.currentEnsemble);
+            newEnsemble.addAll(getCurrentEnsemble());
             for (Map.Entry<Integer, BookieSocketAddress> entry : failedBookies.entrySet()) {
                 int idx = entry.getKey();
                 BookieSocketAddress addr = entry.getValue();
@@ -1861,7 +1863,7 @@ public class LedgerHandle implements WriteHandle {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("[EnsembleChange-L{}-{}] : changing ensemble from: {} to: {} starting at entry: {},"
                     + " failed bookies: {}, replaced bookies: {}",
-                        ledgerId, ensembleChangeIdx, metadata.currentEnsemble, newEnsemble,
+                        ledgerId, ensembleChangeIdx, getCurrentEnsemble(), newEnsemble,
                         (getLastAddConfirmed() + 1), failedBookies, replacedBookies);
             }
             metadata.addEnsemble(newEnsembleStartEntry, newEnsemble);
@@ -1889,7 +1891,7 @@ public class LedgerHandle implements WriteHandle {
                 LOG.debug("Ensemble change is disabled. Retry sending to failed bookies {} for ledger {}.",
                     failedBookies, ledgerId);
             }
-            unsetSuccessAndSendWriteRequest(failedBookies.keySet());
+            unsetSuccessAndSendWriteRequest(getCurrentEnsemble(), failedBookies.keySet());
             return;
         }
 
@@ -1941,7 +1943,7 @@ public class LedgerHandle implements WriteHandle {
 
     // Contains newly reformed ensemble, bookieIndex, failedBookieAddress
     static final class EnsembleInfo {
-        private final ArrayList<BookieSocketAddress> newEnsemble;
+        final ArrayList<BookieSocketAddress> newEnsemble;
         private final Map<Integer, BookieSocketAddress> failedBookies;
         final Set<Integer> replacedBookies;
 
@@ -2016,7 +2018,7 @@ public class LedgerHandle implements WriteHandle {
             LOG.info("New Ensemble: {} for ledger: {}", ensembleInfo.newEnsemble, ledgerId);
 
             // the failed bookie has been replaced
-            unsetSuccessAndSendWriteRequest(ensembleInfo.replacedBookies);
+            unsetSuccessAndSendWriteRequest(ensembleInfo.newEnsemble, ensembleInfo.replacedBookies);
         }
 
         @Override
@@ -2133,7 +2135,7 @@ public class LedgerHandle implements WriteHandle {
                 // We've successfully changed an ensemble
                 // the failed bookie has been replaced
                 int newBlockAddCompletions = blockAddCompletions.decrementAndGet();
-                unsetSuccessAndSendWriteRequest(ensembleInfo.replacedBookies);
+                unsetSuccessAndSendWriteRequest(ensembleInfo.newEnsemble, ensembleInfo.replacedBookies);
                 if (LOG.isDebugEnabled()) {
                     LOG.info("[EnsembleChange-L{}-{}] : resolved conflicts, block add complectiosn {} => {}.",
                             ledgerId, ensembleChangeIdx, curBlockAddCompletions, newBlockAddCompletions);
@@ -2155,7 +2157,8 @@ public class LedgerHandle implements WriteHandle {
             boolean replaced = true;
             for (Integer replacedBookieIdx : ensembleInfo.replacedBookies) {
                 BookieSocketAddress failedBookieAddr = ensembleInfo.failedBookies.get(replacedBookieIdx);
-                BookieSocketAddress replacedBookieAddr = newMeta.currentEnsemble.get(replacedBookieIdx);
+                BookieSocketAddress replacedBookieAddr = newMeta.getEnsembles()
+                    .lastEntry().getValue().get(replacedBookieIdx);
                 replaced &= !Objects.equal(replacedBookieAddr, failedBookieAddr);
             }
             return replaced;
@@ -2201,10 +2204,10 @@ public class LedgerHandle implements WriteHandle {
         }
     }
 
-    void unsetSuccessAndSendWriteRequest(final Set<Integer> bookies) {
+    void unsetSuccessAndSendWriteRequest(List<BookieSocketAddress> ensemble, final Set<Integer> bookies) {
         for (PendingAddOp pendingAddOp : pendingAddOps) {
             for (Integer bookieIndex: bookies) {
-                pendingAddOp.unsetSuccessAndSendWriteRequest(bookieIndex);
+                pendingAddOp.unsetSuccessAndSendWriteRequest(ensemble, bookieIndex);
             }
         }
     }
@@ -2337,4 +2340,23 @@ public class LedgerHandle implements WriteHandle {
         }
     }
 
+    /**
+     * Get the current ensemble from the ensemble list. The current ensemble
+     * is the last ensemble in the list. The ledger handle uses this ensemble when
+     * triggering operations which work on the end of the ledger, such as adding new
+     * entries or reading the last add confirmed.
+     *
+     * <p>This method is also used by ReadOnlyLedgerHandle during recovery, and when
+     * tailing a ledger.
+     *
+     * <p>Generally, this method should only be called by LedgerHandle and not by the
+     * operations themselves, to avoid adding more dependencies between the classes.
+     * There are too many already.
+     */
+    List<BookieSocketAddress> getCurrentEnsemble() {
+        // Getting current ensemble from the metadata is only a temporary
+        // thing until metadata is immutable. At that point, current ensemble
+        // becomes a property of the LedgerHandle itself.
+        return metadata.getCurrentEnsemble();
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index e5fa0ac..c1d3849 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -195,7 +195,7 @@ public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle {
     @Override
     public void asyncAddEntry(final long entryId, ByteBuf data,
                               final AddCallbackWithLatency cb, final Object ctx) {
-        PendingAddOp op = PendingAddOp.create(this, clientCtx, data, writeFlags, cb, ctx);
+        PendingAddOp op = PendingAddOp.create(this, clientCtx, getCurrentEnsemble(), data, writeFlags, cb, ctx);
         op.setEntryId(entryId);
 
         if ((entryId <= this.lastAddConfirmed) || pendingAddOps.contains(op)) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index d549f7c..4f58aa8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -84,7 +84,7 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
 
     private LedgerMetadataFormat.State state;
     private TreeMap<Long, ImmutableList<BookieSocketAddress>> ensembles =  new TreeMap<>();
-    List<BookieSocketAddress> currentEnsemble;
+    private List<BookieSocketAddress> currentEnsemble;
     volatile Version version = Version.NEW;
 
     private boolean hasPassword = false;
@@ -336,6 +336,10 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
         currentEnsemble = ensemble;
     }
 
+    List<BookieSocketAddress> getCurrentEnsemble() {
+        return currentEnsemble;
+    }
+
     public void updateEnsemble(long startEntryId, List<BookieSocketAddress> ensemble) {
         checkArgument(ensembles.containsKey(startEntryId));
         ensembles.put(startEntryId, ImmutableList.copyOf(ensemble));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
index 18b47e0..6ab25d6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
@@ -97,7 +97,7 @@ class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
     }
 
     public void initiate() {
-        ReadLastConfirmedOp rlcop = new ReadLastConfirmedOp(lh, clientCtx.getBookieClient(),
+        ReadLastConfirmedOp rlcop = new ReadLastConfirmedOp(lh, clientCtx.getBookieClient(), lh.getCurrentEnsemble(),
                 new ReadLastConfirmedOp.LastConfirmedDataCallback() {
                     public void readLastConfirmedDataComplete(int rc, RecoveryData data) {
                         if (rc == BKException.Code.OK) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index d10bbe7..89bf0b8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -30,6 +30,7 @@ import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
 import java.util.EnumSet;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -77,8 +78,10 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
     boolean hasRun;
     EnumSet<WriteFlag> writeFlags;
     boolean allowFailFast = false;
+    List<BookieSocketAddress> ensemble;
 
     static PendingAddOp create(LedgerHandle lh, ClientContext clientCtx,
+                               List<BookieSocketAddress> ensemble,
                                ByteBuf payload, EnumSet<WriteFlag> writeFlags,
                                AddCallbackWithLatency cb, Object ctx) {
         PendingAddOp op = RECYCLER.get();
@@ -93,6 +96,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
         op.entryLength = payload.readableBytes();
 
         op.completed = false;
+        op.ensemble = ensemble;
         op.ackSet = lh.getDistributionSchedule().getAckSet();
         op.pendingWriteRequests = 0;
         op.callbackTriggered = false;
@@ -131,10 +135,10 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
         return this.entryId;
     }
 
-    void sendWriteRequest(int bookieIndex) {
+    void sendWriteRequest(List<BookieSocketAddress> ensemble, int bookieIndex) {
         int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE;
 
-        clientCtx.getBookieClient().addEntry(lh.getLedgerMetadata().currentEnsemble.get(bookieIndex),
+        clientCtx.getBookieClient().addEntry(ensemble.get(bookieIndex),
                                              lh.ledgerId, lh.ledgerKey, entryId, toSend, this, bookieIndex,
                                              flags, allowFailFast, lh.writeFlags);
         ++pendingWriteRequests;
@@ -168,7 +172,10 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
         }
     }
 
-    void unsetSuccessAndSendWriteRequest(int bookieIndex) {
+    void unsetSuccessAndSendWriteRequest(List<BookieSocketAddress> ensemble, int bookieIndex) {
+        // update the ensemble
+        this.ensemble = ensemble;
+
         if (toSend == null) {
             // this addOp hasn't yet had its mac computed. When the mac is
             // computed, its write requests will be sent, so no need to send it
@@ -216,7 +223,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
             completed = false;
         }
 
-        sendWriteRequest(bookieIndex);
+        sendWriteRequest(ensemble, bookieIndex);
     }
 
     /**
@@ -250,9 +257,10 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
         }
         // Iterate over set and trigger the sendWriteRequests
         DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSet(entryId);
+
         try {
             for (int i = 0; i < writeSet.size(); i++) {
-                sendWriteRequest(writeSet.get(i));
+                sendWriteRequest(ensemble, writeSet.get(i));
             }
         } finally {
             writeSet.recycle();
@@ -264,7 +272,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
         int bookieIndex = (Integer) ctx;
         --pendingWriteRequests;
 
-        if (!lh.getLedgerMetadata().currentEnsemble.get(bookieIndex).equals(addr)) {
+        if (!ensemble.get(bookieIndex).equals(addr)) {
             // ensemble has already changed, failure of this addr is immaterial
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Write did not succeed: " + ledgerId + ", " + entryId + ". But we have already fixed it.");
@@ -460,6 +468,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
         }
         cb = null;
         ctx = null;
+        ensemble = null;
         ackSet.recycle();
         ackSet = null;
         lh = null;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
index 493643c..0dad804 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
@@ -19,7 +19,10 @@ package org.apache.bookkeeper.client;
 
 import io.netty.buffer.ByteBuf;
 
+import java.util.List;
+
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.proto.checksum.DigestManager.RecoveryData;
@@ -51,6 +54,7 @@ class PendingReadLacOp implements ReadLacCallback {
     int lastSeenError = BKException.Code.ReadException;
     final DistributionSchedule.QuorumCoverageSet coverageSet;
     long maxLac = LedgerHandle.INVALID_ENTRY_ID;
+    final List<BookieSocketAddress> currentEnsemble;
 
     /*
      * Wrapper to get Lac from the request
@@ -59,18 +63,18 @@ class PendingReadLacOp implements ReadLacCallback {
         void getLacComplete(int rc, long lac);
     }
 
-    PendingReadLacOp(LedgerHandle lh, BookieClient bookieClient, LacCallback cb) {
+    PendingReadLacOp(LedgerHandle lh, BookieClient bookieClient, List<BookieSocketAddress> ensemble, LacCallback cb) {
         this.lh = lh;
         this.bookieClient = bookieClient;
         this.cb = cb;
-        this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
+        this.numResponsesPending = ensemble.size();
         this.coverageSet = lh.distributionSchedule.getCoverageSet();
+        this.currentEnsemble = ensemble;
     }
 
     public void initiate() {
-        LedgerMetadata metadata = lh.getLedgerMetadata();
-        for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
-            bookieClient.readLac(metadata.currentEnsemble.get(i), lh.ledgerId, this, i);
+        for (int i = 0; i < currentEnsemble.size(); i++) {
+            bookieClient.readLac(currentEnsemble.get(i), lh.ledgerId, this, i);
         }
     }
 
@@ -120,7 +124,7 @@ class PendingReadLacOp implements ReadLacCallback {
                 // Too bad, this bookie did not give us a valid answer, we
                 // still might be able to recover. So, continue
                 LOG.error("Mac mismatch while reading  ledger: " + ledgerId + " LAC from bookie: "
-                        + lh.getLedgerMetadata().currentEnsemble.get(bookieIndex));
+                        + currentEnsemble.get(bookieIndex));
                 rc = BKException.Code.DigestMatchException;
             }
         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index 730ad07..2881d2f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -18,6 +18,7 @@
 package org.apache.bookkeeper.client;
 
 import java.util.BitSet;
+import java.util.List;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -50,7 +51,9 @@ class PendingWriteLacOp implements WriteLacCallback {
     LedgerHandle lh;
     ClientContext clientCtx;
 
-    PendingWriteLacOp(LedgerHandle lh, ClientContext clientCtx,
+    final List<BookieSocketAddress> currentEnsemble;
+
+    PendingWriteLacOp(LedgerHandle lh, ClientContext clientCtx, List<BookieSocketAddress> ensemble,
                       AddLacCallback cb, Object ctx) {
         this.lh = lh;
         this.clientCtx = clientCtx;
@@ -58,6 +61,7 @@ class PendingWriteLacOp implements WriteLacCallback {
         this.ctx = ctx;
         this.lac = LedgerHandle.INVALID_ENTRY_ID;
         ackSet = lh.distributionSchedule.getAckSet();
+        currentEnsemble = ensemble;
     }
 
     void setLac(long lac) {
@@ -70,7 +74,7 @@ class PendingWriteLacOp implements WriteLacCallback {
     }
 
     void sendWriteLacRequest(int bookieIndex) {
-        clientCtx.getBookieClient().writeLac(lh.getLedgerMetadata().currentEnsemble.get(bookieIndex),
+        clientCtx.getBookieClient().writeLac(currentEnsemble.get(bookieIndex),
                                              lh.ledgerId, lh.ledgerKey, lac, toSend, this, bookieIndex);
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index d71cc0a..e61e666 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -63,6 +63,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
     private final long prevEntryId;
     private long lastAddConfirmed;
     private long timeOutInMillis;
+    private final List<BookieSocketAddress> currentEnsemble;
 
     abstract class ReadLACAndEntryRequest implements AutoCloseable {
 
@@ -425,6 +426,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
 
     ReadLastConfirmedAndEntryOp(LedgerHandle lh,
                                 ClientContext clientCtx,
+                                List<BookieSocketAddress> ensemble,
                                 LastConfirmedAndEntryCallback cb,
                                 long prevEntryId,
                                 long timeOutInMillis) {
@@ -435,6 +437,8 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
         this.lastAddConfirmed = lh.getLastAddConfirmed();
         this.timeOutInMillis = timeOutInMillis;
         this.numResponsesPending = 0;
+
+        this.currentEnsemble = ensemble;
         // since long poll is effectively reading lac with waits, lac can be potentially
         // be advanced in different write quorums, so we need to make sure to cover enough
         // bookies before claiming lac is not advanced.
@@ -480,9 +484,9 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
 
     public void initiate() {
         if (parallelRead) {
-            request = new ParallelReadRequest(lh.getLedgerMetadata().currentEnsemble, lh.getId(), prevEntryId + 1);
+            request = new ParallelReadRequest(currentEnsemble, lh.getId(), prevEntryId + 1);
         } else {
-            request = new SequenceReadRequest(lh.getLedgerMetadata().currentEnsemble, lh.getId(), prevEntryId + 1);
+            request = new SequenceReadRequest(currentEnsemble, lh.getId(), prevEntryId + 1);
         }
         request.read();
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
index b72639d..dfbc167 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
@@ -18,8 +18,10 @@
 package org.apache.bookkeeper.client;
 
 import io.netty.buffer.ByteBuf;
+import java.util.List;
 
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
@@ -42,6 +44,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
 
     LastConfirmedDataCallback cb;
     final DistributionSchedule.QuorumCoverageSet coverageSet;
+    final List<BookieSocketAddress> currentEnsemble;
 
     /**
      * Wrapper to get all recovered data from the request.
@@ -51,19 +54,19 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
     }
 
     public ReadLastConfirmedOp(LedgerHandle lh, BookieClient bookieClient,
-                               LastConfirmedDataCallback cb) {
+                               List<BookieSocketAddress> ensemble, LastConfirmedDataCallback cb) {
         this.cb = cb;
         this.bookieClient = bookieClient;
         this.maxRecoveredData = new RecoveryData(LedgerHandle.INVALID_ENTRY_ID, 0);
         this.lh = lh;
         this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
         this.coverageSet = lh.distributionSchedule.getCoverageSet();
+        this.currentEnsemble = ensemble;
     }
 
     public void initiate() {
-        LedgerMetadata metadata = lh.getLedgerMetadata();
-        for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
-            bookieClient.readEntry(metadata.currentEnsemble.get(i),
+        for (int i = 0; i < currentEnsemble.size(); i++) {
+            bookieClient.readEntry(currentEnsemble.get(i),
                                    lh.ledgerId,
                                    BookieProtocol.LAST_ADD_CONFIRMED,
                                    this, i, BookieProtocol.FLAG_NONE);
@@ -71,9 +74,8 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
     }
 
     public void initiateWithFencing() {
-        LedgerMetadata metadata = lh.getLedgerMetadata();
-        for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
-            bookieClient.readEntry(metadata.currentEnsemble.get(i),
+        for (int i = 0; i < currentEnsemble.size(); i++) {
+            bookieClient.readEntry(currentEnsemble.get(i),
                                    lh.ledgerId,
                                    BookieProtocol.LAST_ADD_CONFIRMED,
                                    this, i, BookieProtocol.FLAG_DO_FENCING,
@@ -102,7 +104,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
                 // still might be able to recover though so continue
                 LOG.error("Mac mismatch for ledger: " + ledgerId + ", entry: " + entryId
                           + " while reading last entry from bookie: "
-                          + lh.getLedgerMetadata().currentEnsemble.get(bookieIndex));
+                          + currentEnsemble.get(bookieIndex));
             }
         }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index fba7ebd..e9f7900 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -140,7 +140,7 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListene
                 }
                 blockAddCompletions.decrementAndGet();
                 // the failed bookie has been replaced
-                unsetSuccessAndSendWriteRequest(ensembleInfo.replacedBookies);
+                unsetSuccessAndSendWriteRequest(ensembleInfo.newEnsemble, ensembleInfo.replacedBookies);
             } catch (BKException.BKNotEnoughBookiesException e) {
                 LOG.error("Could not get additional bookie to "
                           + "remake ensemble, closing ledger: " + ledgerId);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
index 181b343..1311d31 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
@@ -18,8 +18,10 @@
 package org.apache.bookkeeper.client;
 
 import io.netty.buffer.ByteBuf;
+import java.util.List;
 
 import org.apache.bookkeeper.client.ReadLastConfirmedOp.LastConfirmedDataCallback;
+import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
@@ -43,20 +45,21 @@ class TryReadLastConfirmedOp implements ReadEntryCallback {
     volatile boolean hasValidResponse = false;
     volatile boolean completed = false;
     RecoveryData maxRecoveredData;
+    final List<BookieSocketAddress> currentEnsemble;
 
     TryReadLastConfirmedOp(LedgerHandle lh, BookieClient bookieClient,
-                           LastConfirmedDataCallback cb, long lac) {
+                           List<BookieSocketAddress> ensemble, LastConfirmedDataCallback cb, long lac) {
         this.lh = lh;
         this.bookieClient = bookieClient;
         this.cb = cb;
         this.maxRecoveredData = new RecoveryData(lac, 0);
         this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
+        this.currentEnsemble = ensemble;
     }
 
     public void initiate() {
-        LedgerMetadata metadata = lh.getLedgerMetadata();
-        for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
-            bookieClient.readEntry(metadata.currentEnsemble.get(i),
+        for (int i = 0; i < currentEnsemble.size(); i++) {
+            bookieClient.readEntry(currentEnsemble.get(i),
                                    lh.ledgerId,
                                    BookieProtocol.LAST_ADD_CONFIRMED,
                                    this, i, BookieProtocol.FLAG_NONE);
@@ -88,7 +91,7 @@ class TryReadLastConfirmedOp implements ReadEntryCallback {
             } catch (BKException.BKDigestMatchException e) {
                 LOG.error("Mac mismatch for ledger: " + ledgerId + ", entry: " + entryId
                           + " while reading last entry from bookie: "
-                          + lh.getLedgerMetadata().currentEnsemble.get(bookieIndex));
+                          + currentEnsemble.get(bookieIndex));
             }
         } else if (BKException.Code.UnauthorizedAccessException == rc && !completed) {
             cb.readLastConfirmedDataComplete(rc, maxRecoveredData);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index 85376ea..82a87c3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -292,7 +292,7 @@ public class BookieWriteLedgerTest extends
         CountDownLatch sleepLatch1 = new CountDownLatch(1);
 
         // get bookie at index-0
-        BookieSocketAddress bookie1 = lh.getLedgerMetadata().currentEnsemble.get(0);
+        BookieSocketAddress bookie1 = lh.getCurrentEnsemble().get(0);
         sleepBookie(bookie1, sleepLatch1);
 
         int i = numEntriesToWrite;
@@ -331,7 +331,7 @@ public class BookieWriteLedgerTest extends
 
         sleepLatch1.countDown();
         // get the bookie at index-0 again, this must be different.
-        BookieSocketAddress bookie2 = lh.getLedgerMetadata().currentEnsemble.get(0);
+        BookieSocketAddress bookie2 = lh.getCurrentEnsemble().get(0);
 
         assertFalse(
                 "Delayed write error must have forced ensemble change",
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
index 3259932..e761f1c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
@@ -145,12 +145,12 @@ public class LedgerCloseTest extends BookKeeperClusterTestCase {
         final CountDownLatch recoverDoneLatch = new CountDownLatch(1);
         final CountDownLatch failedLatch = new CountDownLatch(1);
         // kill first bookie to replace with a unauthorize bookie
-        BookieSocketAddress bookie = lh.getLedgerMetadata().currentEnsemble.get(0);
+        BookieSocketAddress bookie = lh.getCurrentEnsemble().get(0);
         ServerConfiguration conf = killBookie(bookie);
         // replace a unauthorize bookie
         startUnauthorizedBookie(conf, addDoneLatch);
         // kill second bookie to replace with a dead bookie
-        bookie = lh.getLedgerMetadata().currentEnsemble.get(1);
+        bookie = lh.getCurrentEnsemble().get(1);
         conf = killBookie(bookie);
         // replace a slow dead bookie
         startDeadBookie(conf, deadIOLatch);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
index 25da4ba..e126001 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
@@ -188,7 +188,7 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
         // kill first bookie server to start a fake one to simulate a slow bookie
         // and failed to add entry on crash
         // until write succeed
-        BookieSocketAddress host = beforelh.getLedgerMetadata().currentEnsemble.get(slowBookieIdx);
+        BookieSocketAddress host = beforelh.getCurrentEnsemble().get(slowBookieIdx);
         ServerConfiguration conf = killBookie(host);
 
         Bookie fakeBookie = new Bookie(conf) {
@@ -262,7 +262,7 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
         bs.add(startBookie(conf, deadBookie1));
 
         // kill first bookie server
-        BookieSocketAddress bookie1 = lhbefore.getLedgerMetadata().currentEnsemble.get(0);
+        BookieSocketAddress bookie1 = lhbefore.getCurrentEnsemble().get(0);
         ServerConfiguration conf1 = killBookie(bookie1);
 
         // Try to recover and fence the ledger after killing one bookie in the
@@ -277,7 +277,7 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
         // restart the first server, kill the second
         bsConfs.add(conf1);
         bs.add(startBookie(conf1));
-        BookieSocketAddress bookie2 = lhbefore.getLedgerMetadata().currentEnsemble.get(1);
+        BookieSocketAddress bookie2 = lhbefore.getCurrentEnsemble().get(1);
         ServerConfiguration conf2 = killBookie(bookie2);
 
         // using async, because this could trigger an assertion
@@ -343,7 +343,7 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
         bs.add(startBookie(conf, deadBookie1));
 
         // kill first bookie server
-        BookieSocketAddress bookie1 = lhbefore.getLedgerMetadata().currentEnsemble.get(0);
+        BookieSocketAddress bookie1 = lhbefore.getCurrentEnsemble().get(0);
         killBookie(bookie1);
 
         // Try to recover and fence the ledger after killing one bookie in the
@@ -395,9 +395,9 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
             fail("Failed to add " + numEntries + " to ledger handle " + lh.getId());
         }
         // kill first 2 bookies to replace bookies
-        BookieSocketAddress bookie1 = lh.getLedgerMetadata().currentEnsemble.get(0);
+        BookieSocketAddress bookie1 = lh.getCurrentEnsemble().get(0);
         ServerConfiguration conf1 = killBookie(bookie1);
-        BookieSocketAddress bookie2 = lh.getLedgerMetadata().currentEnsemble.get(1);
+        BookieSocketAddress bookie2 = lh.getCurrentEnsemble().get(1);
         ServerConfiguration conf2 = killBookie(bookie2);
 
         // replace these two bookies
@@ -451,8 +451,8 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
 
         CountDownLatch latch1 = new CountDownLatch(1);
         CountDownLatch latch2 = new CountDownLatch(1);
-        sleepBookie(lh.getLedgerMetadata().currentEnsemble.get(0), latch1);
-        sleepBookie(lh.getLedgerMetadata().currentEnsemble.get(1), latch2);
+        sleepBookie(lh.getCurrentEnsemble().get(0), latch1);
+        sleepBookie(lh.getCurrentEnsemble().get(1), latch2);
 
         int numEntries = (numBookies * 3) + 1;
         final AtomicInteger numPendingAdds = new AtomicInteger(numEntries);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 3213c46..8b5f5a6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -300,8 +300,8 @@ public class ParallelLedgerRecoveryTest extends BookKeeperClusterTestCase {
         final LedgerHandle lh = newBk.createLedger(numBookies, 2, 2, digestType, "".getBytes());
         CountDownLatch latch1 = new CountDownLatch(1);
         CountDownLatch latch2 = new CountDownLatch(1);
-        sleepBookie(lh.getLedgerMetadata().currentEnsemble.get(0), latch1);
-        sleepBookie(lh.getLedgerMetadata().currentEnsemble.get(1), latch2);
+        sleepBookie(lh.getCurrentEnsemble().get(0), latch1);
+        sleepBookie(lh.getCurrentEnsemble().get(1), latch2);
 
         int numEntries = (numBookies * 3) + 1;
         final AtomicInteger numPendingAdds = new AtomicInteger(numEntries);
@@ -428,16 +428,17 @@ public class ParallelLedgerRecoveryTest extends BookKeeperClusterTestCase {
         final CountDownLatch addLatch = new CountDownLatch(1);
         final AtomicBoolean addSuccess = new AtomicBoolean(false);
         LOG.info("Add entry {} with lac = {}", entryId, lac);
-        bkc.getBookieClient().addEntry(lh.getLedgerMetadata().currentEnsemble.get(0),
-                                         lh.getId(), lh.ledgerKey, entryId, toSend,
-                                         new WriteCallback() {
-                                             @Override
-                                             public void writeComplete(int rc, long ledgerId, long entryId,
-                                                                       BookieSocketAddress addr, Object ctx) {
-                                                 addSuccess.set(BKException.Code.OK == rc);
-                                                 addLatch.countDown();
-                                             }
-                                         }, 0, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
+
+        bkc.getBookieClient().addEntry(lh.getCurrentEnsemble().get(0),
+                                       lh.getId(), lh.ledgerKey, entryId, toSend,
+                                       new WriteCallback() {
+                                           @Override
+                                           public void writeComplete(int rc, long ledgerId, long entryId,
+                                                                     BookieSocketAddress addr, Object ctx) {
+                                               addSuccess.set(BKException.Code.OK == rc);
+                                               addLatch.countDown();
+                                           }
+                                       }, 0, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
         addLatch.await();
         assertTrue("add entry 14 should succeed", addSuccess.get());
 
@@ -587,7 +588,7 @@ public class ParallelLedgerRecoveryTest extends BookKeeperClusterTestCase {
         LOG.info("Create ledger {}", lh0.getId());
 
         // 0) place the bookie with a fake bookie
-        BookieSocketAddress address = lh0.getLedgerMetadata().currentEnsemble.get(0);
+        BookieSocketAddress address = lh0.getCurrentEnsemble().get(0);
         ServerConfiguration conf = killBookie(address);
         conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
         DelayResponseBookie fakeBookie = new DelayResponseBookie(conf);
@@ -667,14 +668,16 @@ public class ParallelLedgerRecoveryTest extends BookKeeperClusterTestCase {
         final AtomicLong lacHolder = new AtomicLong(-1234L);
         final AtomicInteger rcHolder = new AtomicInteger(-1234);
         final CountDownLatch doneLatch = new CountDownLatch(1);
-        new ReadLastConfirmedOp(readLh, bkc.getBookieClient(), new ReadLastConfirmedOp.LastConfirmedDataCallback() {
-            @Override
-            public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
-                rcHolder.set(rc);
-                lacHolder.set(data.getLastAddConfirmed());
-                doneLatch.countDown();
-            }
-        }).initiate();
+
+        new ReadLastConfirmedOp(readLh, bkc.getBookieClient(), readLh.getCurrentEnsemble(),
+                new ReadLastConfirmedOp.LastConfirmedDataCallback() {
+                    @Override
+                    public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
+                        rcHolder.set(rc);
+                        lacHolder.set(data.getLastAddConfirmed());
+                        doneLatch.countDown();
+                    }
+                }).initiate();
         doneLatch.await();
         assertEquals(BKException.Code.OK, rcHolder.get());
         assertEquals(1L, lacHolder.get());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
index a6bc088..51d296c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java
@@ -69,7 +69,7 @@ public class PendingAddOpTest {
     public void testExecuteAfterCancelled() {
         AtomicInteger rcHolder = new AtomicInteger(-0xdead);
         PendingAddOp op = PendingAddOp.create(
-                lh, mockClientContext,
+                lh, mockClientContext, lh.getCurrentEnsemble(),
                 payload, WriteFlag.NONE,
                 (rc, handle, entryId, qwcLatency, ctx) -> {
                     rcHolder.set(rc);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
index 7d524a5..3d9c394 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
@@ -126,6 +126,7 @@ public class ReadLastConfirmedAndEntryOpTest {
         this.mockLh = mock(LedgerHandle.class);
 
         when(mockLh.getId()).thenReturn(LEDGERID);
+        when(mockLh.getCurrentEnsemble()).thenReturn(ensemble);
         when(mockLh.getLedgerMetadata()).thenReturn(ledgerMetadata);
         when(mockLh.getDistributionSchedule()).thenReturn(distributionSchedule);
         digestManager = new DummyDigestManager(LEDGERID, false);
@@ -199,7 +200,7 @@ public class ReadLastConfirmedAndEntryOpTest {
         };
 
         ReadLastConfirmedAndEntryOp op = new ReadLastConfirmedAndEntryOp(
-                mockLh, mockClientCtx, resultCallback, 1L, 10000);
+                mockLh, mockClientCtx, mockLh.getCurrentEnsemble(), resultCallback, 1L, 10000);
         op.initiate();
 
         // wait until all speculative requests are sent
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
index 71a9b11..a70ec32 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
@@ -73,7 +73,7 @@ public class SlowBookieTest extends BookKeeperClusterTestCase {
         final CountDownLatch b0latch = new CountDownLatch(1);
         final CountDownLatch b1latch = new CountDownLatch(1);
         final CountDownLatch addEntrylatch = new CountDownLatch(1);
-        List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
+        List<BookieSocketAddress> curEns = lh.getCurrentEnsemble();
         try {
             sleepBookie(curEns.get(0), b0latch);
             for (int i = 0; i < 10; i++) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestAddEntryQuorumTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestAddEntryQuorumTimeout.java
index 44134cc..efbc43d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestAddEntryQuorumTimeout.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestAddEntryQuorumTimeout.java
@@ -81,7 +81,7 @@ public class TestAddEntryQuorumTimeout extends BookKeeperClusterTestCase impleme
     public void testBasicTimeout() throws Exception {
         BookKeeperTestClient bkc = new BookKeeperTestClient(baseClientConf);
         LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, testPasswd);
-        List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
+        List<BookieSocketAddress> curEns = lh.getCurrentEnsemble();
         byte[] data = "foobar".getBytes();
         lh.addEntry(data);
         sleepBookie(curEns.get(0), 5).await();
@@ -105,7 +105,7 @@ public class TestAddEntryQuorumTimeout extends BookKeeperClusterTestCase impleme
     public void testTimeoutWithPendingOps() throws Exception {
         BookKeeperTestClient bkc = new BookKeeperTestClient(baseClientConf);
         LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, testPasswd);
-        List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
+        List<BookieSocketAddress> curEns = lh.getCurrentEnsemble();
         byte[] data = "foobar".getBytes();
 
         SyncObj syncObj1 = new SyncObj();
@@ -130,7 +130,7 @@ public class TestAddEntryQuorumTimeout extends BookKeeperClusterTestCase impleme
     public void testLedgerClosedAfterTimeout() throws Exception {
         BookKeeperTestClient bkc = new BookKeeperTestClient(baseClientConf);
         LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, testPasswd);
-        List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
+        List<BookieSocketAddress> curEns = lh.getCurrentEnsemble();
         byte[] data = "foobar".getBytes();
         CountDownLatch b0latch = sleepBookie(curEns.get(0), 5);
         try {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
index 62be7cb..e8da86e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
@@ -146,8 +146,8 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
         }
 
         // kill two bookies, but we still have 3 bookies for the ack quorum.
-        ServerConfiguration conf0 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(0));
-        ServerConfiguration conf1 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(1));
+        ServerConfiguration conf0 = killBookie(lh.getCurrentEnsemble().get(0));
+        ServerConfiguration conf1 = killBookie(lh.getCurrentEnsemble().get(1));
 
         for (int i = numEntries; i < 2 * numEntries; i++) {
             lh.addEntry(data);
@@ -213,7 +213,7 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
         logger.info("Kill bookie 0 and write {} entries.", numEntries);
 
         // kill two bookies, but we still have 3 bookies for the ack quorum.
-        ServerConfiguration conf0 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(0));
+        ServerConfiguration conf0 = killBookie(lh.getCurrentEnsemble().get(0));
 
         for (int i = numEntries; i < 2 * numEntries; i++) {
             lh.addEntry(data);
@@ -230,7 +230,7 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
 
         logger.info("Kill bookie 1 and write another {} entries.", numEntries);
 
-        ServerConfiguration conf1 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(1));
+        ServerConfiguration conf1 = killBookie(lh.getCurrentEnsemble().get(1));
 
         for (int i = 2 * numEntries; i < 3 * numEntries; i++) {
             lh.addEntry(data);
@@ -242,7 +242,7 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
 
         logger.info("Kill bookie 2 and write another {} entries.", numEntries);
 
-        ServerConfiguration conf2 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(2));
+        ServerConfiguration conf2 = killBookie(lh.getCurrentEnsemble().get(2));
 
         for (int i = 3 * numEntries; i < 4 * numEntries; i++) {
             lh.addEntry(data);
@@ -304,9 +304,9 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
         logger.info("Killed 3 bookies and add {} more entries : {}", numEntries, lh.getLedgerMetadata());
 
         // kill three bookies, but we only have 2 new bookies for ensemble change.
-        ServerConfiguration conf0 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(0));
-        ServerConfiguration conf1 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(1));
-        ServerConfiguration conf2 = killBookie(lh.getLedgerMetadata().currentEnsemble.get(2));
+        ServerConfiguration conf0 = killBookie(lh.getCurrentEnsemble().get(0));
+        ServerConfiguration conf1 = killBookie(lh.getCurrentEnsemble().get(1));
+        ServerConfiguration conf2 = killBookie(lh.getCurrentEnsemble().get(2));
 
         for (int i = numEntries; i < 2 * numEntries; i++) {
             lh.addEntry(data);
@@ -360,7 +360,7 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
         // kill 5 bookies to introduce more bookie failure
         List<ServerConfiguration> confs = new ArrayList<ServerConfiguration>(5);
         for (int i = 0; i < 5; i++) {
-            confs.add(killBookie(lh.getLedgerMetadata().currentEnsemble.get(i)));
+            confs.add(killBookie(lh.getCurrentEnsemble().get(i)));
         }
 
         for (int i = numEntries; i < 2 * numEntries; i++) {
@@ -406,7 +406,7 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
         }
 
         // kill two bookies, but we still have 3 bookies for the ack quorum.
-        setBookieToReadOnly(lh.getLedgerMetadata().currentEnsemble.get(0));
+        setBookieToReadOnly(lh.getCurrentEnsemble().get(0));
 
         for (int i = numEntries; i < 2 * numEntries; i++) {
             lh.addEntry(data);
@@ -429,8 +429,8 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
             lh.addEntry(data);
         }
 
-        BookieSocketAddress failedBookie = lh.getLedgerMetadata().currentEnsemble.get(0);
-        BookieSocketAddress readOnlyBookie = lh.getLedgerMetadata().currentEnsemble.get(1);
+        BookieSocketAddress failedBookie = lh.getCurrentEnsemble().get(0);
+        BookieSocketAddress readOnlyBookie = lh.getCurrentEnsemble().get(1);
         ServerConfiguration conf0 = killBookie(failedBookie);
 
         for (int i = 0; i < numEntries; i++) {
@@ -450,9 +450,9 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
         // ensure there is no ensemble changed
         assertEquals("The ensemble should change when a bookie is readonly even if we delay ensemble change.",
             2, lh.getLedgerMetadata().getEnsembles().size());
-        assertEquals(3, lh.getLedgerMetadata().currentEnsemble.size());
-        assertFalse(lh.getLedgerMetadata().currentEnsemble.contains(failedBookie));
-        assertFalse(lh.getLedgerMetadata().currentEnsemble.contains(readOnlyBookie));
+        assertEquals(3, lh.getCurrentEnsemble().size());
+        assertFalse(lh.getCurrentEnsemble().contains(failedBookie));
+        assertFalse(lh.getCurrentEnsemble().contains(readOnlyBookie));
     }
 
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDisableEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDisableEnsembleChange.java
index 26f076a..f114cbf 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDisableEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDisableEnsembleChange.java
@@ -239,7 +239,7 @@ public class TestDisableEnsembleChange extends BookKeeperClusterTestCase {
             lh.addEntry(entry);
         }
 
-        List<BookieSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
+        List<BookieSocketAddress> curEns = lh.getCurrentEnsemble();
 
         final CountDownLatch wakeupLatch = new CountDownLatch(1);
         final CountDownLatch suspendLatch = new CountDownLatch(1);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
index e0f3dc6..c9ca508 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
@@ -51,11 +51,11 @@ public class TestPendingReadLacOp extends BookKeeperClusterTestCase {
         lh.append(data);
 
         final CompletableFuture<Long> result = new CompletableFuture<>();
-        PendingReadLacOp pro = new PendingReadLacOp(lh, bkc.getBookieClient(),
+        PendingReadLacOp pro = new PendingReadLacOp(lh, bkc.getBookieClient(), lh.getCurrentEnsemble(),
                                                     (rc, lac) -> result.complete(lac)) {
             @Override
             public void initiate() {
-                for (int i = 0; i < lh.getLedgerMetadata().currentEnsemble.size(); i++) {
+                for (int i = 0; i < lh.getCurrentEnsemble().size(); i++) {
                     final int index = i;
                     ByteBufList buffer = lh.getDigestManager().computeDigestAndPackageForSending(
                             2,
@@ -71,8 +71,8 @@ public class TestPendingReadLacOp extends BookKeeperClusterTestCase {
                                 index);
 
                     }, 0, TimeUnit.SECONDS);
-                    bookieClient.readLac(lh.getLedgerMetadata().currentEnsemble.get(i),
-                            lh.ledgerId, this, i);
+                    bookieClient.readLac(lh.getCurrentEnsemble().get(i),
+                                         lh.ledgerId, this, i);
                 }
             }
         };
@@ -88,10 +88,11 @@ public class TestPendingReadLacOp extends BookKeeperClusterTestCase {
         lh.append(data);
 
         final CompletableFuture<Long> result = new CompletableFuture<>();
-        PendingReadLacOp pro = new PendingReadLacOp(lh, bkc.getBookieClient(), (rc, lac) -> result.complete(lac)) {
+        PendingReadLacOp pro = new PendingReadLacOp(lh, bkc.getBookieClient(), lh.getCurrentEnsemble(),
+            (rc, lac) -> result.complete(lac)) {
             @Override
             public void initiate() {
-                for (int i = 0; i < lh.getLedgerMetadata().currentEnsemble.size(); i++) {
+                for (int i = 0; i < lh.getCurrentEnsemble().size(); i++) {
                     final int index = i;
                     ByteBufList buffer = lh.getDigestManager().computeDigestAndPackageForSendingLac(1);
                     bkc.scheduler.schedule(() -> {
@@ -102,7 +103,7 @@ public class TestPendingReadLacOp extends BookKeeperClusterTestCase {
                                 null,
                                 index);
                     }, 0, TimeUnit.SECONDS);
-                    bookieClient.readLac(lh.getLedgerMetadata().currentEnsemble.get(i),
+                    bookieClient.readLac(lh.getCurrentEnsemble().get(i),
                             lh.ledgerId, this, i);
                 }
             }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
index f754f6a..d05f864 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
@@ -153,7 +153,7 @@ public class TestReadLastConfirmedLongPoll extends BookKeeperClusterTestCase {
             ServerConfiguration[] confs = new ServerConfiguration[numEntries - 1];
             for (int j = 0; j < numEntries - 1; j++) {
                 int idx = (i + 1 + j) % numEntries;
-                confs[j] = killBookie(lh.getLedgerMetadata().currentEnsemble.get(idx));
+                confs[j] = killBookie(lh.getCurrentEnsemble().get(idx));
             }
 
             final AtomicBoolean entryAsExpected = new AtomicBoolean(false);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestTryReadLastConfirmed.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestTryReadLastConfirmed.java
index 0117366..a4a63fa 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestTryReadLastConfirmed.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestTryReadLastConfirmed.java
@@ -119,7 +119,7 @@ public class TestTryReadLastConfirmed extends BookKeeperClusterTestCase {
             ServerConfiguration[] confs = new ServerConfiguration[ensembleSize - 1];
             for (int j = 0; j < ensembleSize - 1; j++) {
                 int idx = (i + 1 + j) % ensembleSize;
-                confs[j] = killBookie(lh.getLedgerMetadata().currentEnsemble.get(idx));
+                confs[j] = killBookie(lh.getCurrentEnsemble().get(idx));
             }
 
             final AtomicBoolean success = new AtomicBoolean(false);
@@ -164,7 +164,7 @@ public class TestTryReadLastConfirmed extends BookKeeperClusterTestCase {
             lh.addEntry(("data" + i).getBytes());
         }
         for (int i = 0; i < ensembleSize; i++) {
-            killBookie(lh.getLedgerMetadata().currentEnsemble.get(i));
+            killBookie(lh.getCurrentEnsemble().get(i));
         }
         final AtomicBoolean success = new AtomicBoolean(false);
         final AtomicInteger numCallbacks = new AtomicInteger(0);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
index e6daa66..456839a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
@@ -93,7 +93,7 @@ public class TestWatchEnsembleChange extends BookKeeperClusterTestCase {
         long lastLAC = readLh.getLastAddConfirmed();
         assertEquals(numEntries - 2, lastLAC);
         List<BookieSocketAddress> ensemble =
-                lh.getLedgerMetadata().currentEnsemble;
+            lh.getCurrentEnsemble();
         for (BookieSocketAddress addr : ensemble) {
             killBookie(addr);
         }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
index ffe2d88..3388110 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
@@ -197,7 +197,7 @@ public class LedgerReader {
             op.submit();
         };
         // Read Last AddConfirmed
-        new ReadLastConfirmedOp(lh, clientCtx.getBookieClient(), readLACCallback).initiate();
+        new ReadLastConfirmedOp(lh, clientCtx.getBookieClient(), lh.getCurrentEnsemble(), readLACCallback).initiate();
     }
 
     public void readLacs(final LedgerHandle lh, long eid,