You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by iv...@apache.org on 2018/08/01 09:29:48 UTC

[bookkeeper] branch master updated: Make each ensemble in ensemble list immutable

This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b14458  Make each ensemble in ensemble list immutable
2b14458 is described below

commit 2b14458d966f80d991cec35ed7460639db52997b
Author: Ivan Kelly <iv...@ivankelly.net>
AuthorDate: Wed Aug 1 11:29:41 2018 +0200

    Make each ensemble in ensemble list immutable
    
    Previously, the ensemble list was a Map<Long, ArrayList<BookieSocketAddress>>.
    ArrayList is by definition mutable, so the ensembles passed to metadata users
    were always mutable.
    
    This patch changes the ensembles in the list to be immutable. We were also leaking
    the implementation of ledger metadata to the placement policy, so this has been
    modified to use List<BookieSocketAddress> also.
    
    Master issue: #281
    
    Author: Ivan Kelly <iv...@ivankelly.net>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1575 from ivankelly/ledger-fragment-immutable-metadata and squashes the following commits:
    
    14977a242 [Ivan Kelly] fix dlog
    bdaef31e8 [Ivan Kelly] checkstyle
    6e3431118 [Ivan Kelly] Make each ensemble in ensemble list immutable
---
 .../org/apache/bookkeeper/bookie/BookieShell.java  |  4 +-
 .../bookie/LocalBookieEnsemblePlacementPolicy.java |  8 ++--
 .../bookie/ScanAndCompareGarbageCollector.java     |  6 +--
 .../apache/bookkeeper/client/BookKeeperAdmin.java  | 25 +++++------
 .../apache/bookkeeper/client/BookieWatcher.java    |  5 +--
 .../client/DefaultEnsemblePlacementPolicy.java     |  4 +-
 .../bookkeeper/client/EnsemblePlacementPolicy.java | 14 +++---
 .../apache/bookkeeper/client/ForceLedgerOp.java    |  4 +-
 .../ITopologyAwareEnsemblePlacementPolicy.java     |  6 +--
 .../apache/bookkeeper/client/LedgerChecker.java    |  6 +--
 .../apache/bookkeeper/client/LedgerCreateOp.java   |  3 +-
 .../apache/bookkeeper/client/LedgerFragment.java   |  3 +-
 .../client/LedgerFragmentReplicator.java           |  9 ++--
 .../org/apache/bookkeeper/client/LedgerHandle.java |  4 +-
 .../apache/bookkeeper/client/LedgerMetadata.java   | 51 ++++++++++++----------
 .../apache/bookkeeper/client/PendingReadOp.java    | 10 ++---
 .../client/RackawareEnsemblePlacementPolicy.java   | 20 ++++-----
 .../RackawareEnsemblePlacementPolicyImpl.java      | 32 +++++++-------
 .../client/ReadLastConfirmedAndEntryOp.java        | 10 ++---
 .../client/RegionAwareEnsemblePlacementPolicy.java |  8 ++--
 .../TopologyAwareEnsemblePlacementPolicy.java      | 13 +++---
 .../apache/bookkeeper/client/UpdateLedgerOp.java   | 10 +++--
 .../replication/BookieLedgerIndexer.java           |  2 +-
 .../bookkeeper/replication/ReplicationWorker.java  |  6 +--
 .../bookie/TestGcOverreplicatedLedger.java         |  9 ++--
 .../bookkeeper/client/BookieRecoveryTest.java      | 14 +++---
 .../bookkeeper/client/BookieWriteLedgerTest.java   | 18 ++++----
 .../client/GenericEnsemblePlacementPolicyTest.java |  2 +-
 .../bookkeeper/client/TestDelayEnsembleChange.java |  8 ++--
 .../bookkeeper/client/TestLedgerChecker.java       | 20 ++++-----
 .../client/TestLedgerFragmentReplication.java      | 12 +++--
 .../apache/bookkeeper/client/TestParallelRead.java | 11 ++---
 .../TestRackawareEnsemblePlacementPolicy.java      | 36 +++++++--------
 ...ackawareEnsemblePlacementPolicyUsingScript.java | 16 +++----
 .../TestRackawarePolicyNotificationUpdates.java    |  2 +-
 .../bookkeeper/client/TestReadEntryListener.java   |  6 +--
 .../TestRegionAwareEnsemblePlacementPolicy.java    | 46 +++++++++----------
 .../apache/bookkeeper/client/TestSequenceRead.java |  9 ++--
 .../bookkeeper/client/TestSpeculativeRead.java     |  4 +-
 .../bookkeeper/client/TestWatchEnsembleChange.java |  4 +-
 .../bookkeeper/client/UpdateLedgerCmdTest.java     |  2 +-
 .../bookkeeper/client/UpdateLedgerOpTest.java      | 10 ++---
 .../AuditorPeriodicBookieCheckTest.java            |  4 +-
 .../replication/AuditorPeriodicCheckTest.java      |  6 +--
 .../replication/BookieAutoRecoveryTest.java        | 10 ++---
 .../TestAutoRecoveryAlongWithBookieServers.java    | 10 ++---
 .../replication/TestReplicationWorker.java         | 10 ++---
 .../bookkeeper/server/http/TestHttpService.java    |  5 ++-
 .../java/org/apache/bookkeeper/tls/TestTLS.java    |  6 +--
 .../org/apache/bookkeeper/client/LedgerReader.java |  6 +--
 .../distributedlog/tools/DistributedLogTool.java   |  2 +-
 51 files changed, 278 insertions(+), 273 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index c4ead36..b119062 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -583,8 +583,8 @@ public class BookieShell implements Tool {
 
         private Map<Long, Integer> inspectLedger(LedgerMetadata metadata, Set<BookieSocketAddress> bookiesToInspect) {
             Map<Long, Integer> numBookiesToReplacePerEnsemble = new TreeMap<Long, Integer>();
-            for (Map.Entry<Long, ArrayList<BookieSocketAddress>> ensemble : metadata.getEnsembles().entrySet()) {
-                ArrayList<BookieSocketAddress> bookieList = ensemble.getValue();
+            for (Map.Entry<Long, ? extends List<BookieSocketAddress>> ensemble : metadata.getEnsembles().entrySet()) {
+                List<BookieSocketAddress> bookieList = ensemble.getValue();
                 System.out.print(ensemble.getKey() + ":\t");
                 int numBookiesToReplace = 0;
                 for (BookieSocketAddress bookie : bookieList) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 0414553..46978ea 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -20,8 +20,8 @@ package org.apache.bookkeeper.bookie;
 import com.google.common.collect.Lists;
 import io.netty.util.HashedWheelTimer;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -94,7 +94,7 @@ public class LocalBookieEnsemblePlacementPolicy implements EnsemblePlacementPoli
 
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return null;
@@ -102,14 +102,14 @@ public class LocalBookieEnsemblePlacementPolicy implements EnsemblePlacementPoli
 
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return null;
     }
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
         java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
         if (ensembleSize > 1) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
index f5ea91c..137c07c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
@@ -26,7 +26,6 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.DELETED_LEDGER_
 
 import com.google.common.collect.Sets;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.Set;
@@ -248,8 +247,9 @@ public class ScanAndCompareGarbageCollector implements GarbageCollector{
                                 release();
                                 return;
                             }
-                            SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = ledgerMetadata.getEnsembles();
-                            for (ArrayList<BookieSocketAddress> ensemble : ensembles.values()) {
+                            SortedMap<Long, ? extends List<BookieSocketAddress>> ensembles =
+                                ledgerMetadata.getEnsembles();
+                            for (List<BookieSocketAddress> ensemble : ensembles.values()) {
                                 // check if this bookie is supposed to have this ledger
                                 if (ensemble.contains(selfBookieAddress)) {
                                     release();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index a55ace8..cadbe49 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -817,7 +817,7 @@ public class BookKeeperAdmin implements AutoCloseable {
                  */
                 Map<Long, Long> ledgerFragmentsRange = new HashMap<Long, Long>();
                 Long curEntryId = null;
-                for (Map.Entry<Long, ArrayList<BookieSocketAddress>> entry : lh.getLedgerMetadata().getEnsembles()
+                for (Map.Entry<Long, ? extends List<BookieSocketAddress>> entry : lh.getLedgerMetadata().getEnsembles()
                          .entrySet()) {
                     if (curEntryId != null) {
                         ledgerFragmentsRange.put(curEntryId, entry.getKey() - 1);
@@ -863,7 +863,7 @@ public class BookKeeperAdmin implements AutoCloseable {
                  */
                 for (final Long startEntryId : ledgerFragmentsToRecover) {
                     Long endEntryId = ledgerFragmentsRange.get(startEntryId);
-                    ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().get(startEntryId);
+                    List<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().get(startEntryId);
                     // Get bookies to replace
                     Map<Integer, BookieSocketAddress> targetBookieAddresses;
                     try {
@@ -910,7 +910,7 @@ public class BookKeeperAdmin implements AutoCloseable {
             }, null);
     }
 
-    static String formatEnsemble(ArrayList<BookieSocketAddress> ensemble, Set<BookieSocketAddress> bookiesSrc,
+    static String formatEnsemble(List<BookieSocketAddress> ensemble, Set<BookieSocketAddress> bookiesSrc,
             char marker) {
         StringBuilder sb = new StringBuilder();
         sb.append("[");
@@ -1060,7 +1060,7 @@ public class BookKeeperAdmin implements AutoCloseable {
     }
 
     private static Map<BookieSocketAddress, BookieSocketAddress> getReplacementBookiesMap(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             Map<Integer, BookieSocketAddress> targetBookieAddresses) {
         Map<BookieSocketAddress, BookieSocketAddress> bookiesMap =
                 new HashMap<BookieSocketAddress, BookieSocketAddress>();
@@ -1091,11 +1091,11 @@ public class BookKeeperAdmin implements AutoCloseable {
             return false;
         }
         Long lastKey = lm.getEnsembles().lastKey();
-        ArrayList<BookieSocketAddress> lastEnsemble = lm.getEnsembles().get(lastKey);
+        List<BookieSocketAddress> lastEnsemble = lm.getEnsembles().get(lastKey);
         return containBookies(lastEnsemble, bookies);
     }
 
-    private static boolean containBookies(ArrayList<BookieSocketAddress> ensemble,
+    private static boolean containBookies(List<BookieSocketAddress> ensemble,
                                           Set<BookieSocketAddress> bookies) {
         for (BookieSocketAddress bookie : ensemble) {
             if (bookies.contains(bookie)) {
@@ -1544,9 +1544,9 @@ public class BookKeeperAdmin implements AutoCloseable {
 
     public static boolean areEntriesOfLedgerStoredInTheBookie(long ledgerId, BookieSocketAddress bookieAddress,
             LedgerMetadata ledgerMetadata) {
-        Collection<ArrayList<BookieSocketAddress>> ensemblesOfSegments = ledgerMetadata.getEnsembles().values();
-        Iterator<ArrayList<BookieSocketAddress>> ensemblesOfSegmentsIterator = ensemblesOfSegments.iterator();
-        ArrayList<BookieSocketAddress> ensemble;
+        Collection<? extends List<BookieSocketAddress>> ensemblesOfSegments = ledgerMetadata.getEnsembles().values();
+        Iterator<? extends List<BookieSocketAddress>> ensemblesOfSegmentsIterator = ensemblesOfSegments.iterator();
+        List<BookieSocketAddress> ensemble;
         int segmentNo = 0;
         while (ensemblesOfSegmentsIterator.hasNext()) {
             ensemble = ensemblesOfSegmentsIterator.next();
@@ -1565,9 +1565,8 @@ public class BookKeeperAdmin implements AutoCloseable {
         int ensembleSize = ledgerMetadata.getEnsembleSize();
         int writeQuorumSize = ledgerMetadata.getWriteQuorumSize();
 
-        List<Entry<Long, ArrayList<BookieSocketAddress>>> segments =
-            new LinkedList<Entry<Long, ArrayList<BookieSocketAddress>>>(
-                ledgerMetadata.getEnsembles().entrySet());
+        List<Entry<Long, ? extends List<BookieSocketAddress>>> segments =
+            new LinkedList<>(ledgerMetadata.getEnsembles().entrySet());
 
         boolean lastSegment = (segmentNo == (segments.size() - 1));
 
@@ -1617,7 +1616,7 @@ public class BookKeeperAdmin implements AutoCloseable {
         DistributionSchedule distributionSchedule = new RoundRobinDistributionSchedule(
                 ledgerMetadata.getWriteQuorumSize(), ledgerMetadata.getAckQuorumSize(),
                 ledgerMetadata.getEnsembleSize());
-        ArrayList<BookieSocketAddress> currentSegmentEnsemble = segments.get(segmentNo).getValue();
+        List<BookieSocketAddress> currentSegmentEnsemble = segments.get(segmentNo).getValue();
         int thisBookieIndexInCurrentEnsemble = currentSegmentEnsemble.indexOf(bookieAddress);
         long firstEntryId = segments.get(segmentNo).getKey();
         long lastEntryId = lastSegment ? ledgerMetadata.getLastEntryId() : segments.get(segmentNo + 1).getKey() - 1;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index 43d0c26..d97121d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -24,7 +24,6 @@ import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -200,11 +199,11 @@ class BookieWatcher {
      * @return list of bookies for new ensemble.
      * @throws BKNotEnoughBookiesException
      */
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
+    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
         int ackQuorumSize, Map<String, byte[]> customMetadata)
             throws BKNotEnoughBookiesException {
         long startTime = MathUtils.nowInNano();
-        ArrayList<BookieSocketAddress> socketAddresses;
+        List<BookieSocketAddress> socketAddresses;
         try {
             socketAddresses = placementPolicy.newEnsemble(ensembleSize,
                     writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<BookieSocketAddress>(
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index 7602499..28efe66 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -159,7 +159,7 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
 
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return writeSet;
@@ -167,7 +167,7 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
 
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         writeSet.addMissingIndices(ensemble.size());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 500ca16..e185964 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -18,7 +18,7 @@
 package org.apache.bookkeeper.client;
 
 import io.netty.util.HashedWheelTimer;
-import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -152,8 +152,8 @@ import org.apache.bookkeeper.stats.StatsLogger;
  *
  * <h3>How to choose bookies to do speculative reads?</h3>
  *
- * <p>{@link #reorderReadSequence(ArrayList, BookiesHealthInfo, WriteSet)} and
- * {@link #reorderReadLACSequence(ArrayList, BookiesHealthInfo, WriteSet)} are
+ * <p>{@link #reorderReadSequence(List, BookiesHealthInfo, WriteSet)} and
+ * {@link #reorderReadLACSequence(List, BookiesHealthInfo, WriteSet)} are
  * two methods exposed by the placement policy, to help client determine a better read sequence according to the
  * network topology and the bookie failure history.
  *
@@ -258,9 +258,9 @@ public interface EnsemblePlacementPolicy {
      *                       provides in {@link BookKeeper#createLedger(int, int, int, BookKeeper.DigestType, byte[])}
      * @param excludeBookies Bookies that should not be considered as targets.
      * @throws BKNotEnoughBookiesException if not enough bookies available.
-     * @return the ArrayList&lt;org.apache.bookkeeper.net.BookieSocketAddress&gt;
+     * @return the List&lt;org.apache.bookkeeper.net.BookieSocketAddress&gt;
      */
-    ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize,
+    List<BookieSocketAddress> newEnsemble(int ensembleSize,
                                                int writeQuorumSize,
                                                int ackQuorumSize,
                                                Map<String, byte[]> customMetadata,
@@ -318,7 +318,7 @@ public interface EnsemblePlacementPolicy {
      * @since 4.5
      */
     DistributionSchedule.WriteSet reorderReadSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet);
 
@@ -338,7 +338,7 @@ public interface EnsemblePlacementPolicy {
      * @since 4.5
      */
     DistributionSchedule.WriteSet reorderReadLACSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet);
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
index ae0e051..a48a58b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
@@ -18,7 +18,7 @@
 package org.apache.bookkeeper.client;
 
 import static com.google.common.base.Preconditions.checkState;
-import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -39,7 +39,7 @@ class ForceLedgerOp extends SafeRunnable implements ForceLedgerCallback {
     boolean completed = false;
     boolean errored = false;
     int lastSeenError = BKException.Code.WriteException;
-    ArrayList<BookieSocketAddress> currentEnsemble;
+    List<BookieSocketAddress> currentEnsemble;
 
     long currentNonDurableLastAddConfirmed = LedgerHandle.INVALID_ENTRY_ID;
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
index 87e6f18..7c9e07c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
@@ -17,7 +17,7 @@
  */
 package org.apache.bookkeeper.client;
 
-import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
@@ -66,7 +66,7 @@ public interface ITopologyAwareEnsemblePlacementPolicy<T extends Node> extends E
         /**
          * @return list of addresses representing the ensemble
          */
-        ArrayList<BookieSocketAddress> toList();
+        List<BookieSocketAddress> toList();
 
         /**
          * Validates if an ensemble is valid.
@@ -93,7 +93,7 @@ public interface ITopologyAwareEnsemblePlacementPolicy<T extends Node> extends E
      * @return list of bookies forming the ensemble
      * @throws BKException.BKNotEnoughBookiesException
      */
-    ArrayList<BookieSocketAddress> newEnsemble(
+    List<BookieSocketAddress> newEnsemble(
             int ensembleSize,
             int writeQuorumSize,
             int ackQuorumSize,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
index 6f232cf..cbd9976 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -20,9 +20,9 @@
 package org.apache.bookkeeper.client;
 
 import io.netty.buffer.ByteBuf;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
@@ -321,8 +321,8 @@ public class LedgerChecker {
         final Set<LedgerFragment> fragments = new HashSet<LedgerFragment>();
 
         Long curEntryId = null;
-        ArrayList<BookieSocketAddress> curEnsemble = null;
-        for (Map.Entry<Long, ArrayList<BookieSocketAddress>> e : lh
+        List<BookieSocketAddress> curEnsemble = null;
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : lh
                 .getLedgerMetadata().getEnsembles().entrySet()) {
             if (curEntryId != null) {
                 Set<Integer> bookieIndexes = new HashSet<Integer>();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 213f941..4f8beed 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -23,7 +23,6 @@ package org.apache.bookkeeper.client;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.security.GeneralSecurityException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
@@ -131,7 +130,7 @@ class LedgerCreateOp implements GenericCallback<LedgerMetadata> {
          * Adding bookies to ledger handle
          */
 
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         try {
             ensemble = bk.getBookieWatcher()
                     .newEnsemble(metadata.getEnsembleSize(),
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
index 1a97af8..49b8de2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
@@ -19,7 +19,6 @@
  */
 package org.apache.bookkeeper.client;
 
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -51,7 +50,7 @@ public class LedgerFragment {
         this.bookieIndexes = bookieIndexes;
         this.ensemble = lh.getLedgerMetadata().getEnsemble(firstEntryId);
         this.schedule = lh.getDistributionSchedule();
-        SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = lh
+        SortedMap<Long, ? extends List<BookieSocketAddress>> ensembles = lh
                 .getLedgerMetadata().getEnsembles();
         this.isLedgerClosed = lh.getLedgerMetadata().isClosed()
                 || !ensemble.equals(ensembles.get(ensembles.lastKey()));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 538cfb7..64394de 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -374,18 +374,19 @@ public class LedgerFragmentReplicator {
          * Update the ledger metadata's ensemble info to point to the new
          * bookie.
          */
-        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
-                .getEnsembles().get(fragmentStartId);
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().get(fragmentStartId);
+        List<BookieSocketAddress> newEnsemble = new ArrayList<>(ensemble);
         for (Map.Entry<BookieSocketAddress, BookieSocketAddress> entry : oldBookie2NewBookie.entrySet()) {
-            int deadBookieIndex = ensemble.indexOf(entry.getKey());
+            int deadBookieIndex = newEnsemble.indexOf(entry.getKey());
             // update ensemble info might happen after re-read ledger metadata, so the ensemble might already
             // change. if ensemble is already changed, skip replacing the bookie doesn't exist.
             if (deadBookieIndex >= 0) {
-                ensemble.set(deadBookieIndex, entry.getValue());
+                newEnsemble.set(deadBookieIndex, entry.getValue());
             } else {
                 LOG.info("Bookie {} doesn't exist in ensemble {} anymore.", entry.getKey(), ensemble);
             }
         }
+        lh.getLedgerMetadata().updateEnsemble(fragmentStartId, newEnsemble);
         lh.writeLedgerConfig(new UpdateEnsembleCb(ensembleUpdatedCb,
                 fragmentStartId, lh, oldBookie2NewBookie));
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 90b9884..611b182 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -372,9 +372,9 @@ public class LedgerHandle implements WriteHandle {
      * @return count of unique bookies
      */
     public synchronized long getNumBookies() {
-        Map<Long, ArrayList<BookieSocketAddress>> m = getLedgerMetadata().getEnsembles();
+        Map<Long, ? extends List<BookieSocketAddress>> m = getLedgerMetadata().getEnsembles();
         Set<BookieSocketAddress> s = Sets.newHashSet();
-        for (ArrayList<BookieSocketAddress> aList : m.values()) {
+        for (List<BookieSocketAddress> aList : m.values()) {
             s.addAll(aList);
         }
         return s.size();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index d95074d..b63c41d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -18,9 +18,11 @@
 package org.apache.bookkeeper.client;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.TextFormat;
@@ -81,9 +83,8 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
     private boolean storeSystemtimeAsLedgerCreationTime;
 
     private LedgerMetadataFormat.State state;
-    private TreeMap<Long, ArrayList<BookieSocketAddress>> ensembles =
-        new TreeMap<Long, ArrayList<BookieSocketAddress>>();
-    ArrayList<BookieSocketAddress> currentEnsemble;
+    private TreeMap<Long, ImmutableList<BookieSocketAddress>> ensembles =  new TreeMap<>();
+    List<BookieSocketAddress> currentEnsemble;
     volatile Version version = Version.NEW;
 
     private boolean hasPassword = false;
@@ -157,10 +158,8 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
         this.password = new byte[other.password.length];
         System.arraycopy(other.password, 0, this.password, 0, other.password.length);
         // copy the ensembles
-        for (Entry<Long, ArrayList<BookieSocketAddress>> entry : other.ensembles.entrySet()) {
-            long startEntryId = entry.getKey();
-            ArrayList<BookieSocketAddress> newEnsemble = new ArrayList<BookieSocketAddress>(entry.getValue());
-            this.addEnsemble(startEntryId, newEnsemble);
+        for (Entry<Long, ? extends List<BookieSocketAddress>> entry : other.ensembles.entrySet()) {
+            this.addEnsemble(entry.getKey(), entry.getValue());
         }
         this.customMetadata = other.customMetadata;
     }
@@ -177,7 +176,7 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
      * @return SortedMap of Ledger Fragments and the corresponding
      * bookie ensembles that store the entries.
      */
-    public TreeMap<Long, ArrayList<BookieSocketAddress>> getEnsembles() {
+    public TreeMap<Long, ? extends List<BookieSocketAddress>> getEnsembles() {
         return ensembles;
     }
 
@@ -186,8 +185,11 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
         return ensembles;
     }
 
-    void setEnsembles(TreeMap<Long, ArrayList<BookieSocketAddress>> ensembles) {
-        this.ensembles = ensembles;
+    void setEnsembles(TreeMap<Long, ? extends List<BookieSocketAddress>> newEnsembles) {
+        this.ensembles = newEnsembles.entrySet().stream()
+            .collect(TreeMap::new,
+                     (m, e) -> m.put(e.getKey(), ImmutableList.copyOf(e.getValue())),
+                     TreeMap::putAll);
     }
 
     @Override
@@ -286,14 +288,19 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
         state = LedgerMetadataFormat.State.CLOSED;
     }
 
-    public void addEnsemble(long startEntryId, ArrayList<BookieSocketAddress> ensemble) {
-        assert ensembles.isEmpty() || startEntryId >= ensembles.lastKey();
+    public void addEnsemble(long startEntryId, List<BookieSocketAddress> ensemble) {
+        checkArgument(ensembles.isEmpty() || startEntryId >= ensembles.lastKey());
 
-        ensembles.put(startEntryId, ensemble);
+        ensembles.put(startEntryId, ImmutableList.copyOf(ensemble));
         currentEnsemble = ensemble;
     }
 
-    ArrayList<BookieSocketAddress> getEnsemble(long entryId) {
+    public void updateEnsemble(long startEntryId, List<BookieSocketAddress> ensemble) {
+        checkArgument(ensembles.containsKey(startEntryId));
+        ensembles.put(startEntryId, ImmutableList.copyOf(ensemble));
+    }
+
+    List<BookieSocketAddress> getEnsemble(long entryId) {
         // the head map cannot be empty, since we insert an ensemble for
         // entry-id 0, right when we start
         return ensembles.get(ensembles.headMap(entryId + 1).lastKey());
@@ -312,7 +319,7 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
      * @return the entry id of the next ensemble change (-1 if no further ensemble changes)
      */
     long getNextEnsembleChange(long entryId) {
-        SortedMap<Long, ArrayList<BookieSocketAddress>> tailMap = ensembles.tailMap(entryId + 1);
+        SortedMap<Long, ? extends List<BookieSocketAddress>> tailMap = ensembles.tailMap(entryId + 1);
 
         if (tailMap.isEmpty()) {
             return -1;
@@ -360,7 +367,7 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
             }
         }
 
-        for (Map.Entry<Long, ArrayList<BookieSocketAddress>> entry : ensembles.entrySet()) {
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> entry : ensembles.entrySet()) {
             LedgerMetadataFormat.Segment.Builder segmentBuilder = LedgerMetadataFormat.Segment.newBuilder();
             segmentBuilder.setFirstEntryId(entry.getKey());
             for (BookieSocketAddress addr : entry.getValue()) {
@@ -399,7 +406,7 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
         s.append(VERSION_KEY).append(tSplitter).append(metadataFormatVersion).append(lSplitter);
         s.append(writeQuorumSize).append(lSplitter).append(ensembleSize).append(lSplitter).append(length);
 
-        for (Map.Entry<Long, ArrayList<BookieSocketAddress>> entry : ensembles.entrySet()) {
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> entry : ensembles.entrySet()) {
             s.append(lSplitter).append(entry.getKey());
             for (BookieSocketAddress addr : entry.getValue()) {
                 s.append(tSplitter);
@@ -707,7 +714,7 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
         return sb.toString();
     }
 
-    void mergeEnsembles(SortedMap<Long, ArrayList<BookieSocketAddress>> newEnsembles) {
+    void mergeEnsembles(SortedMap<Long, ? extends List<BookieSocketAddress>> newEnsembles) {
         // allow new metadata to be one ensemble less than current metadata
         // since ensemble change might kick in when recovery changed metadata
         int diff = ensembles.size() - newEnsembles.size();
@@ -715,21 +722,21 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
             return;
         }
         int i = 0;
-        for (Entry<Long, ArrayList<BookieSocketAddress>> entry : newEnsembles.entrySet()) {
+        for (Entry<Long, ? extends List<BookieSocketAddress>> entry : newEnsembles.entrySet()) {
             ++i;
             if (ensembles.size() != i) {
                 // we should use last ensemble from current metadata
                 // not the new metadata read from zookeeper
                 long key = entry.getKey();
-                ArrayList<BookieSocketAddress> ensemble = entry.getValue();
-                ensembles.put(key, ensemble);
+                List<BookieSocketAddress> ensemble = entry.getValue();
+                ensembles.put(key, ImmutableList.copyOf(ensemble));
             }
         }
     }
 
     Set<BookieSocketAddress> getBookiesInThisLedger() {
         Set<BookieSocketAddress> bookies = new HashSet<BookieSocketAddress>();
-        for (ArrayList<BookieSocketAddress> ensemble : ensembles.values()) {
+        for (List<BookieSocketAddress> ensemble : ensembles.values()) {
             bookies.addAll(ensemble);
         }
         return bookies;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 76332df..0aac38f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -91,12 +91,12 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
         int firstError = BKException.Code.OK;
         int numBookiesMissingEntry = 0;
 
-        final ArrayList<BookieSocketAddress> ensemble;
+        final List<BookieSocketAddress> ensemble;
         final DistributionSchedule.WriteSet writeSet;
         final LedgerEntryImpl entryImpl;
         final long eId;
 
-        LedgerEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
+        LedgerEntryRequest(List<BookieSocketAddress> ensemble, long lId, long eId) {
             this.entryImpl = LedgerEntryImpl.create(lId, eId);
             this.ensemble = ensemble;
             this.eId = eId;
@@ -278,7 +278,7 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
 
         int numPendings;
 
-        ParallelReadRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
+        ParallelReadRequest(List<BookieSocketAddress> ensemble, long lId, long eId) {
             super(ensemble, lId, eId);
             numPendings = writeSet.size();
         }
@@ -328,7 +328,7 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
         final BitSet sentReplicas;
         final BitSet erroredReplicas;
 
-        SequenceReadRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
+        SequenceReadRequest(List<BookieSocketAddress> ensemble, long lId, long eId) {
             super(ensemble, lId, eId);
 
             this.sentReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
@@ -519,7 +519,7 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
     void initiate() {
         long nextEnsembleChange = startEntryId, i = startEntryId;
         this.requestTimeNanos = MathUtils.nowInNano();
-        ArrayList<BookieSocketAddress> ensemble = null;
+        List<BookieSocketAddress> ensemble = null;
         do {
             if (i == nextEnsembleChange) {
                 ensemble = getLedgerMetadata().getEnsemble(i);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index d41ca9f..1fd7580 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -19,7 +19,7 @@ package org.apache.bookkeeper.client;
 
 import io.netty.util.HashedWheelTimer;
 
-import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -95,7 +95,7 @@ public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacement
     }
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
             Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
         try {
@@ -129,7 +129,7 @@ public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacement
 
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return super.reorderReadSequence(ensemble, bookiesHealthInfo,
@@ -138,7 +138,7 @@ public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacement
 
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return super.reorderReadLACSequence(ensemble, bookiesHealthInfo,
@@ -146,12 +146,12 @@ public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacement
     }
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize,
-                                                    int writeQuorumSize,
-                                                    int ackQuorumSize,
-                                                    Set<BookieSocketAddress> excludeBookies,
-                                                    Ensemble<BookieNode> parentEnsemble,
-                                                    Predicate<BookieNode> parentPredicate)
+    public List<BookieSocketAddress> newEnsemble(int ensembleSize,
+                                                 int writeQuorumSize,
+                                                 int ackQuorumSize,
+                                                 Set<BookieSocketAddress> excludeBookies,
+                                                 Ensemble<BookieNode> parentEnsemble,
+                                                 Predicate<BookieNode> parentPredicate)
             throws BKException.BKNotEnoughBookiesException {
         try {
             return super.newEnsemble(
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index ae640bb..13efa5c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -479,17 +479,17 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
     }
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
             Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
         return newEnsembleInternal(ensembleSize, writeQuorumSize, excludeBookies, null, null);
     }
 
-    protected ArrayList<BookieSocketAddress> newEnsembleInternal(int ensembleSize,
-                                                               int writeQuorumSize,
-                                                               Set<BookieSocketAddress> excludeBookies,
-                                                               Ensemble<BookieNode> parentEnsemble,
-                                                               Predicate<BookieNode> parentPredicate)
+    protected List<BookieSocketAddress> newEnsembleInternal(int ensembleSize,
+                                                            int writeQuorumSize,
+                                                            Set<BookieSocketAddress> excludeBookies,
+                                                            Ensemble<BookieNode> parentEnsemble,
+                                                            Predicate<BookieNode> parentPredicate)
             throws BKNotEnoughBookiesException {
         return newEnsembleInternal(
                 ensembleSize,
@@ -501,12 +501,12 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
     }
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize,
-                                                    int writeQuorumSize,
-                                                    int ackQuorumSize,
-                                                    Set<BookieSocketAddress> excludeBookies,
-                                                    Ensemble<BookieNode> parentEnsemble,
-                                                    Predicate<BookieNode> parentPredicate)
+    public List<BookieSocketAddress> newEnsemble(int ensembleSize,
+                                                 int writeQuorumSize,
+                                                 int ackQuorumSize,
+                                                 Set<BookieSocketAddress> excludeBookies,
+                                                 Ensemble<BookieNode> parentEnsemble,
+                                                 Predicate<BookieNode> parentPredicate)
             throws BKNotEnoughBookiesException {
         return newEnsembleInternal(
                 ensembleSize,
@@ -517,7 +517,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
                 parentPredicate);
     }
 
-    protected ArrayList<BookieSocketAddress> newEnsembleInternal(
+    protected List<BookieSocketAddress> newEnsembleInternal(
             int ensembleSize,
             int writeQuorumSize,
             int ackQuorumSize,
@@ -624,7 +624,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
                         !enforceMinNumRacksPerWriteQuorum || firstBookieInTheEnsemble);
                 racks[i] = prevNode.getNetworkLocation();
             }
-            ArrayList<BookieSocketAddress> bookieList = ensemble.toList();
+            List<BookieSocketAddress> bookieList = ensemble.toList();
             if (ensembleSize != bookieList.size()) {
                 LOG.error("Not enough {} bookies are available to form an ensemble : {}.",
                           ensembleSize, bookieList);
@@ -996,7 +996,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
 
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         Map<Integer, String> writeSetWithRegion = new HashMap<>();
@@ -1037,7 +1037,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
      * @return ordering of bookies to send read to
      */
     DistributionSchedule.WriteSet reorderReadSequenceWithRegion(
-        ArrayList<BookieSocketAddress> ensemble,
+        List<BookieSocketAddress> ensemble,
         DistributionSchedule.WriteSet writeSet,
         Map<Integer, String> writeSetWithRegion,
         BookiesHealthInfo bookiesHealthInfo,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index 1436990..b342b6f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -22,8 +22,8 @@ package org.apache.bookkeeper.client;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.buffer.ByteBuf;
-import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -73,12 +73,12 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
         int firstError = BKException.Code.OK;
         int numMissedEntryReads = 0;
 
-        final ArrayList<BookieSocketAddress> ensemble;
+        final List<BookieSocketAddress> ensemble;
         final DistributionSchedule.WriteSet writeSet;
         final DistributionSchedule.WriteSet orderedEnsemble;
         final LedgerEntryImpl entryImpl;
 
-        ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
+        ReadLACAndEntryRequest(List<BookieSocketAddress> ensemble, long lId, long eId) {
             this.entryImpl = LedgerEntryImpl.create(lId, eId);
             this.ensemble = ensemble;
             this.writeSet = lh.getDistributionSchedule().getEnsembleSet(eId);
@@ -243,7 +243,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
 
         int numPendings;
 
-        ParallelReadRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
+        ParallelReadRequest(List<BookieSocketAddress> ensemble, long lId, long eId) {
             super(ensemble, lId, eId);
             numPendings = orderedEnsemble.size();
         }
@@ -293,7 +293,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt
         final BitSet erroredReplicas;
         final BitSet emptyResponseReplicas;
 
-        SequenceReadRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
+        SequenceReadRequest(List<BookieSocketAddress> ensemble, long lId, long eId) {
             super(ensemble, lId, eId);
 
             this.sentReplicas = new BitSet(orderedEnsemble.size());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index dcd4b17..c52e0fe 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -225,7 +225,7 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
 
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
             Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
 
@@ -394,7 +394,7 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
                 }
             } while ((remainingEnsemble > 0) && (remainingEnsemble < remainingEnsembleBeforeIteration));
 
-            ArrayList<BookieSocketAddress> bookieList = ensemble.toList();
+            List<BookieSocketAddress> bookieList = ensemble.toList();
             if (ensembleSize != bookieList.size()) {
                 LOG.error("Not enough {} bookies are available to form an ensemble : {}.",
                           ensembleSize, bookieList);
@@ -522,7 +522,7 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
 
     @Override
     public final DistributionSchedule.WriteSet reorderReadSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         if (UNKNOWN_REGION.equals(myRegion)) {
@@ -540,7 +540,7 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
 
     @Override
     public final DistributionSchedule.WriteSet reorderReadLACSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         if (UNKNOWN_REGION.equals(myRegion)) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
index 85917b1..19cb505 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -51,7 +52,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy implements
 
         public static final EnsembleForReplacementWithNoConstraints INSTANCE =
             new EnsembleForReplacementWithNoConstraints();
-        static final ArrayList<BookieSocketAddress> EMPTY_LIST = new ArrayList<BookieSocketAddress>(0);
+        static final List<BookieSocketAddress> EMPTY_LIST = new ArrayList<BookieSocketAddress>(0);
 
         @Override
         public boolean addNode(BookieNode node) {
@@ -60,7 +61,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy implements
         }
 
         @Override
-        public ArrayList<BookieSocketAddress> toList() {
+        public List<BookieSocketAddress> toList() {
             return EMPTY_LIST;
         }
 
@@ -301,7 +302,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy implements
         final int ackQuorumSize;
         final int minRacksOrRegionsForDurability;
         final int minNumRacksPerWriteQuorum;
-        final ArrayList<BookieNode> chosenNodes;
+        final List<BookieNode> chosenNodes;
         final Set<String> racksOrRegions;
         private final CoverageSet[] quorums;
         final Predicate<BookieNode> parentPredicate;
@@ -453,7 +454,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy implements
         }
 
         @Override
-        public ArrayList<BookieSocketAddress> toList() {
+        public List<BookieSocketAddress> toList() {
             ArrayList<BookieSocketAddress> addresses = new ArrayList<BookieSocketAddress>(ensembleSize);
             for (BookieNode bn : chosenNodes) {
                 addresses.add(bn.getAddr());
@@ -490,7 +491,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy implements
 
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return writeSet;
@@ -498,7 +499,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy implements
 
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         DistributionSchedule.WriteSet retList = reorderReadSequence(
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
index d0281c7..b55e2d0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
@@ -30,6 +30,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -221,10 +223,12 @@ public class UpdateLedgerOp {
                 return;
             }
             boolean updateEnsemble = false;
-            for (ArrayList<BookieSocketAddress> ensembles : metadata.getEnsembles().values()) {
-                int index = ensembles.indexOf(curBookieAddr);
+            for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : metadata.getEnsembles().entrySet()) {
+                List<BookieSocketAddress> newEnsemble = new ArrayList<>(e.getValue());
+                int index = newEnsemble.indexOf(curBookieAddr);
                 if (-1 != index) {
-                    ensembles.set(index, toBookieAddr);
+                    newEnsemble.set(index, toBookieAddr);
+                    metadata.updateEnsemble(e.getKey(), newEnsemble);
                     updateEnsemble = true;
                 }
             }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
index b7b4285..027081a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
@@ -73,7 +73,7 @@ public class BookieLedgerIndexer {
                     public void operationComplete(int rc,
                             LedgerMetadata ledgerMetadata) {
                         if (rc == BKException.Code.OK) {
-                            for (Map.Entry<Long, ArrayList<BookieSocketAddress>> ensemble : ledgerMetadata
+                            for (Map.Entry<Long, ? extends List<BookieSocketAddress>> ensemble : ledgerMetadata
                                     .getEnsembles().entrySet()) {
                                 for (BookieSocketAddress bookie : ensemble
                                         .getValue()) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index 7d51a7a..d5c85f1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -31,9 +31,9 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
@@ -369,8 +369,8 @@ public class ReplicationWorker implements Runnable {
             return false;
         }
 
-        SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = admin.getLedgerMetadata(lh).getEnsembles();
-        ArrayList<BookieSocketAddress> finalEnsemble = ensembles.get(ensembles.lastKey());
+        SortedMap<Long, ? extends List<BookieSocketAddress>> ensembles = admin.getLedgerMetadata(lh).getEnsembles();
+        List<BookieSocketAddress> finalEnsemble = ensembles.get(ensembles.lastKey());
         Collection<BookieSocketAddress> available = admin.getAvailableBookies();
         for (BookieSocketAddress b : finalEnsemble) {
             if (!available.contains(b)) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java
index 07cdd6b..9feb9df 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java
@@ -25,7 +25,6 @@ import com.google.common.collect.Lists;
 
 import java.io.IOException;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -149,8 +148,8 @@ public class TestGcOverreplicatedLedger extends LedgerManagerTestCase {
             Assert.fail("No ledger metadata found");
         }
         BookieSocketAddress address = null;
-        SortedMap<Long, ArrayList<BookieSocketAddress>> ensembleMap = newLedgerMetadata.get().getEnsembles();
-        for (ArrayList<BookieSocketAddress> ensemble : ensembleMap.values()) {
+        SortedMap<Long, ? extends List<BookieSocketAddress>> ensembleMap = newLedgerMetadata.get().getEnsembles();
+        for (List<BookieSocketAddress> ensemble : ensembleMap.values()) {
             address = ensemble.get(0);
         }
         ServerConfiguration bkConf = getBkConf(address);
@@ -236,8 +235,8 @@ public class TestGcOverreplicatedLedger extends LedgerManagerTestCase {
         for (BookieServer bk : bs) {
             allAddresses.add(bk.getLocalAddress());
         }
-        SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = ledgerMetadata.getEnsembles();
-        for (ArrayList<BookieSocketAddress> fragmentEnsembles : ensembles.values()) {
+        SortedMap<Long, ? extends List<BookieSocketAddress>> ensembles = ledgerMetadata.getEnsembles();
+        for (List<BookieSocketAddress> fragmentEnsembles : ensembles.values()) {
             allAddresses.removeAll(fragmentEnsembles);
         }
         Assert.assertEquals(allAddresses.size(), 1);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
index 42ff0f9..b1ad88a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
@@ -506,7 +506,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
     private boolean verifyFullyReplicated(LedgerHandle lh, long untilEntry) throws Exception {
         LedgerMetadata md = getLedgerMetadata(lh);
 
-        Map<Long, ArrayList<BookieSocketAddress>> ensembles = md.getEnsembles();
+        Map<Long, ? extends List<BookieSocketAddress>> ensembles = md.getEnsembles();
 
         HashMap<Long, Long> ranges = new HashMap<Long, Long>();
         ArrayList<Long> keyList = Collections.list(
@@ -517,7 +517,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
         }
         ranges.put(keyList.get(keyList.size() - 1), untilEntry);
 
-        for (Map.Entry<Long, ArrayList<BookieSocketAddress>> e : ensembles.entrySet()) {
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : ensembles.entrySet()) {
             int quorum = md.getAckQuorumSize();
             long startEntryId = e.getKey();
             long endEntryId = ranges.get(startEntryId);
@@ -586,7 +586,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
         long numDupes = 0;
         for (LedgerHandle lh : lhs) {
             LedgerMetadata md = getLedgerMetadata(lh);
-            for (Map.Entry<Long, ArrayList<BookieSocketAddress>> e : md.getEnsembles().entrySet()) {
+            for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : md.getEnsembles().entrySet()) {
                 HashSet<BookieSocketAddress> set = new HashSet<BookieSocketAddress>();
                 long fragment = e.getKey();
 
@@ -619,7 +619,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
         closeLedgers(lhs);
 
         // Shutdown last bookie server in last ensemble
-        ArrayList<BookieSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
+        List<BookieSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
           .entrySet().iterator().next().getValue();
         BookieSocketAddress bookieToKill = lastEnsemble.get(lastEnsemble.size() - 1);
         killBookie(bookieToKill);
@@ -648,7 +648,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
         writeEntriestoLedgers(numMsgs, 0, lhs);
 
         // Shutdown the first bookie server
-        ArrayList<BookieSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
+        List<BookieSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
           .entrySet().iterator().next().getValue();
         BookieSocketAddress bookieToKill = lastEnsemble.get(lastEnsemble.size() - 1);
         killBookie(bookieToKill);
@@ -684,7 +684,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
         writeEntriestoLedgers(numMsgs, 0, lhs);
 
         // Shutdown the first bookie server
-        ArrayList<BookieSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
+        List<BookieSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
           .entrySet().iterator().next().getValue();
         // removed bookie
         BookieSocketAddress bookieToKill = lastEnsemble.get(0);
@@ -727,7 +727,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
         List<LedgerHandle> newLhs = openLedgers(lhs);
         for (LedgerHandle newLh : newLhs) {
             // first ensemble should contains bookieToKill2 and not contain bookieToKill
-            Map.Entry<Long, ArrayList<BookieSocketAddress>> entry =
+            Map.Entry<Long, ? extends List<BookieSocketAddress>> entry =
               newLh.getLedgerMetadata().getEnsembles().entrySet().iterator().next();
             assertFalse(entry.getValue().contains(bookieToKill));
             assertTrue(entry.getValue().contains(bookieToKill2));
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index 75bb4a4..93683c8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -144,7 +144,7 @@ public class BookieWriteLedgerTest extends
         startNewBookie();
 
         // Shutdown three bookies in the last ensemble and continue writing
-        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
                 .getEnsembles().entrySet().iterator().next().getValue();
         killBookie(ensemble.get(0));
         killBookie(ensemble.get(1));
@@ -189,7 +189,7 @@ public class BookieWriteLedgerTest extends
 
         CountDownLatch sleepLatch1 = new CountDownLatch(1);
         CountDownLatch sleepLatch2 = new CountDownLatch(1);
-        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
                 .getEnsembles().entrySet().iterator().next().getValue();
 
         sleepBookie(ensemble.get(0), sleepLatch1);
@@ -383,7 +383,7 @@ public class BookieWriteLedgerTest extends
         startNewBookie();
 
         // Shutdown one bookie in the last ensemble and continue writing
-        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
                 .getValue();
         killBookie(ensemble.get(0));
 
@@ -522,7 +522,7 @@ public class BookieWriteLedgerTest extends
         startNewBookie();
 
         // Shutdown one bookie in the last ensemble and continue writing
-        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
                 .getValue();
         killBookie(ensemble.get(0));
 
@@ -840,7 +840,7 @@ public class BookieWriteLedgerTest extends
         startNewBookie();
 
         // Shutdown three bookies in the last ensemble and continue writing
-        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
                 .getEnsembles().entrySet().iterator().next().getValue();
         killBookie(ensemble.get(0));
         killBookie(ensemble.get(1));
@@ -913,7 +913,7 @@ public class BookieWriteLedgerTest extends
         }
         // Start One more bookie and shutdown one from last ensemble before reading
         startNewBookie();
-        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
                 .getValue();
         killBookie(ensemble.get(0));
 
@@ -981,7 +981,7 @@ public class BookieWriteLedgerTest extends
         }
 
         CountDownLatch sleepLatch1 = new CountDownLatch(1);
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
 
         ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next().getValue();
 
@@ -1067,7 +1067,7 @@ public class BookieWriteLedgerTest extends
                 if (j == numEntriesToWrite / 2) {
                     // Start One more bookie and shutdown one from last ensemble at half-way
                     startNewBookie();
-                    ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet()
+                    List<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet()
                             .iterator().next().getValue();
                     killBookie(ensemble.get(0));
                 }
@@ -1136,7 +1136,7 @@ public class BookieWriteLedgerTest extends
         }
         // Start One more bookie and shutdown one from last ensemble before reading
         startNewBookie();
-        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
                 .getValue();
         killBookie(ensemble.get(0));
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
index 36beaa2..bf369cf 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
@@ -132,7 +132,7 @@ public class GenericEnsemblePlacementPolicyTest extends BookKeeperClusterTestCas
                 try (LedgerHandle lh = bk.createLedger(2, 2, 2, digestType, PASSWORD.getBytes(), customMetadata)) {
                     lh.addEntry(value);
                     long lId = lh.getId();
-                    ArrayList<BookieSocketAddress> ensembleAtFirstEntry = lh.getLedgerMetadata().getEnsemble(lId);
+                    List<BookieSocketAddress> ensembleAtFirstEntry = lh.getLedgerMetadata().getEnsemble(lId);
                     assertEquals(2, ensembleAtFirstEntry.size());
                     killBookie(ensembleAtFirstEntry.get(0));
                     lh.addEntry(value);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
index cf693ed..62be7cb 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
@@ -103,7 +103,7 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
         LedgerMetadata md = lh.getLedgerMetadata();
 
         for (long eid = startEntry; eid < untilEntry; eid++) {
-            ArrayList<BookieSocketAddress> addresses = md.getEnsemble(eid);
+            List<BookieSocketAddress> addresses = md.getEnsemble(eid);
             VerificationCallback callback = new VerificationCallback(addresses.size());
             for (BookieSocketAddress addr : addresses) {
                 bkc.getBookieClient().readEntry(addr, lh.getId(), eid,
@@ -121,7 +121,7 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
         LedgerMetadata md = lh.getLedgerMetadata();
 
         for (long eid = startEntry; eid < untilEntry; eid++) {
-            ArrayList<BookieSocketAddress> addresses = md.getEnsemble(eid);
+            List<BookieSocketAddress> addresses = md.getEnsemble(eid);
             VerificationCallback callback = new VerificationCallback(addresses.size());
             for (BookieSocketAddress addr : addresses) {
                 bkc.getBookieClient().readEntry(addr, lh.getId(), eid,
@@ -257,8 +257,8 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
                         CLIENT_SCOPE + "." + WATCHER_SCOPE + "." + REPLACE_BOOKIE_TIME)
                         .getSuccessCount() > 0);
 
-        ArrayList<BookieSocketAddress> firstFragment = lh.getLedgerMetadata().getEnsemble(0);
-        ArrayList<BookieSocketAddress> secondFragment = lh.getLedgerMetadata().getEnsemble(3 * numEntries);
+        List<BookieSocketAddress> firstFragment = lh.getLedgerMetadata().getEnsemble(0);
+        List<BookieSocketAddress> secondFragment = lh.getLedgerMetadata().getEnsemble(3 * numEntries);
         assertFalse(firstFragment.get(0).equals(secondFragment.get(0)));
         assertFalse(firstFragment.get(1).equals(secondFragment.get(1)));
         assertFalse(firstFragment.get(2).equals(secondFragment.get(2)));
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
index 20308e6..16a0958 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -137,7 +137,7 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
         // Entry should have added in first 2 Bookies.
 
         // Kill the 3rd BK from ensemble.
-        ArrayList<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
                 .getEnsembles().get(0L);
         BookieSocketAddress lastBookieFromEnsemble = firstEnsemble.get(2);
         LOG.info("Killing " + lastBookieFromEnsemble + " from ensemble="
@@ -180,7 +180,7 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
         startNewBookie();
         lh.addEntry(TEST_LEDGER_ENTRY_DATA);
 
-        ArrayList<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
                 .getEnsembles().get(0L);
 
         BookieSocketAddress firstBookieFromEnsemble = firstEnsemble.get(0);
@@ -213,7 +213,7 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
         LedgerHandle lh = bkc.createLedger(3, 2, BookKeeper.DigestType.CRC32,
                 TEST_LEDGER_PASSWORD);
 
-        ArrayList<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
                 .getEnsembles().get(0L);
         BookieSocketAddress firstBookieFromEnsemble = firstEnsemble.get(0);
         killBookie(firstBookieFromEnsemble);
@@ -258,7 +258,7 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
         }
 
         // Kill all three bookies
-        ArrayList<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
                 .getEnsembles().get(0L);
         for (BookieSocketAddress bkAddr : firstEnsemble) {
             killBookie(firstEnsemble, bkAddr);
@@ -321,7 +321,7 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
         LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
                 TEST_LEDGER_PASSWORD);
         lh.addEntry(TEST_LEDGER_ENTRY_DATA);
-        ArrayList<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
                 .getEnsembles().get(0L);
         BookieSocketAddress lastBookieFromEnsemble = firstEnsemble.get(0);
         LOG.info("Killing " + lastBookieFromEnsemble + " from ensemble="
@@ -355,7 +355,7 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
         for (int i = 0; i < 10; i++) {
             lh.addEntry(TEST_LEDGER_ENTRY_DATA);
         }
-        ArrayList<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
                 .getEnsembles().get(0L);
         DistributionSchedule.WriteSet writeSet = lh.getDistributionSchedule().getWriteSet(lh.getLastAddPushed());
         BookieSocketAddress lastBookieFromEnsemble = firstEnsemble.get(writeSet.get(0));
@@ -400,7 +400,7 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
     public void testClosedEmptyLedger() throws Exception {
         LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
                 TEST_LEDGER_PASSWORD);
-        ArrayList<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
                 .getEnsembles().get(0L);
         lh.close();
 
@@ -427,7 +427,7 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
     public void testClosedSingleEntryLedger() throws Exception {
         LedgerHandle lh = bkc.createLedger(3, 2, BookKeeper.DigestType.CRC32,
                 TEST_LEDGER_PASSWORD);
-        ArrayList<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
             .getEnsembles().get(0L);
         lh.addEntry(TEST_LEDGER_ENTRY_DATA);
         lh.close();
@@ -496,7 +496,7 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
         return result;
     }
 
-    private void killBookie(ArrayList<BookieSocketAddress> firstEnsemble, BookieSocketAddress ensemble)
+    private void killBookie(List<BookieSocketAddress> firstEnsemble, BookieSocketAddress ensemble)
             throws Exception {
         LOG.info("Killing " + ensemble + " from ensemble=" + firstEnsemble);
         killBookie(ensemble);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
index 883dc9c..b26798b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
@@ -25,8 +25,8 @@ import static org.junit.Assert.fail;
 
 import com.google.common.collect.Sets;
 
-import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
@@ -111,12 +111,10 @@ public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
         }
 
         // Killing all bookies except newly replicated bookie
-        SortedMap<Long, ArrayList<BookieSocketAddress>> allBookiesBeforeReplication = lh
+        SortedMap<Long, ? extends List<BookieSocketAddress>> allBookiesBeforeReplication = lh
                 .getLedgerMetadata().getEnsembles();
-        Set<Entry<Long, ArrayList<BookieSocketAddress>>> entrySet = allBookiesBeforeReplication
-                .entrySet();
-        for (Entry<Long, ArrayList<BookieSocketAddress>> entry : entrySet) {
-            ArrayList<BookieSocketAddress> bookies = entry.getValue();
+        for (Entry<Long, ? extends List<BookieSocketAddress>> entry : allBookiesBeforeReplication.entrySet()) {
+            List<BookieSocketAddress> bookies = entry.getValue();
             for (BookieSocketAddress bookie : bookies) {
                 if (newBkAddr.equals(bookie)) {
                     continue;
@@ -237,7 +235,7 @@ public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
         LedgerMetadata metadata = new LedgerMetadata(3, 3, 3, TEST_DIGEST_TYPE,
                 TEST_PSSWD) {
             @Override
-            ArrayList<BookieSocketAddress> getEnsemble(long entryId) {
+            List<BookieSocketAddress> getEnsemble(long entryId) {
                 return null;
             }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
index ce851a5..ba500fc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
@@ -27,8 +27,8 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import org.apache.bookkeeper.client.BKException.Code;
@@ -151,8 +151,7 @@ public class TestParallelRead extends BookKeeperClusterTestCase {
 
         LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
 
-        ArrayList<BookieSocketAddress> ensemble =
-                lh.getLedgerMetadata().getEnsemble(10);
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsemble(10);
         CountDownLatch latch1 = new CountDownLatch(1);
         CountDownLatch latch2 = new CountDownLatch(1);
         // sleep two bookie
@@ -184,8 +183,7 @@ public class TestParallelRead extends BookKeeperClusterTestCase {
 
         LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
 
-        ArrayList<BookieSocketAddress> ensemble =
-                lh.getLedgerMetadata().getEnsemble(5);
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsemble(5);
         // kill two bookies
         killBookie(ensemble.get(0));
         killBookie(ensemble.get(1));
@@ -222,8 +220,7 @@ public class TestParallelRead extends BookKeeperClusterTestCase {
 
         LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
 
-        ArrayList<BookieSocketAddress> ensemble =
-                lh.getLedgerMetadata().getEnsemble(5);
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsemble(5);
         // kill two bookies
         killBookie(ensemble.get(0));
         killBookie(ensemble.get(1));
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index e085006..dd95b7a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -63,7 +63,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
     static final Logger LOG = LoggerFactory.getLogger(TestRackawareEnsemblePlacementPolicy.class);
 
     RackawareEnsemblePlacementPolicy repp;
-    final ArrayList<BookieSocketAddress> ensemble = new ArrayList<BookieSocketAddress>();
+    final List<BookieSocketAddress> ensemble = new ArrayList<BookieSocketAddress>();
     DistributionSchedule.WriteSet writeSet = DistributionSchedule.NULL_WRITE_SET;
     ClientConfiguration conf = new ClientConfiguration();
     BookieSocketAddress addr1, addr2, addr3, addr4;
@@ -453,7 +453,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         bookiePendingMap.put(addr5, 9L); // not in write set
         bookiePendingMap.put(addr6, 2L); // best bookie -> this one first
         bookiePendingMap.put(addr7, 10L);
-        ArrayList<BookieSocketAddress> ensemble = new ArrayList<BookieSocketAddress>();
+        List<BookieSocketAddress> ensemble = new ArrayList<BookieSocketAddress>();
         ensemble.add(addr1);
         ensemble.add(addr2);
         ensemble.add(addr3);
@@ -669,9 +669,9 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
             assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2, conf.getMinNumRacksPerWriteQuorum()));
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>());
             assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2, conf.getMinNumRacksPerWriteQuorum()));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is only one rack.");
@@ -703,7 +703,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
 
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         try {
             ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
             fail("Should get not enough bookies exception since there is only one rack.");
@@ -765,7 +765,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
          * and there are enough bookies in 3 racks, this newEnsemble calls should
          * succeed.
          */
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         int ensembleSize = numOfRacks * numOfBookiesPerRack;
         int writeQuorumSize = numOfRacks;
         int ackQuorumSize = numOfRacks;
@@ -821,7 +821,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
          * ensembleSizes (as long as there are enough number of bookies in each
          * rack).
          */
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         for (int ensembleSize = effectiveMinNumRacksPerWriteQuorum; ensembleSize < 40; ensembleSize++) {
             ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
             assertEquals("Number of writeQuorum sets covered", ensembleSize,
@@ -872,7 +872,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
          * and there are enough bookies in 3 racks, this newEnsemble call should
          * succeed.
          */
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         int ensembleSize = numOfRacks * numOfBookiesPerRack;
         int writeQuorumSize = numOfRacks;
         int ackQuorumSize = numOfRacks;
@@ -1198,12 +1198,12 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
             int ensembleSize = 3;
             int writeQuorumSize = 2;
             int acqQuorumSize = 2;
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
                     null, new HashSet<>());
             int numCovered = getNumCoveredWriteQuorums(ensemble, 2, conf.getMinNumRacksPerWriteQuorum());
             assertTrue(numCovered >= 1 && numCovered < 3);
             ensembleSize = 4;
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
+            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
                     null, new HashSet<>());
             numCovered = getNumCoveredWriteQuorums(ensemble2, 2, conf.getMinNumRacksPerWriteQuorum());
             assertTrue(numCovered >= 1 && numCovered < 3);
@@ -1276,7 +1276,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
 
-        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, writeQuorumSize, null,
+        List<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, writeQuorumSize, null,
                 new HashSet<>());
         int numCovered = getNumCoveredWriteQuorums(ensemble, writeQuorumSize, minNumRacksPerWriteQuorum);
         assertEquals("minimum number of racks covered for writequorum ensemble: " + ensemble, ensembleSize, numCovered);
@@ -1317,12 +1317,12 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
             int ensembleSize = 3;
             int writeQuorumSize = 3;
             int ackQuorumSize = 2;
-            ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
+            List<BookieSocketAddress> ensemble1 = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
                     null, new HashSet<>());
             assertEquals(ensembleSize, getNumCoveredWriteQuorums(ensemble1, 2, conf.getMinNumRacksPerWriteQuorum()));
             ensembleSize = 4;
             writeQuorumSize = 4;
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(ensembleSize, writeQuorumSize, 2, null,
+            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(ensembleSize, writeQuorumSize, 2, null,
                     new HashSet<>());
             assertEquals(ensembleSize, getNumCoveredWriteQuorums(ensemble2, 2, conf.getMinNumRacksPerWriteQuorum()));
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -1554,7 +1554,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         int numTries = 10000;
 
         Set<BookieSocketAddress> excludeList = new HashSet<BookieSocketAddress>();
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         int ensembleSize = 3;
         int writeQuorumSize = 2;
         int acqQuorumSize = 2;
@@ -1625,7 +1625,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
 
         repp.updateBookieInfo(bookieInfoMap);
 
-        ArrayList<BookieSocketAddress> ensemble = new ArrayList<BookieSocketAddress>();
+        List<BookieSocketAddress> ensemble = new ArrayList<BookieSocketAddress>();
         Set<BookieSocketAddress> excludeList = new HashSet<BookieSocketAddress>();
         try {
             excludeList.add(addr1);
@@ -1644,7 +1644,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         }
     }
 
-    static int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> ensemble, int writeQuorumSize,
+    static int getNumCoveredWriteQuorums(List<BookieSocketAddress> ensemble, int writeQuorumSize,
             int minNumRacksPerWriteQuorumConfValue) throws Exception {
         int ensembleSize = ensemble.size();
         int numCoveredWriteQuorums = 0;
@@ -1722,13 +1722,13 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
 
         // we will never use addr4 even it is in the stabilized network topology
         for (int i = 0; i < 5; i++) {
-            ArrayList<BookieSocketAddress> ensemble =
+            List<BookieSocketAddress> ensemble =
                     repp.newEnsemble(3, 3, 3, null, new HashSet<BookieSocketAddress>());
             assertFalse(ensemble.contains(addr4));
         }
 
         // we could still use addr4 for urgent allocation if it is just bookie flapping
-        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(4, 4, 4, null, new HashSet<BookieSocketAddress>());
+        List<BookieSocketAddress> ensemble = repp.newEnsemble(4, 4, 4, null, new HashSet<BookieSocketAddress>());
         assertTrue(ensemble.contains(addr4));
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
index 524d1d5..d9f2535 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
@@ -30,8 +30,8 @@ import static org.junit.Assert.fail;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import io.netty.util.HashedWheelTimer;
-import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -257,9 +257,9 @@ public class TestRackawareEnsemblePlacementPolicyUsingScript {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
             assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2));
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>());
             assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is only one rack.");
@@ -281,10 +281,10 @@ public class TestRackawareEnsemblePlacementPolicyUsingScript {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
             int numCovered = getNumCoveredWriteQuorums(ensemble, 2);
             assertTrue(numCovered == 2);
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>());
             numCovered = getNumCoveredWriteQuorums(ensemble2, 2);
             assertTrue(numCovered == 2);
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -315,9 +315,9 @@ public class TestRackawareEnsemblePlacementPolicyUsingScript {
         addrs.add(addr8);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
             assertEquals(3, getNumCoveredWriteQuorums(ensemble1, 2));
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>());
             assertEquals(4, getNumCoveredWriteQuorums(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception.");
@@ -346,7 +346,7 @@ public class TestRackawareEnsemblePlacementPolicyUsingScript {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
     }
 
-    private int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> ensemble, int writeQuorumSize)
+    private int getNumCoveredWriteQuorums(List<BookieSocketAddress> ensemble, int writeQuorumSize)
             throws Exception {
         int ensembleSize = ensemble.size();
         int numCoveredWriteQuorums = 0;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
index b1ecdba..7dc1d39 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
@@ -104,7 +104,7 @@ public class TestRackawarePolicyNotificationUpdates extends TestCase {
         int ensembleSize = 3;
         int writeQuorumSize = 2;
         int acqQuorumSize = 2;
-        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
+        List<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
                 Collections.emptyMap(), Collections.emptySet());
         int numCovered = TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
                 conf.getMinNumRacksPerWriteQuorum());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
index 39942b3..66956b2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
@@ -24,8 +24,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -229,7 +229,7 @@ public class TestReadEntryListener extends BookKeeperClusterTestCase {
 
         LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
 
-        ArrayList<BookieSocketAddress> ensemble =
+        List<BookieSocketAddress> ensemble =
                 lh.getLedgerMetadata().getEnsemble(5);
         // kill two bookies
         killBookie(ensemble.get(0));
@@ -269,7 +269,7 @@ public class TestReadEntryListener extends BookKeeperClusterTestCase {
 
         LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
 
-        ArrayList<BookieSocketAddress> ensemble =
+        List<BookieSocketAddress> ensemble =
             lh.getLedgerMetadata().getEnsemble(5);
         // kill bookies
         killBookie(ensemble.get(0));
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index ed8cf29..e541230 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -475,7 +475,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> list = repp.newEnsemble(5, 5, 3, null, new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> list = repp.newEnsemble(5, 5, 3, null, new HashSet<BookieSocketAddress>());
             LOG.info("Ensemble : {}", list);
             fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -536,10 +536,10 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null,
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
             assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble, 2));
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null,
+            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
             assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -569,7 +569,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null,
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
             int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble, 2);
             assertTrue(numCovered >= 1);
@@ -578,7 +578,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
             fail("Should not get not enough bookies exception even there is only one rack.");
         }
         try {
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null,
+            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
             int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble2, 2);
             assertTrue(numCovered >= 1 && numCovered < 3);
@@ -618,10 +618,10 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr8);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, null,
+            List<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
             assertEquals(3, getNumCoveredRegionsInWriteQuorum(ensemble1, 2));
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null,
+            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null,
                     new HashSet<BookieSocketAddress>());
             assertEquals(4, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -669,7 +669,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         addrs.add(addr10);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null,
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null,
                     new HashSet<BookieSocketAddress>());
             assert(ensemble.contains(addr4));
             assert(ensemble.contains(addr8));
@@ -738,7 +738,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
             ((SettableFeature) featureProvider.scope("region1").getFeature("disallowBookies")).set(true);
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null,
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null,
                     new HashSet<BookieSocketAddress>());
             assertEquals(2, getNumRegionsInEnsemble(ensemble));
             assert(ensemble.contains(addr1));
@@ -753,7 +753,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         }
         try {
             ((SettableFeature) featureProvider.scope("region2").getFeature("disallowBookies")).set(true);
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null,
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null,
                     new HashSet<BookieSocketAddress>());
             fail("Should get not enough bookies exception even there is only one region with insufficient bookies.");
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -761,7 +761,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         }
         try {
             ((SettableFeature) featureProvider.scope("region2").getFeature("disallowBookies")).set(false);
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null,
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null,
                     new HashSet<BookieSocketAddress>());
             assert(ensemble.contains(addr1));
             assert(ensemble.contains(addr3));
@@ -835,7 +835,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
 
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, null,
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, null,
                     new HashSet<BookieSocketAddress>());
             assert(ensemble.size() == 10);
             assertEquals(5, getNumRegionsInEnsemble(ensemble));
@@ -847,7 +847,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         try {
             Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
             excludedAddrs.add(addr10);
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, null, excludedAddrs);
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, null, excludedAddrs);
             assert(ensemble.contains(addr11) && ensemble.contains(addr12));
             assert(ensemble.size() == 10);
             assertEquals(5, getNumRegionsInEnsemble(ensemble));
@@ -937,7 +937,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
             ackQuorum = 5;
         }
 
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         try {
             ensemble = repp.newEnsemble(6, 6, ackQuorum, null, new HashSet<BookieSocketAddress>());
             assert(ensemble.size() == 6);
@@ -1073,7 +1073,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
                     .set(true);
         }
 
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         try {
             ensemble = repp.newEnsemble(6, 6, 4, null, new HashSet<BookieSocketAddress>());
             assert(ensemble.size() == 6);
@@ -1141,7 +1141,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         excludedAddrs.add(addr10);
         excludedAddrs.add(addr9);
         try {
-            ArrayList<BookieSocketAddress> list = repp.newEnsemble(5, 5, 5, null, excludedAddrs);
+            List<BookieSocketAddress> list = repp.newEnsemble(5, 5, 5, null, excludedAddrs);
             LOG.info("Ensemble : {}", list);
             fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -1202,7 +1202,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
     private void basicReorderReadSequenceWithLocalRegionTest(String myRegion, boolean isReadLAC) throws Exception {
         prepareNetworkTopologyForReorderTests(myRegion);
 
-        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
+        List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
         assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
 
         DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1258,7 +1258,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
     private void basicReorderReadSequenceWithRemoteRegionTest(String myRegion, boolean isReadLAC) throws Exception {
         prepareNetworkTopologyForReorderTests(myRegion);
 
-        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
+        List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
         assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
 
         DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1296,7 +1296,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         reorderReadSequenceWithUnavailableOrReadOnlyBookiesTest(true);
     }
 
-    static Set<BookieSocketAddress> getBookiesForRegion(ArrayList<BookieSocketAddress> ensemble, String region) {
+    static Set<BookieSocketAddress> getBookiesForRegion(List<BookieSocketAddress> ensemble, String region) {
         Set<BookieSocketAddress> regionBookies = new HashSet<BookieSocketAddress>();
         for (BookieSocketAddress address : ensemble) {
             String r = StaticDNSResolver.getRegion(address.getHostName());
@@ -1307,7 +1307,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         return regionBookies;
     }
 
-    static void appendBookieIndexByRegion(ArrayList<BookieSocketAddress> ensemble,
+    static void appendBookieIndexByRegion(List<BookieSocketAddress> ensemble,
                                           DistributionSchedule.WriteSet writeSet,
                                           String region,
                                           List<Integer> finalSet) {
@@ -1328,7 +1328,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
 
         prepareNetworkTopologyForReorderTests(myRegion);
 
-        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
+        List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
         assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
 
         DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1372,7 +1372,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         }
     }
 
-    private int getNumRegionsInEnsemble(ArrayList<BookieSocketAddress> ensemble) {
+    private int getNumRegionsInEnsemble(List<BookieSocketAddress> ensemble) {
         Set<String> regions = new HashSet<String>();
         for (BookieSocketAddress addr: ensemble) {
             regions.add(StaticDNSResolver.getRegion(addr.getHostName()));
@@ -1380,7 +1380,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
         return regions.size();
     }
 
-    private int getNumCoveredRegionsInWriteQuorum(ArrayList<BookieSocketAddress> ensemble, int writeQuorumSize)
+    private int getNumCoveredRegionsInWriteQuorum(List<BookieSocketAddress> ensemble, int writeQuorumSize)
             throws Exception {
         int ensembleSize = ensemble.size();
         int numCoveredWriteQuorums = 0;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java
index 50c24bf..283cf75 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.client;
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -54,10 +55,10 @@ public class TestSequenceRead extends BookKeeperClusterTestCase {
     private LedgerHandle createLedgerWithDuplicatedBookies() throws Exception {
         final LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, passwd);
         // introduce duplicated bookies in an ensemble.
-        SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = lh.getLedgerMetadata().getEnsembles();
-        TreeMap<Long, ArrayList<BookieSocketAddress>> newEnsembles = new TreeMap<>();
-        for (Map.Entry<Long, ArrayList<BookieSocketAddress>> entry : ensembles.entrySet()) {
-            ArrayList<BookieSocketAddress> newList = new ArrayList<BookieSocketAddress>(entry.getValue().size());
+        SortedMap<Long, ? extends List<BookieSocketAddress>> ensembles = lh.getLedgerMetadata().getEnsembles();
+        TreeMap<Long, List<BookieSocketAddress>> newEnsembles = new TreeMap<>();
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> entry : ensembles.entrySet()) {
+            List<BookieSocketAddress> newList = new ArrayList<BookieSocketAddress>(entry.getValue().size());
             BookieSocketAddress firstBookie = entry.getValue().get(0);
             for (BookieSocketAddress ignored : entry.getValue()) {
                 newList.add(firstBookie);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
index 4a49451..aa21059 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
@@ -27,10 +27,10 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Enumeration;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -308,7 +308,7 @@ public class TestSpeculativeRead extends BookKeeperClusterTestCase {
 
         LedgerHandle l = bkspec.openLedger(id, digestType, passwd);
 
-        ArrayList<BookieSocketAddress> ensemble = l.getLedgerMetadata().getEnsembles().get(0L);
+        List<BookieSocketAddress> ensemble = l.getLedgerMetadata().getEnsembles().get(0L);
         BitSet allHosts = new BitSet(ensemble.size());
         for (int i = 0; i < ensemble.size(); i++) {
             allHosts.set(i, true);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
index a729139..e6daa66 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
@@ -26,9 +26,9 @@ import static org.junit.Assert.assertTrue;
 
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
@@ -92,7 +92,7 @@ public class TestWatchEnsembleChange extends BookKeeperClusterTestCase {
         LedgerHandle readLh = bkc.openLedgerNoRecovery(lh.getId(), digestType, "".getBytes());
         long lastLAC = readLh.getLastAddConfirmed();
         assertEquals(numEntries - 2, lastLAC);
-        ArrayList<BookieSocketAddress> ensemble =
+        List<BookieSocketAddress> ensemble =
                 lh.getLedgerMetadata().currentEnsemble;
         for (BookieSocketAddress addr : ensemble) {
             killBookie(addr);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java
index d773bda..6eca0de 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java
@@ -94,7 +94,7 @@ public class UpdateLedgerCmdTest extends BookKeeperClusterTestCase {
 
     private int getUpdatedLedgersCount(BookKeeper bk, List<LedgerHandle> ledgers, BookieSocketAddress toBookieAddr)
             throws InterruptedException, BKException {
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         int updatedLedgersCount = 0;
         for (LedgerHandle lh : ledgers) {
             // ledger#close() would hit BadVersion exception as rename
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
index aa01e65..7f67242 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
@@ -98,7 +98,7 @@ public class UpdateLedgerOpTest extends BookKeeperClusterTestCase {
             ledgers.add(createLedgerWithEntries(bk, 0));
         }
 
-        ArrayList<BookieSocketAddress> ensemble = lh1.getLedgerMetadata().getEnsemble(0);
+        List<BookieSocketAddress> ensemble = lh1.getLedgerMetadata().getEnsemble(0);
 
         BookieSocketAddress curBookieAddr = ensemble.get(0);
         baseConf.setUseHostNameAsBookieID(true);
@@ -139,7 +139,7 @@ public class UpdateLedgerOpTest extends BookKeeperClusterTestCase {
             ledgers.add(createLedgerWithEntries(bk, 0));
         }
 
-        ArrayList<BookieSocketAddress> ensemble = lh1.getLedgerMetadata().getEnsemble(0);
+        List<BookieSocketAddress> ensemble = lh1.getLedgerMetadata().getEnsemble(0);
 
         BookieSocketAddress curBookieAddr = ensemble.get(0);
         baseConf.setUseHostNameAsBookieID(true);
@@ -194,7 +194,7 @@ public class UpdateLedgerOpTest extends BookKeeperClusterTestCase {
         LedgerHandle lh = createLedgerWithEntries(bk, 100);
 
         BookieServer bookieServer = bs.get(0);
-        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsemble(0);
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsemble(0);
         BookieSocketAddress curBookieAddr = null;
         for (BookieSocketAddress bookieSocketAddress : ensemble) {
             if (bookieServer.getLocalAddress().equals(bookieSocketAddress)) {
@@ -276,7 +276,7 @@ public class UpdateLedgerOpTest extends BookKeeperClusterTestCase {
             }
         };
         th.start();
-        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsemble(0);
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsemble(0);
         BookieSocketAddress curBookieAddr = ensemble.get(0);
         BookieSocketAddress toBookieAddr = new BookieSocketAddress("localhost:" + curBookieAddr.getPort());
         UpdateLedgerOp updateLedgerOp = new UpdateLedgerOp(bk, bkadmin);
@@ -297,7 +297,7 @@ public class UpdateLedgerOpTest extends BookKeeperClusterTestCase {
 
     private int getUpdatedLedgersCount(BookKeeper bk, List<LedgerHandle> ledgers, BookieSocketAddress toBookieAddr)
             throws InterruptedException, BKException {
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         int updatedLedgersCount = 0;
         for (LedgerHandle lh : ledgers) {
             // ledger#close() would hit BadVersion exception as rename
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
index efe6356..50ad2c9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
@@ -24,6 +24,7 @@ import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerMa
 import static org.junit.Assert.assertEquals;
 
 import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.util.ArrayList;
 import java.util.List;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -106,8 +107,9 @@ public class AuditorPeriodicBookieCheckTest extends BookKeeperClusterTestCase {
 
                 LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
                 LedgerMetadata md = LedgerHandleAdapter.getLedgerMetadata(lh);
-                List<BookieSocketAddress> ensemble = md.getEnsembles().get(0L);
+                List<BookieSocketAddress> ensemble = new ArrayList<>(md.getEnsembles().get(0L));
                 ensemble.set(0, new BookieSocketAddress("1.1.1.1", 1000));
+                md.updateEnsemble(0L, ensemble);
 
                 TestCallbacks.GenericCallbackFuture<LedgerMetadata> cb =
                     new TestCallbacks.GenericCallbackFuture<LedgerMetadata>();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index e706d1e..cc4287d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -366,7 +366,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
     private BookieSocketAddress replaceBookieWithWriteFailingBookie(LedgerHandle lh) throws Exception {
         int bookieIdx = -1;
         Long entryId = LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().firstKey();
-        ArrayList<BookieSocketAddress> curEnsemble = LedgerHandleAdapter
+        List<BookieSocketAddress> curEnsemble = LedgerHandleAdapter
                 .getLedgerMetadata(lh).getEnsembles().get(entryId);
 
         // Identify a bookie in the current ledger ensemble to be replaced
@@ -463,9 +463,9 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         // check that ensemble has changed and the bookie that rejected writes has
         // been replaced in the ensemble
         LedgerHandle newLh = bkc.openLedger(lh.getId(), DigestType.CRC32, "passwd".getBytes());
-        for (Map.Entry<Long, ArrayList<BookieSocketAddress>> e : LedgerHandleAdapter.getLedgerMetadata(newLh).
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : LedgerHandleAdapter.getLedgerMetadata(newLh).
                 getEnsembles().entrySet()) {
-            ArrayList<BookieSocketAddress> ensemble = e.getValue();
+            List<BookieSocketAddress> ensemble = e.getValue();
             assertFalse("Ensemble hasn't been updated", ensemble.contains(replacedBookie));
         }
         newLh.close();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
index abd8d56..4929b0e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
@@ -443,9 +443,9 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase {
         List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1, 5);
         LedgerHandle lh = listOfLedgerHandle.get(0);
         int ledgerReplicaIndex = 0;
-        final SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = LedgerHandleAdapter
+        final SortedMap<Long, ? extends List<BookieSocketAddress>> ensembles = LedgerHandleAdapter
                 .getLedgerMetadata(lh).getEnsembles();
-        final ArrayList<BookieSocketAddress> bkAddresses = ensembles.get(0L);
+        final List<BookieSocketAddress> bkAddresses = ensembles.get(0L);
         BookieSocketAddress replicaToKillAddr = bkAddresses.get(0);
         for (BookieSocketAddress bookieSocketAddress : bkAddresses) {
             if (!isCreatedFromIp(bookieSocketAddress)){
@@ -522,9 +522,9 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase {
         List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1, 5);
         LedgerHandle lh = listOfLedgerHandle.get(0);
         int ledgerReplicaIndex = 0;
-        final SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = LedgerHandleAdapter
+        final SortedMap<Long, ? extends List<BookieSocketAddress>> ensembles = LedgerHandleAdapter
                 .getLedgerMetadata(lh).getEnsembles();
-        final ArrayList<BookieSocketAddress> bkAddresses = ensembles.get(0L);
+        final List<BookieSocketAddress> bkAddresses = ensembles.get(0L);
         BookieSocketAddress replicaToKillAddr = bkAddresses.get(0);
         for (BookieSocketAddress bookieSocketAddress : bkAddresses) {
             if (isCreatedFromIp(bookieSocketAddress)) {
@@ -577,7 +577,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase {
     }
 
     private int getReplicaIndexInLedger(LedgerHandle lh, BookieSocketAddress replicaToKill) {
-        SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = LedgerHandleAdapter
+        SortedMap<Long, ? extends List<BookieSocketAddress>> ensembles = LedgerHandleAdapter
                 .getLedgerMetadata(lh).getEnsembles();
         int ledgerReplicaIndex = -1;
         for (BookieSocketAddress addr : ensembles.get(0L)) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestAutoRecoveryAlongWithBookieServers.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestAutoRecoveryAlongWithBookieServers.java
index 06a8b85..ca76784 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestAutoRecoveryAlongWithBookieServers.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestAutoRecoveryAlongWithBookieServers.java
@@ -23,10 +23,9 @@ package org.apache.bookkeeper.replication;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.List;
 import java.util.Map.Entry;
-import java.util.Set;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -88,10 +87,9 @@ public class TestAutoRecoveryAlongWithBookieServers extends
         }
 
         // Killing all bookies except newly replicated bookie
-        Set<Entry<Long, ArrayList<BookieSocketAddress>>> entrySet = LedgerHandleAdapter
-                .getLedgerMetadata(lh).getEnsembles().entrySet();
-        for (Entry<Long, ArrayList<BookieSocketAddress>> entry : entrySet) {
-            ArrayList<BookieSocketAddress> bookies = entry.getValue();
+        for (Entry<Long, ? extends List<BookieSocketAddress>> entry :
+                 lh.getLedgerMetadata().getEnsembles().entrySet()) {
+            List<BookieSocketAddress> bookies = entry.getValue();
             for (BookieSocketAddress bookie : bookies) {
                 if (bookie.equals(newBkAddr)) {
                     continue;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index 0b15f1d..f64456b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -25,11 +25,10 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import lombok.Cleanup;
@@ -689,10 +688,9 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
     private void killAllBookies(LedgerHandle lh, BookieSocketAddress excludeBK)
             throws Exception {
         // Killing all bookies except newly replicated bookie
-        Set<Entry<Long, ArrayList<BookieSocketAddress>>> entrySet = LedgerHandleAdapter
-                .getLedgerMetadata(lh).getEnsembles().entrySet();
-        for (Entry<Long, ArrayList<BookieSocketAddress>> entry : entrySet) {
-            ArrayList<BookieSocketAddress> bookies = entry.getValue();
+        for (Entry<Long, ? extends List<BookieSocketAddress>> entry :
+                 lh.getLedgerMetadata().getEnsembles().entrySet()) {
+            List<BookieSocketAddress> bookies = entry.getValue();
             for (BookieSocketAddress bookie : bookies) {
                 if (bookie.equals(excludeBK)) {
                     continue;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
index 63eb641..d360b1f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.io.File;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -646,6 +647,7 @@ public class TestHttpService extends BookKeeperClusterTestCase {
             try {
                 testListUnderReplicatedLedgerService(mFactory);
             } catch (Exception e) {
+                LOG.info("Exception in test", e);
                 throw new UncheckedExecutionException(e.getMessage(), e.getCause());
             }
             return null;
@@ -671,8 +673,9 @@ public class TestHttpService extends BookKeeperClusterTestCase {
 
         LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
         LedgerMetadata md = LedgerHandleAdapter.getLedgerMetadata(lh);
-        List<BookieSocketAddress> ensemble = md.getEnsembles().get(0L);
+        List<BookieSocketAddress> ensemble = new ArrayList<>(md.getEnsembles().get(0L));
         ensemble.set(0, new BookieSocketAddress("1.1.1.1", 1000));
+        md.updateEnsemble(0L, ensemble);
 
         TestCallbacks.GenericCallbackFuture<LedgerMetadata> cb =
             new TestCallbacks.GenericCallbackFuture<LedgerMetadata>();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
index d4a480f..8dafe8b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
@@ -28,11 +28,11 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Enumeration;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 
@@ -436,8 +436,8 @@ public class TestTLS extends BookKeeperClusterTestCase {
         ClientConfiguration clientConf = new ClientConfiguration(baseClientConf);
         LedgerMetadata metadata = testClient(clientConf, 2);
         assertTrue(metadata.getEnsembles().size() > 0);
-        Collection<ArrayList<BookieSocketAddress>> ensembles = metadata.getEnsembles().values();
-        for (ArrayList<BookieSocketAddress> bookies : ensembles) {
+        Collection<? extends List<BookieSocketAddress>> ensembles = metadata.getEnsembles().values();
+        for (List<BookieSocketAddress> bookies : ensembles) {
             for (BookieSocketAddress bookieAddress : bookies) {
                 int port = bookieAddress.getPort();
                 assertTrue(tlsBookiePorts.contains(port));
diff --git a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
index 9d39d38..88a1344 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
@@ -83,7 +83,7 @@ public class LedgerReader {
         bookieClient = bkc.getBookieClient();
     }
 
-    public static SortedMap<Long, ArrayList<BookieSocketAddress>> bookiesForLedger(final LedgerHandle lh) {
+    public static SortedMap<Long, ? extends List<BookieSocketAddress>> bookiesForLedger(final LedgerHandle lh) {
         return lh.getLedgerMetadata().getEnsembles();
     }
 
@@ -120,7 +120,7 @@ public class LedgerReader {
             }
         };
 
-        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsemble(eid);
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsemble(eid);
         for (int i = 0; i < writeSet.size(); i++) {
             int idx = writeSet.get(i);
             bookieClient.readEntry(ensemble.get(idx), lh.getId(), eid, readEntryCallback,
@@ -225,7 +225,7 @@ public class LedgerReader {
             }
         };
 
-        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsemble(eid);
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsemble(eid);
         for (int i = 0; i < writeSet.size(); i++) {
             int idx = writeSet.get(i);
             bookieClient.readEntry(ensemble.get(idx), lh.getId(), eid, readEntryCallback,
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
index 02edeba..d19e768 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
@@ -1037,7 +1037,7 @@ import org.slf4j.LoggerFactory;
             LedgerHandle lh = bkc.client().get().openLedgerNoRecovery(segment.getLogSegmentId(),
                     BookKeeper.DigestType.CRC32, getConf().getBKDigestPW().getBytes(UTF_8));
             long eidFirst = 0;
-            for (SortedMap.Entry<Long, ArrayList<BookieSocketAddress>>
+            for (SortedMap.Entry<Long, ? extends List<BookieSocketAddress>>
                     entry : LedgerReader.bookiesForLedger(lh).entrySet()) {
                 long eidLast = entry.getKey().longValue();
                 long count = eidLast - eidFirst + 1;