You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by re...@apache.org on 2019/05/04 22:28:06 UTC

[bookkeeper] branch master updated: GetListOfEntriesOfLedger implementation

This is an automated email from the ASF dual-hosted git repository.

reddycharan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new aa84c7f  GetListOfEntriesOfLedger implementation
aa84c7f is described below

commit aa84c7fdd24a925f57f087670b205a35fb8ef237
Author: Charan Reddy Guttapalem <re...@gmail.com>
AuthorDate: Sat May 4 15:28:01 2019 -0700

    GetListOfEntriesOfLedger implementation
    
    
    Descriptions of the changes in this PR:
    
    As described in this BP - https://github.com/apache/bookkeeper/blob/master/site/bps/BP-34-cluster-metadata-checker.md, this request returns list of entries bookie contains for the given ledger in an encoded format. The returned list provides weakly consistent state, of the entries of the ledger.
    
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1931 from reddycharan/getlistofentries and squashes the following commits:
    
    467bb8b73 [Charan Reddy Guttapalem] GetListOfEntriesOfLedger implementation
    757f99156 [Charan Reddy Guttapalem] GetListOfEntriesOfLedger implementation
    4e84bcba0 [Charan Reddy Guttapalem] GetListOfEntriesOfLedger implementation
    f5655bb29 [Charan Reddy Guttapalem] GetListOfEntriesOfLedger implementation
---
 .../src/main/proto/BookkeeperProtocol.proto        |  13 +
 .../bookkeeper/bookie/BookKeeperServerStats.java   |   3 +
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |  22 ++
 .../apache/bookkeeper/bookie/EntryMemTable.java    |  49 ++-
 .../bookie/InterleavedLedgerStorage.java           |  16 +-
 .../InterleavedStorageRegenerateIndexOp.java       |   5 +
 .../org/apache/bookkeeper/bookie/LedgerCache.java  |   4 +
 .../apache/bookkeeper/bookie/LedgerCacheImpl.java  |  49 +++
 .../apache/bookkeeper/bookie/LedgerDescriptor.java |   4 +
 .../bookkeeper/bookie/LedgerDescriptorImpl.java    |   6 +
 .../apache/bookkeeper/bookie/LedgerEntryPage.java  |  27 ++
 .../apache/bookkeeper/bookie/LedgerStorage.java    |  17 +
 .../bookkeeper/bookie/SortedLedgerStorage.java     |  11 +
 .../bookkeeper/bookie/stats/BookieStats.java       |   9 +
 .../bookie/storage/ldb/DbLedgerStorage.java        |   8 +
 .../ldb/SingleDirectoryDbLedgerStorage.java        |   7 +
 .../apache/bookkeeper/client/BookKeeperAdmin.java  |  18 +-
 .../bookkeeper/client/BookKeeperClientStats.java   |   2 +
 .../org/apache/bookkeeper/proto/BookieClient.java  |  15 +
 .../apache/bookkeeper/proto/BookieClientImpl.java  |  31 ++
 .../bookkeeper/proto/BookieRequestProcessor.java   |  13 +
 .../proto/BookkeeperInternalCallbacks.java         |  48 +++
 .../proto/GetListOfEntriesOfLedgerProcessorV3.java | 110 ++++++
 .../bookkeeper/proto/PerChannelBookieClient.java   |  83 +++++
 .../org/apache/bookkeeper/proto/RequestStats.java  |  17 +
 .../util/AvailabilityOfEntriesOfLedger.java        | 377 +++++++++++++++++++++
 .../apache/bookkeeper/util/IteratorUtility.java    | 171 ++++++++++
 .../bookie/InterleavedLedgerStorageTest.java       |  56 ++-
 .../bookkeeper/bookie/LedgerStorageTest.java       |  45 +++
 .../bookkeeper/bookie/SortedLedgerStorageTest.java | 194 +++++++++++
 .../bookkeeper/bookie/TestEntryMemTable.java       | 154 +++++++++
 .../apache/bookkeeper/bookie/TestSyncThread.java   |   6 +
 .../bookkeeper/client/BookKeeperAdminTest.java     |  98 ++++++
 .../org/apache/bookkeeper/meta/GcLedgersTest.java  |   6 +
 .../bookkeeper/meta/LedgerManagerTestCase.java     |   6 +
 .../apache/bookkeeper/proto/MockBookieClient.java  |  13 +
 .../util/AvailabilityOfEntriesOfLedgerTest.java    | 192 +++++++++++
 .../bookkeeper/util/IteratorUtilityTest.java       | 141 ++++++++
 38 files changed, 2041 insertions(+), 5 deletions(-)

diff --git a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
index f34d56e..4f178aa 100644
--- a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
@@ -64,6 +64,7 @@ enum OperationType {
     GET_BOOKIE_INFO = 8;
     START_TLS = 9;
     FORCE_LEDGER = 10;
+    GET_LIST_OF_ENTRIES_OF_LEDGER = 11;
 }
 
 /**
@@ -92,6 +93,7 @@ message Request {
     optional GetBookieInfoRequest getBookieInfoRequest = 105;
     optional StartTLSRequest startTLSRequest = 106;
     optional ForceLedgerRequest forceLedgerRequest = 107;
+    optional GetListOfEntriesOfLedgerRequest getListOfEntriesOfLedgerRequest = 108;
     // to pass MDC context
     repeated ContextPair requestContext = 200;
 }
@@ -152,6 +154,10 @@ message GetBookieInfoRequest {
     optional int64 requested = 1;
 }
 
+message GetListOfEntriesOfLedgerRequest {
+	required int64 ledgerId = 1;
+}
+
 message Response {
 
     required BKPacketHeader header = 1;
@@ -167,6 +173,7 @@ message Response {
     optional GetBookieInfoResponse getBookieInfoResponse = 105;
     optional StartTLSResponse startTLSResponse = 106;
     optional ForceLedgerResponse forceLedgerResponse = 107;
+    optional GetListOfEntriesOfLedgerResponse getListOfEntriesOfLedgerResponse = 108;
 }
 
 message ReadResponse {
@@ -213,5 +220,11 @@ message GetBookieInfoResponse {
     optional int64 freeDiskSpace = 3;
 }
 
+message GetListOfEntriesOfLedgerResponse {
+    required StatusCode status = 1;
+    required int64 ledgerId = 2;
+    optional bytes availabilityOfEntriesOfLedger = 3; // condensed encoded format representing availability of entries of ledger
+}
+
 message StartTLSResponse {
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index b58514b..bdca048 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -64,6 +64,8 @@ public interface BookKeeperServerStats {
     String READ_LAC = "READ_LAC";
     String GET_BOOKIE_INFO_REQUEST = "GET_BOOKIE_INFO_REQUEST";
     String GET_BOOKIE_INFO = "GET_BOOKIE_INFO";
+    String GET_LIST_OF_ENTRIES_OF_LEDGER = "GET_LIST_OF_ENTRIES_OF_LEDGER";
+    String GET_LIST_OF_ENTRIES_OF_LEDGER_REQUEST = "GET_LIST_OF_ENTRIES_OF_LEDGER_REQUEST";
 
     // Ensemble Stats
     String WATCHER_SCOPE = "bookie_watcher";
@@ -80,6 +82,7 @@ public interface BookKeeperServerStats {
     String BOOKIE_READ_LAST_CONFIRMED = "BOOKIE_READ_LAST_CONFIRMED";
     String BOOKIE_ADD_ENTRY_BYTES = "BOOKIE_ADD_ENTRY_BYTES";
     String BOOKIE_READ_ENTRY_BYTES = "BOOKIE_READ_ENTRY_BYTES";
+    String BOOKIE_GET_LIST_OF_ENTRIES_OF_LEDGER = "BOOKIE_GET_LIST_OF_ENTRIES_OF_LEDGER";
 
     String ADD_ENTRY_IN_PROGRESS = "ADD_ENTRY_IN_PROGRESS";
     String ADD_ENTRY_BLOCKED = "ADD_ENTRY_BLOCKED";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index e253fdc..32adb5d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -52,6 +52,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.PrimitiveIterator.OfLong;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -1594,4 +1595,25 @@ public class Bookie extends BookieCriticalThread {
             return new LedgerDirsManager(conf, idxDirs, diskChecker, statsLogger);
         }
     }
+
+    public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedgerException {
+        long requestNanos = MathUtils.nowInNano();
+        boolean success = false;
+        try {
+            LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("GetEntriesOfLedger {}", ledgerId);
+            }
+            OfLong entriesOfLedger = handle.getListOfEntriesOfLedger(ledgerId);
+            success = true;
+            return entriesOfLedger;
+        } finally {
+            long elapsedNanos = MathUtils.elapsedNanos(requestNanos);
+            if (success) {
+                bookieStats.getReadEntryStats().registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
+            } else {
+                bookieStats.getReadEntryStats().registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
+            }
+        }
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
index a27e54e..941906c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
@@ -21,17 +21,21 @@ package org.apache.bookkeeper.bookie;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.PrimitiveIterator;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.stats.EntryMemTableStats;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.IteratorUtility;
 import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +48,6 @@ import org.slf4j.LoggerFactory;
  */
 public class EntryMemTable implements AutoCloseable{
     private static Logger logger = LoggerFactory.getLogger(Journal.class);
-
     /**
      * Entry skip list.
      */
@@ -456,4 +459,48 @@ public class EntryMemTable implements AutoCloseable{
     public void close() throws Exception {
         // no-op
     }
+
+    /*
+     * returns the primitive long iterator of entries of a ledger available in
+     * this EntryMemTable. It would be in the ascending order and this Iterator
+     * is weakly consistent.
+     */
+    PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) {
+        EntryKey thisLedgerFloorEntry = new EntryKey(ledgerId, 0);
+        EntryKey thisLedgerCeilingEntry = new EntryKey(ledgerId, Long.MAX_VALUE);
+        Iterator<EntryKey> thisLedgerEntriesInKVMap;
+        Iterator<EntryKey> thisLedgerEntriesInSnapshot;
+        this.lock.readLock().lock();
+        try {
+            /*
+             * Gets a view of the portion of this map that corresponds to
+             * entries of this ledger.
+             *
+             * Here 'kvmap' is of type 'ConcurrentSkipListMap', so its 'subMap'
+             * call would return a view of the portion of this map whose keys
+             * range from fromKey to toKey and it would be of type
+             * 'ConcurrentNavigableMap'. ConcurrentNavigableMap's 'keySet' would
+             * return NavigableSet view of the keys contained in this map. This
+             * view's iterator would be weakly consistent -
+             * https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/
+             * package-summary.html#Weakly.
+             *
+             * 'weakly consistent' would guarantee 'to traverse elements as they
+             * existed upon construction exactly once, and may (but are not
+             * guaranteed to) reflect any modifications subsequent to
+             * construction.'
+             *
+             */
+            thisLedgerEntriesInKVMap = this.kvmap.subMap(thisLedgerFloorEntry, thisLedgerCeilingEntry).keySet()
+                    .iterator();
+            thisLedgerEntriesInSnapshot = this.snapshot.subMap(thisLedgerFloorEntry, thisLedgerCeilingEntry).keySet()
+                    .iterator();
+        } finally {
+            this.lock.readLock().unlock();
+        }
+        return IteratorUtility.mergeIteratorsForPrimitiveLongIterator(thisLedgerEntriesInKVMap,
+                thisLedgerEntriesInSnapshot, EntryKey.COMPARATOR, (entryKey) -> {
+                    return entryKey.entryId;
+                });
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index becb16b..5d4ec0e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -34,6 +34,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_P
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.RateLimiter;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 
@@ -45,12 +46,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Optional;
+import java.util.PrimitiveIterator.OfLong;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import lombok.Cleanup;
 import lombok.Getter;
+
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
@@ -65,8 +68,8 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.annotations.StatsDoc;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SnapshotMap;
-import org.apache.commons.lang.mutable.MutableBoolean;
-import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,6 +86,7 @@ import org.slf4j.LoggerFactory;
 )
 public class InterleavedLedgerStorage implements CompactableLedgerStorage, EntryLogListener {
     private static final Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorage.class);
+    public static final long INVALID_ENTRYID = -1;
 
     EntryLogger entryLogger;
     @Getter
@@ -103,6 +107,8 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
     // this indicates that a write has happened since the last flush
     private final AtomicBoolean somethingWritten = new AtomicBoolean(false);
 
+    private int pageSize;
+
     // Expose Stats
     @StatsDoc(
         name = STORAGE_GET_OFFSET,
@@ -191,6 +197,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
         ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
                 null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
         gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger.scope("gc"));
+        pageSize = conf.getPageSize();
         ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
         // Expose Stats
         getOffsetStats = statsLogger.getOpStatsLogger(STORAGE_GET_OFFSET);
@@ -649,4 +656,9 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
     public List<GarbageCollectionStatus> getGarbageCollectionStatus() {
         return Collections.singletonList(gcThread.getGarbageCollectionStatus());
     }
+
+    @Override
+    public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
+        return ledgerCache.getEntriesIterator(ledgerId);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
index 880d3c1..90f5afc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.PrimitiveIterator.OfLong;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -240,5 +241,9 @@ public class InterleavedStorageRegenerateIndexOp {
         public LedgerIndexMetadata readLedgerIndexMetadata(long ledgerId) throws IOException {
             throw new UnsupportedOperationException();
         }
+        @Override
+        public OfLong getEntriesIterator(long ledgerId) throws IOException {
+            throw new UnsupportedOperationException();
+        }
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
index cae8bb4..606afb4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
@@ -26,6 +26,8 @@ import static org.apache.bookkeeper.tools.cli.commands.bookie.FormatUtil.bytes2H
 import io.netty.buffer.ByteBuf;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.PrimitiveIterator.OfLong;
+
 import org.apache.bookkeeper.common.util.Watcher;
 
 /**
@@ -87,6 +89,8 @@ public interface LedgerCache extends Closeable {
 
     PageEntriesIterable listEntries(long ledgerId) throws IOException;
 
+    OfLong getEntriesIterator(long ledgerId) throws IOException;
+
     /**
      * Represents summary of ledger metadata.
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
index e6de2f9..7341ace 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
@@ -23,6 +23,11 @@ package org.apache.bookkeeper.bookie;
 
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.PrimitiveIterator.OfLong;
+
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -186,4 +191,48 @@ public class LedgerCacheImpl implements LedgerCache {
     public LedgerIndexMetadata readLedgerIndexMetadata(long ledgerId) throws IOException {
         return indexPersistenceManager.readLedgerIndexMetadata(ledgerId);
     }
+
+    @Override
+    public OfLong getEntriesIterator(long ledgerId) throws IOException {
+        Iterator<LedgerCache.PageEntries> pageEntriesIteratorNonFinal = null;
+        try {
+            pageEntriesIteratorNonFinal = listEntries(ledgerId).iterator();
+        } catch (Bookie.NoLedgerException noLedgerException) {
+            pageEntriesIteratorNonFinal = Collections.emptyIterator();
+        }
+        final Iterator<LedgerCache.PageEntries> pageEntriesIterator = pageEntriesIteratorNonFinal;
+        return new OfLong() {
+            private OfLong entriesInCurrentLEPIterator = null;
+            {
+                if (pageEntriesIterator.hasNext()) {
+                    entriesInCurrentLEPIterator = pageEntriesIterator.next().getLEP().getEntriesIterator();
+                }
+            }
+
+            @Override
+            public boolean hasNext() {
+                try {
+                    while ((entriesInCurrentLEPIterator != null) && (!entriesInCurrentLEPIterator.hasNext())) {
+                        if (pageEntriesIterator.hasNext()) {
+                            entriesInCurrentLEPIterator = pageEntriesIterator.next().getLEP().getEntriesIterator();
+                        } else {
+                            entriesInCurrentLEPIterator = null;
+                        }
+                    }
+                    return (entriesInCurrentLEPIterator != null);
+                } catch (Exception exc) {
+                    throw new RuntimeException(
+                            "Received exception in InterleavedLedgerStorage getEntriesOfLedger hasNext call", exc);
+                }
+            }
+
+            @Override
+            public long nextLong() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+                return entriesInCurrentLEPIterator.nextLong();
+            }
+        };
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index 74bc8b5..abb7b34 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -27,6 +27,8 @@ import com.google.common.util.concurrent.SettableFuture;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.IOException;
+import java.util.PrimitiveIterator.OfLong;
+
 import org.apache.bookkeeper.common.util.Watcher;
 
 /**
@@ -86,4 +88,6 @@ public abstract class LedgerDescriptor {
     abstract void setExplicitLac(ByteBuf entry) throws IOException;
 
     abstract  ByteBuf getExplicitLac();
+
+    abstract OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException;
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index 563494e..da884b5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.SettableFuture;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.PrimitiveIterator.OfLong;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.client.api.BKException;
 import org.apache.bookkeeper.common.util.Watcher;
@@ -173,4 +174,9 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
     void cancelWaitForLastAddConfirmedUpdate(Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
         ledgerStorage.cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher);
     }
+
+    @Override
+    OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
+        return ledgerStorage.getListOfEntriesOfLedger(ledgerId);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
index c272e7c..98ea948 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
@@ -23,6 +23,8 @@ package org.apache.bookkeeper.bookie;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
+import java.util.PrimitiveIterator.OfLong;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.util.ZeroBuffer;
@@ -298,6 +300,31 @@ public class LedgerEntryPage implements AutoCloseable {
         }
     }
 
+    public OfLong getEntriesIterator() {
+        return new OfLong() {
+            long firstEntry = getFirstEntry();
+            int curDiffEntry = 0;
+
+            @Override
+            public boolean hasNext() {
+                while ((curDiffEntry < entriesPerPage) && (getOffset(curDiffEntry * 8) == 0)) {
+                    curDiffEntry++;
+                }
+                return (curDiffEntry != entriesPerPage);
+            }
+
+            @Override
+            public long nextLong() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+                long nextEntry = firstEntry + curDiffEntry;
+                curDiffEntry++;
+                return nextEntry;
+            }
+        };
+    }
+
     @Override
     public void close() throws Exception {
         releasePage();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index 2c47b28..d371520 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -30,6 +30,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.PrimitiveIterator;
+
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -258,4 +260,19 @@ public interface LedgerStorage {
     default List<GarbageCollectionStatus> getGarbageCollectionStatus() {
         return Collections.emptyList();
     }
+
+    /**
+     * Returns the primitive long iterator for entries of the ledger, stored in
+     * this LedgerStorage. The returned iterator provide weakly consistent state
+     * of the ledger. It is guaranteed that entries of the ledger added to this
+     * LedgerStorage by the time this method is called will be available but
+     * modifications made after method invocation may not be available.
+     *
+     * @param ledgerId
+     *            - id of the ledger
+     * @return the list of entries of the ledger available in this
+     *         ledgerstorage.
+     * @throws Exception
+     */
+    PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException;
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 23e0716..c1ad591 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -23,21 +23,25 @@ package org.apache.bookkeeper.bookie;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
+import java.util.PrimitiveIterator;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.IteratorUtility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -361,4 +365,11 @@ public class SortedLedgerStorage
     public List<GarbageCollectionStatus> getGarbageCollectionStatus() {
         return interleavedLedgerStorage.getGarbageCollectionStatus();
     }
+
+    @Override
+    public PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
+        PrimitiveIterator.OfLong entriesInMemtableItr = memTable.getListOfEntriesOfLedger(ledgerId);
+        PrimitiveIterator.OfLong entriesFromILSItr = interleavedLedgerStorage.getListOfEntriesOfLedger(ledgerId);
+        return IteratorUtility.mergePrimitiveLongIterator(entriesInMemtableItr, entriesFromILSItr);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java
index 5e033e9..5df2186 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/BookieStats.java
@@ -23,11 +23,13 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY_BYTES;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_FORCE_LEDGER;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_GET_LIST_OF_ENTRIES_OF_LEDGER;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY_BYTES;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_RECOVERY_ADD_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_LIST_OF_ENTRIES_OF_LEDGER;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_BYTES;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_BYTES;
@@ -72,6 +74,12 @@ public class BookieStats {
         parent = READ_ENTRY
     )
     private final OpStatsLogger readEntryStats;
+    @StatsDoc(
+            name = BOOKIE_GET_LIST_OF_ENTRIES_OF_LEDGER,
+            help = "operation stats of GetListOfEntriesOfLedger on a bookie",
+            parent = GET_LIST_OF_ENTRIES_OF_LEDGER
+    )
+    private final OpStatsLogger getListOfEntriesOfLedgerStats;
     // Bookie Operation Bytes Stats
     @StatsDoc(name = BOOKIE_ADD_ENTRY_BYTES, help = "bytes stats of AddEntry on a bookie")
     private final OpStatsLogger addBytesStats;
@@ -86,6 +94,7 @@ public class BookieStats {
         addEntryStats = statsLogger.getOpStatsLogger(BOOKIE_ADD_ENTRY);
         recoveryAddEntryStats = statsLogger.getOpStatsLogger(BOOKIE_RECOVERY_ADD_ENTRY);
         readEntryStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY);
+        getListOfEntriesOfLedgerStats = statsLogger.getOpStatsLogger(BOOKIE_GET_LIST_OF_ENTRIES_OF_LEDGER);
         addBytesStats = statsLogger.getOpStatsLogger(BOOKIE_ADD_ENTRY_BYTES);
         readBytesStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY_BYTES);
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index a68cd1c..73d254a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -37,6 +37,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.PrimitiveIterator.OfLong;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -359,4 +360,11 @@ public class DbLedgerStorage implements LedgerStorage {
             return conf.getLong(keyName);
         }
     }
+
+    @Override
+    public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
+        // check Issue #2078
+        throw new UnsupportedOperationException(
+                "getListOfEntriesOfLedger method is currently unsupported for DbLedgerStorage");
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 4c41235..58ce2be 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -33,6 +33,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.PrimitiveIterator.OfLong;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -888,4 +889,10 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
     }
 
     private static final Logger log = LoggerFactory.getLogger(DbLedgerStorage.class);
+
+    @Override
+    public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
+        throw new UnsupportedOperationException(
+                "getListOfEntriesOfLedger method is currently unsupported for SingleDirectoryDbLedgerStorage");
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 6d2319c..4a869ee 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -27,8 +27,8 @@ import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithRegistra
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-
 import com.google.common.util.concurrent.UncheckedExecutionException;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -80,6 +80,7 @@ import org.apache.bookkeeper.replication.ReplicationException.CompatibilityExcep
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
@@ -1663,4 +1664,19 @@ public class BookKeeperAdmin implements AutoCloseable {
         return bkc.getPlacementPolicy().isEnsembleAdheringToPlacementPolicy(ensembleBookiesList, writeQuorumSize,
                 ackQuorumSize);
     }
+
+    /**
+     * Makes async request for getting list of entries of ledger from a bookie
+     * and returns Future for the result.
+     *
+     * @param address
+     *            BookieSocketAddress of the bookie
+     * @param ledgerId
+     *            ledgerId
+     * @return returns Future
+     */
+    public CompletableFuture<AvailabilityOfEntriesOfLedger> asyncGetListOfEntriesOfLedger(BookieSocketAddress address,
+            long ledgerId) {
+        return bkc.getBookieClient().getListOfEntriesOfLedger(address, ledgerId);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index cdfde67..36c304c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -64,6 +64,7 @@ public interface BookKeeperClientStats {
     String GET_BOOKIE_INFO_OP = "GET_BOOKIE_INFO";
     String SPECULATIVE_READ_COUNT = "SPECULATIVE_READ_COUNT";
     String READ_REQUESTS_REORDERED = "READ_REQUESTS_REORDERED";
+    String GET_LIST_OF_ENTRIES_OF_LEDGER_OP = "GET_LIST_OF_ENTRIES_OF_LEDGER";
 
     // per channel stats
     String CHANNEL_SCOPE = "per_channel_bookie_client";
@@ -81,6 +82,7 @@ public interface BookKeeperClientStats {
     String TIMEOUT_GET_BOOKIE_INFO = "TIMEOUT_GET_BOOKIE_INFO";
     String CHANNEL_START_TLS_OP = "START_TLS";
     String CHANNEL_TIMEOUT_START_TLS_OP = "TIMEOUT_START_TLS";
+    String TIMEOUT_GET_LIST_OF_ENTRIES_OF_LEDGER = "TIMEOUT_GET_LIST_OF_ENTRIES_OF_LEDGER";
 
     String NETTY_EXCEPTION_CNT = "NETTY_EXCEPTION_CNT";
     String CLIENT_CHANNEL_WRITE_WAIT = "CLIENT_CHANNEL_WRITE_WAIT";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 85a4ef9..2092a67 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -22,6 +22,7 @@ package org.apache.bookkeeper.proto;
 
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -31,6 +32,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
 import org.apache.bookkeeper.util.ByteBufList;
 
 /**
@@ -216,6 +218,19 @@ public interface BookieClient {
                        GetBookieInfoCallback cb, Object ctx);
 
     /**
+     * Makes async request for getting list of entries of ledger from a bookie
+     * and returns Future for the result.
+     *
+     * @param address
+     *            BookieSocketAddress of the bookie
+     * @param ledgerId
+     *            ledgerId
+     * @return returns Future
+     */
+    CompletableFuture<AvailabilityOfEntriesOfLedger> getListOfEntriesOfLedger(BookieSocketAddress address,
+            long ledgerId);
+
+    /**
      * @return whether bookie client object has been closed
      */
     boolean isClosed();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index 8342821..c772a97 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -38,6 +38,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
@@ -56,6 +57,7 @@ import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.FutureGetListOfEntriesOfLedger;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
@@ -66,6 +68,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.tls.SecurityException;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -317,6 +320,33 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
                       ledgerId);
     }
 
+    @Override
+    public CompletableFuture<AvailabilityOfEntriesOfLedger> getListOfEntriesOfLedger(BookieSocketAddress address,
+            long ledgerId) {
+        FutureGetListOfEntriesOfLedger futureResult = new FutureGetListOfEntriesOfLedger(ledgerId);
+        final PerChannelBookieClientPool client = lookupClient(address);
+        if (client == null) {
+            futureResult.getListOfEntriesOfLedgerComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
+                    ledgerId, null);
+            return futureResult;
+        }
+        client.obtain((rc, pcbc) -> {
+            if (rc != BKException.Code.OK) {
+                try {
+                    executor.executeOrdered(ledgerId, safeRun(() -> {
+                        futureResult.getListOfEntriesOfLedgerComplete(rc, ledgerId, null);
+                    }));
+                } catch (RejectedExecutionException re) {
+                    futureResult.getListOfEntriesOfLedgerComplete(getRc(BKException.Code.InterruptedException),
+                            ledgerId, null);
+                }
+            } else {
+                pcbc.getListOfEntriesOfLedger(ledgerId, futureResult);
+            }
+        }, ledgerId);
+        return futureResult;
+    }
+
     private void completeRead(final int rc,
                               final long ledgerId,
                               final long entryId,
@@ -415,6 +445,7 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
         }
     }
 
+    @Override
     public void readLac(final BookieSocketAddress addr, final long ledgerId, final ReadLacCallback cb,
             final Object ctx) {
         final PerChannelBookieClientPool client = lookupClient(addr);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 02b0f56..f46d19c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -339,6 +339,9 @@ public class BookieRequestProcessor implements RequestProcessor {
                     case START_TLS:
                         processStartTLSRequestV3(r, c);
                         break;
+                    case GET_LIST_OF_ENTRIES_OF_LEDGER:
+                        processGetListOfEntriesOfLedgerProcessorV3(r, c);
+                        break;
                     default:
                         LOG.info("Unknown operation type {}", header.getOperation());
                         BookkeeperProtocol.Response.Builder response =
@@ -587,6 +590,16 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
+    private void processGetListOfEntriesOfLedgerProcessorV3(final BookkeeperProtocol.Request r, final Channel c) {
+        GetListOfEntriesOfLedgerProcessorV3 getListOfEntriesOfLedger = new GetListOfEntriesOfLedgerProcessorV3(r, c,
+                this);
+        if (null == readThreadPool) {
+            getListOfEntriesOfLedger.run();
+        } else {
+            readThreadPool.submit(getListOfEntriesOfLedger);
+        }
+    }
+
     private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final Channel c) {
         WriteEntryProcessor write = WriteEntryProcessor.create(r, c, this);
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index 2ade9e9..fdfd379 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.zookeeper.AsyncCallback;
@@ -108,6 +109,53 @@ public class BookkeeperInternalCallbacks {
     }
 
     /**
+     * A callback interface for GetListOfEntriesOfLedger command.
+     */
+    public interface GetListOfEntriesOfLedgerCallback {
+        void getListOfEntriesOfLedgerComplete(int rc, long ledgerId,
+                AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger);
+    }
+
+    /**
+     * Handle the Response Code and transform it to a BKException.
+     *
+     * @param <T>
+     * @param rc
+     * @param result
+     * @param future
+     */
+    public static <T> void finish(int rc, T result, CompletableFuture<? super T> future) {
+        if (rc != BKException.Code.OK) {
+            future.completeExceptionally(BKException.create(rc).fillInStackTrace());
+        } else {
+            future.complete(result);
+        }
+    }
+
+    /**
+     * Future for GetListOfEntriesOfLedger.
+     */
+    public static class FutureGetListOfEntriesOfLedger extends CompletableFuture<AvailabilityOfEntriesOfLedger>
+            implements GetListOfEntriesOfLedgerCallback {
+        private final long ledgerIdOfTheRequest;
+
+        FutureGetListOfEntriesOfLedger(long ledgerId) {
+            this.ledgerIdOfTheRequest = ledgerId;
+        }
+
+        @Override
+        public void getListOfEntriesOfLedgerComplete(int rc, long ledgerIdOfTheResponse,
+                AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger) {
+            if ((rc == BKException.Code.OK) && (ledgerIdOfTheRequest != ledgerIdOfTheResponse)) {
+                LOG.error("For getListOfEntriesOfLedger expected ledgerId in the response: {} actual ledgerId: {}",
+                        ledgerIdOfTheRequest, ledgerIdOfTheResponse);
+                rc = BKException.Code.ReadException;
+            }
+            finish(rc, availabilityOfEntriesOfLedger, this);
+        }
+    }
+
+    /**
      * A generic callback interface.
      */
     public interface GenericCallback<T> {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java
new file mode 100644
index 0000000..f78cdf2
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import com.google.protobuf.ByteString;
+import io.netty.channel.Channel;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.GetListOfEntriesOfLedgerRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.GetListOfEntriesOfLedgerResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
+import org.apache.bookkeeper.util.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A processor class for v3 entries of a ledger packets.
+ */
+public class GetListOfEntriesOfLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GetListOfEntriesOfLedgerProcessorV3.class);
+    protected final GetListOfEntriesOfLedgerRequest getListOfEntriesOfLedgerRequest;
+    protected final long ledgerId;
+
+    public GetListOfEntriesOfLedgerProcessorV3(Request request, Channel channel,
+            BookieRequestProcessor requestProcessor) {
+        super(request, channel, requestProcessor);
+        this.getListOfEntriesOfLedgerRequest = request.getGetListOfEntriesOfLedgerRequest();
+        this.ledgerId = getListOfEntriesOfLedgerRequest.getLedgerId();
+    }
+
+    private GetListOfEntriesOfLedgerResponse getListOfEntriesOfLedgerResponse() {
+        long startTimeNanos = MathUtils.nowInNano();
+
+        GetListOfEntriesOfLedgerResponse.Builder getListOfEntriesOfLedgerResponse = GetListOfEntriesOfLedgerResponse
+                .newBuilder();
+        getListOfEntriesOfLedgerResponse.setLedgerId(ledgerId);
+
+        if (!isVersionCompatible()) {
+            getListOfEntriesOfLedgerResponse.setStatus(StatusCode.EBADVERSION);
+            requestProcessor.getRequestStats().getGetListOfEntriesOfLedgerStats()
+                    .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+            return getListOfEntriesOfLedgerResponse.build();
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Received new getListOfEntriesOfLedger request: {}", request);
+        }
+        StatusCode status = StatusCode.EOK;
+        AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = null;
+        try {
+            availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
+                    requestProcessor.bookie.getListOfEntriesOfLedger(ledgerId));
+            getListOfEntriesOfLedgerResponse.setAvailabilityOfEntriesOfLedger(
+                    ByteString.copyFrom(availabilityOfEntriesOfLedger.serializeStateOfEntriesOfLedger()));
+
+        } catch (Bookie.NoLedgerException e) {
+            status = StatusCode.ENOLEDGER;
+            LOG.error("No ledger found while performing getListOfEntriesOfLedger from ledger: {}", ledgerId, e);
+        } catch (IOException e) {
+            status = StatusCode.EIO;
+            LOG.error("IOException while performing getListOfEntriesOfLedger from ledger: {}", ledgerId);
+        }
+
+        if (status == StatusCode.EOK) {
+            requestProcessor.getRequestStats().getListOfEntriesOfLedgerStats
+                    .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+        } else {
+            requestProcessor.getRequestStats().getListOfEntriesOfLedgerStats
+                    .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+        }
+        // Finally set the status and return
+        getListOfEntriesOfLedgerResponse.setStatus(status);
+        return getListOfEntriesOfLedgerResponse.build();
+    }
+
+    @Override
+    public void safeRun() {
+        GetListOfEntriesOfLedgerResponse listOfEntriesOfLedgerResponse = getListOfEntriesOfLedgerResponse();
+        Response.Builder response = Response.newBuilder().setHeader(getHeader())
+                .setStatus(listOfEntriesOfLedgerResponse.getStatus())
+                .setGetListOfEntriesOfLedgerResponse(listOfEntriesOfLedgerResponse);
+        Response resp = response.build();
+        sendResponse(listOfEntriesOfLedgerResponse.getStatus(), resp,
+                requestProcessor.getRequestStats().getListOfEntriesOfLedgerRequestStats);
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 1fdb403..2f1659f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -96,6 +96,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetListOfEntriesOfLedgerCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
@@ -109,6 +110,8 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.GetListOfEntriesOfLedgerRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.GetListOfEntriesOfLedgerResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest;
@@ -128,6 +131,7 @@ import org.apache.bookkeeper.stats.annotations.StatsDoc;
 import org.apache.bookkeeper.tls.SecurityException;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory;
 import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
@@ -257,6 +261,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         help = "channel stats of connect requests"
     )
     private final OpStatsLogger connectTimer;
+    private final OpStatsLogger getListOfEntriesOfLedgerCompletionOpLogger;
+    private final OpStatsLogger getListOfEntriesOfLedgerCompletionTimeoutOpLogger;
     @StatsDoc(
         name = BookKeeperClientStats.NETTY_EXCEPTION_CNT,
         help = "the number of exceptions received from this channel"
@@ -395,6 +401,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         forceLedgerOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_FORCE_OP);
         readLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_LAC_OP);
         getBookieInfoOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.GET_BOOKIE_INFO_OP);
+        getListOfEntriesOfLedgerCompletionOpLogger = statsLogger
+                .getOpStatsLogger(BookKeeperClientStats.GET_LIST_OF_ENTRIES_OF_LEDGER_OP);
         readTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ);
         addTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD);
         writeLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_WRITE_LAC);
@@ -403,6 +411,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         getBookieInfoTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO);
         startTLSOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_START_TLS_OP);
         startTLSTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_START_TLS_OP);
+        getListOfEntriesOfLedgerCompletionTimeoutOpLogger = statsLogger
+                .getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_LIST_OF_ENTRIES_OF_LEDGER);
         exceptionCounter = statsLogger.getCounter(BookKeeperClientStats.NETTY_EXCEPTION_CNT);
         connectTimer = statsLogger.getOpStatsLogger(BookKeeperClientStats.CLIENT_CONNECT_TIMER);
         addEntryOutstanding = statsLogger.getCounter(BookKeeperClientStats.ADD_OP_OUTSTANDING);
@@ -829,6 +839,24 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         writeAndFlush(channel, completionKey, request);
     }
 
+    public void getListOfEntriesOfLedger(final long ledgerId, GetListOfEntriesOfLedgerCallback cb) {
+        final long txnId = getTxnId();
+        final CompletionKey completionKey = new V3CompletionKey(txnId, OperationType.GET_LIST_OF_ENTRIES_OF_LEDGER);
+        completionObjects.put(completionKey, new GetListOfEntriesOfLedgerCompletion(completionKey, cb, ledgerId));
+
+        // Build the request.
+        BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder().setVersion(ProtocolVersion.VERSION_THREE)
+                .setOperation(OperationType.GET_LIST_OF_ENTRIES_OF_LEDGER).setTxnId(txnId);
+
+        GetListOfEntriesOfLedgerRequest.Builder getListOfEntriesOfLedgerRequestBuilder =
+                GetListOfEntriesOfLedgerRequest.newBuilder().setLedgerId(ledgerId);
+
+        final Request getListOfEntriesOfLedgerRequest = Request.newBuilder().setHeader(headerBuilder)
+                .setGetListOfEntriesOfLedgerRequest(getListOfEntriesOfLedgerRequestBuilder).build();
+
+        writeAndFlush(channel, completionKey, getListOfEntriesOfLedgerRequest);
+    }
+
     /**
      * Long Poll Reads.
      */
@@ -1983,6 +2011,61 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         }
     }
 
+    class GetListOfEntriesOfLedgerCompletion extends CompletionValue {
+        final GetListOfEntriesOfLedgerCallback cb;
+
+        public GetListOfEntriesOfLedgerCompletion(final CompletionKey key,
+                final GetListOfEntriesOfLedgerCallback origCallback, final long ledgerId) {
+            super("GetListOfEntriesOfLedger", null, ledgerId, 0L, getListOfEntriesOfLedgerCompletionOpLogger,
+                    getListOfEntriesOfLedgerCompletionTimeoutOpLogger);
+            this.cb = new GetListOfEntriesOfLedgerCallback() {
+                @Override
+                public void getListOfEntriesOfLedgerComplete(int rc, long ledgerId,
+                        AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger) {
+                    logOpResult(rc);
+                    origCallback.getListOfEntriesOfLedgerComplete(rc, ledgerId, availabilityOfEntriesOfLedger);
+                    key.release();
+                }
+            };
+        }
+
+        @Override
+        public void errorOut() {
+            errorOut(BKException.Code.BookieHandleNotAvailableException);
+        }
+
+        @Override
+        public void errorOut(final int rc) {
+            errorOutAndRunCallback(() -> cb.getListOfEntriesOfLedgerComplete(rc, ledgerId, null));
+        }
+
+        @Override
+        public void handleV3Response(BookkeeperProtocol.Response response) {
+            GetListOfEntriesOfLedgerResponse getListOfEntriesOfLedgerResponse = response
+                    .getGetListOfEntriesOfLedgerResponse();
+            ByteBuf availabilityOfEntriesOfLedgerBuffer = Unpooled.EMPTY_BUFFER;
+            StatusCode status = response.getStatus() == StatusCode.EOK ? getListOfEntriesOfLedgerResponse.getStatus()
+                    : response.getStatus();
+
+            if (getListOfEntriesOfLedgerResponse.hasAvailabilityOfEntriesOfLedger()) {
+                availabilityOfEntriesOfLedgerBuffer = Unpooled.wrappedBuffer(
+                        getListOfEntriesOfLedgerResponse.getAvailabilityOfEntriesOfLedger().asReadOnlyByteBuffer());
+            }
+
+            if (LOG.isDebugEnabled()) {
+                logResponse(status, "ledgerId", ledgerId);
+            }
+
+            int rc = convertStatus(status, BKException.Code.ReadException);
+            AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = null;
+            if (rc == BKException.Code.OK) {
+                availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
+                        availabilityOfEntriesOfLedgerBuffer.slice());
+            }
+            cb.getListOfEntriesOfLedgerComplete(rc, ledgerId, availabilityOfEntriesOfLedger);
+        }
+    }
+
     private final Recycler<AddCompletion> addCompletionRecycler = new Recycler<AddCompletion>() {
             @Override
             protected AddCompletion newObject(Recycler.Handle<AddCompletion> handle) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java
index 1799e66..fd91957 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java
@@ -30,6 +30,8 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_LIST_OF_ENTRIES_OF_LEDGER;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_LIST_OF_ENTRIES_OF_LEDGER_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_BLOCKED_WAIT;
@@ -210,6 +212,17 @@ public class RequestStats {
         help = "operation stats of ReadEntry blocked on a bookie"
     )
     final OpStatsLogger readEntryBlockedStats;
+    @StatsDoc(
+            name = GET_LIST_OF_ENTRIES_OF_LEDGER_REQUEST,
+            help = "request stats of GetListOfEntriesOfLedger on a bookie"
+    )
+    final OpStatsLogger getListOfEntriesOfLedgerRequestStats;
+    @StatsDoc(
+            name = "GET_LIST_OF_ENTRIES_OF_LEDGER",
+            help = "operation stats of GetListOfEntriesOfLedger",
+            parent = GET_LIST_OF_ENTRIES_OF_LEDGER_REQUEST
+    )
+    final OpStatsLogger getListOfEntriesOfLedgerStats;
 
     public RequestStats(StatsLogger statsLogger) {
         this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
@@ -238,6 +251,10 @@ public class RequestStats {
         this.addEntryBlockedStats = statsLogger.getOpStatsLogger(ADD_ENTRY_BLOCKED_WAIT);
         this.readEntryBlockedStats = statsLogger.getOpStatsLogger(READ_ENTRY_BLOCKED_WAIT);
 
+        this.getListOfEntriesOfLedgerStats = statsLogger.getOpStatsLogger(GET_LIST_OF_ENTRIES_OF_LEDGER);
+        this.getListOfEntriesOfLedgerRequestStats =
+                statsLogger.getOpStatsLogger(GET_LIST_OF_ENTRIES_OF_LEDGER_REQUEST);
+
         statsLogger.registerGauge(ADD_ENTRY_IN_PROGRESS, new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedger.java
new file mode 100644
index 0000000..6042bdb
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedger.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import io.netty.buffer.ByteBuf;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map.Entry;
+import java.util.PrimitiveIterator;
+import java.util.TreeMap;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+/**
+ * Ordered collection of SequenceGroups will represent entries of the ledger
+ * residing in a bookie.
+ *
+ * <p>In the byte array representation of AvailabilityOfEntriesOfLedger, for the
+ * sake of future extensibility it would be helpful to have reserved space for
+ * header at the beginning. So the first 64 bytes will be used for header, with
+ * the first four bytes specifying the int version number, next four bytes
+ * specifying the number of sequencegroups for now and the rest of the bytes in
+ * the reserved space will be 0's. The encoded format will be represented after
+ * the first 64 bytes. The ordered collection of SequenceGroups will be appended
+ * sequentially to this byte array, with each SequenceGroup taking 24 bytes.
+ */
+public class AvailabilityOfEntriesOfLedger {
+    public static final long INVALID_ENTRYID = -1;
+
+    /*
+     *
+     * Nomenclature:
+     *
+     * - Continuous entries are grouped as a ’Sequence’. - Number of continuous
+     * entries in a ‘Sequence’ is called ‘sequenceSize’. - Gap between
+     * Consecutive sequences is called ‘sequencePeriod’. - Consecutive sequences
+     * with same sequenceSize and same sequencePeriod in between consecutive
+     * sequences are grouped as a SequenceGroup. - ‘firstSequenceStart’ is the
+     * first entry in the first sequence of the SequenceGroup. -
+     * ‘lastSequenceStart’ is the first entry in the last sequence of the
+     * SequenceGroup.
+     *
+     * To represent a SequenceGroup, two long values and two int values are
+     * needed, so each SequenceGroup can be represented with (2 * 8 + 2 * 4 = 24
+     * bytes).
+     */
+    private static class SequenceGroup {
+        private static final int SEQUENCEGROUP_BYTES = 2 * Long.BYTES + 2 * Integer.BYTES;
+        private final long firstSequenceStart;
+        private final int sequenceSize;
+        private long lastSequenceStart = INVALID_ENTRYID;
+        private int sequencePeriod;
+        private boolean isSequenceGroupClosed = false;
+        private long numOfEntriesInSequenceGroup = 0;
+
+        private SequenceGroup(long firstSequenceStart, int sequenceSize) {
+            this.firstSequenceStart = firstSequenceStart;
+            this.lastSequenceStart = firstSequenceStart;
+            this.sequenceSize = sequenceSize;
+            this.sequencePeriod = 0;
+        }
+
+        private SequenceGroup(byte[] serializedSequenceGroup) {
+            ByteBuffer buffer = ByteBuffer.wrap(serializedSequenceGroup);
+            firstSequenceStart = buffer.getLong();
+            lastSequenceStart = buffer.getLong();
+            sequenceSize = buffer.getInt();
+            sequencePeriod = buffer.getInt();
+            setSequenceGroupClosed();
+        }
+
+        private boolean isSequenceGroupClosed() {
+            return isSequenceGroupClosed;
+        }
+
+        private void setSequenceGroupClosed() {
+            this.isSequenceGroupClosed = true;
+            numOfEntriesInSequenceGroup = (lastSequenceStart - firstSequenceStart) == 0 ? sequenceSize
+                    : (((lastSequenceStart - firstSequenceStart) / sequencePeriod) + 1) * sequenceSize;
+        }
+
+        private long getNumOfEntriesInSequenceGroup() {
+            if (!isSequenceGroupClosed()) {
+                throw new IllegalStateException(
+                        "SequenceGroup is not yet closed, it is illegal to call getNumOfEntriesInSequenceGroup");
+            }
+            return numOfEntriesInSequenceGroup;
+        }
+
+        private long getLastSequenceStart() {
+            return lastSequenceStart;
+        }
+
+        private void setLastSequenceStart(long lastSequenceStart) {
+            this.lastSequenceStart = lastSequenceStart;
+        }
+
+        private int getSequencePeriod() {
+            return sequencePeriod;
+        }
+
+        private void setSequencePeriod(int sequencePeriod) {
+            this.sequencePeriod = sequencePeriod;
+        }
+
+        private long getFirstSequenceStart() {
+            return firstSequenceStart;
+        }
+
+        private void serializeSequenceGroup(byte[] byteArrayForSerialization) {
+            if (!isSequenceGroupClosed()) {
+                throw new IllegalStateException(
+                        "SequenceGroup is not yet closed, it is illegal to call serializeSequenceGroup");
+            }
+            ByteBuffer buffer = ByteBuffer.wrap(byteArrayForSerialization);
+            buffer.putLong(firstSequenceStart);
+            buffer.putLong(lastSequenceStart);
+            buffer.putInt(sequenceSize);
+            buffer.putInt(sequencePeriod);
+        }
+
+        private boolean isEntryAvailable(long entryId) {
+            if (!isSequenceGroupClosed()) {
+                throw new IllegalStateException(
+                        "SequenceGroup is not yet closed, it is illegal to call isEntryAvailable");
+            }
+
+            if ((entryId >= firstSequenceStart) && (entryId <= (lastSequenceStart + sequenceSize))) {
+                if (sequencePeriod == 0) {
+                    return ((entryId - firstSequenceStart) < sequenceSize);
+                } else {
+                    return (((entryId - firstSequenceStart) % sequencePeriod) < sequenceSize);
+                }
+            } else {
+                return false;
+            }
+        }
+    }
+
+    public static final int HEADER_SIZE = 64;
+    public static final int V0 = 0;
+    // current version of AvailabilityOfEntriesOfLedger header is V0
+    public static final int CURRENT_HEADER_VERSION = V0;
+    private final TreeMap<Long, SequenceGroup> sortedSequenceGroups = new TreeMap<Long, SequenceGroup>();
+    private MutableObject<SequenceGroup> curSequenceGroup = new MutableObject<SequenceGroup>(null);
+    private MutableLong curSequenceStartEntryId = new MutableLong(INVALID_ENTRYID);
+    private MutableInt curSequenceSize = new MutableInt(0);
+    private boolean availabilityOfEntriesOfLedgerClosed = false;
+    private long totalNumOfAvailableEntries = 0;
+
+    public AvailabilityOfEntriesOfLedger(PrimitiveIterator.OfLong entriesOfLedgerItr) {
+        while (entriesOfLedgerItr.hasNext()) {
+            this.addEntryToAvailabileEntriesOfLedger(entriesOfLedgerItr.nextLong());
+        }
+        this.closeStateOfEntriesOfALedger();
+    }
+
+    public AvailabilityOfEntriesOfLedger(byte[] serializeStateOfEntriesOfLedger) {
+        byte[] header = new byte[HEADER_SIZE];
+        byte[] serializedSequenceGroupByteArray = new byte[SequenceGroup.SEQUENCEGROUP_BYTES];
+        System.arraycopy(serializeStateOfEntriesOfLedger, 0, header, 0, HEADER_SIZE);
+
+        ByteBuffer headerByteBuf = ByteBuffer.wrap(header);
+        int headerVersion = headerByteBuf.getInt();
+        if (headerVersion > CURRENT_HEADER_VERSION) {
+            throw new IllegalArgumentException("Unsupported Header Version: " + headerVersion);
+        }
+        int numOfSequenceGroups = headerByteBuf.getInt();
+        SequenceGroup newSequenceGroup;
+        for (int i = 0; i < numOfSequenceGroups; i++) {
+            Arrays.fill(serializedSequenceGroupByteArray, (byte) 0);
+            System.arraycopy(serializeStateOfEntriesOfLedger, HEADER_SIZE + (i * SequenceGroup.SEQUENCEGROUP_BYTES),
+                    serializedSequenceGroupByteArray, 0, SequenceGroup.SEQUENCEGROUP_BYTES);
+            newSequenceGroup = new SequenceGroup(serializedSequenceGroupByteArray);
+            sortedSequenceGroups.put(newSequenceGroup.getFirstSequenceStart(), newSequenceGroup);
+        }
+        setAvailabilityOfEntriesOfLedgerClosed();
+    }
+
+    public AvailabilityOfEntriesOfLedger(ByteBuf byteBuf) {
+        byte[] header = new byte[HEADER_SIZE];
+        byte[] serializedSequenceGroupByteArray = new byte[SequenceGroup.SEQUENCEGROUP_BYTES];
+        int readerIndex = byteBuf.readerIndex();
+        byteBuf.getBytes(readerIndex, header, 0, HEADER_SIZE);
+
+        ByteBuffer headerByteBuf = ByteBuffer.wrap(header);
+        int headerVersion = headerByteBuf.getInt();
+        if (headerVersion > CURRENT_HEADER_VERSION) {
+            throw new IllegalArgumentException("Unsupported Header Version: " + headerVersion);
+        }
+        int numOfSequenceGroups = headerByteBuf.getInt();
+        SequenceGroup newSequenceGroup;
+        for (int i = 0; i < numOfSequenceGroups; i++) {
+            Arrays.fill(serializedSequenceGroupByteArray, (byte) 0);
+            byteBuf.getBytes(readerIndex + HEADER_SIZE + (i * SequenceGroup.SEQUENCEGROUP_BYTES),
+                    serializedSequenceGroupByteArray, 0, SequenceGroup.SEQUENCEGROUP_BYTES);
+            newSequenceGroup = new SequenceGroup(serializedSequenceGroupByteArray);
+            sortedSequenceGroups.put(newSequenceGroup.getFirstSequenceStart(), newSequenceGroup);
+        }
+        setAvailabilityOfEntriesOfLedgerClosed();
+    }
+
+    private void initializeCurSequence(long curSequenceStartEntryIdValue) {
+        curSequenceStartEntryId.setValue(curSequenceStartEntryIdValue);
+        curSequenceSize.setValue(1);
+    }
+
+    private void resetCurSequence() {
+        curSequenceStartEntryId.setValue(INVALID_ENTRYID);
+        curSequenceSize.setValue(0);
+    }
+
+    private boolean isCurSequenceInitialized() {
+        return curSequenceStartEntryId.longValue() != INVALID_ENTRYID;
+    }
+
+    private boolean isEntryExistingInCurSequence(long entryId) {
+        return (curSequenceStartEntryId.longValue() <= entryId)
+                && (entryId < (curSequenceStartEntryId.longValue() + curSequenceSize.intValue()));
+    }
+
+    private boolean isEntryAppendableToCurSequence(long entryId) {
+        return ((curSequenceStartEntryId.longValue() + curSequenceSize.intValue()) == entryId);
+    }
+
+    private void incrementCurSequenceSize() {
+        curSequenceSize.increment();
+    }
+
+    private void createNewSequenceGroupWithCurSequence() {
+        SequenceGroup curSequenceGroupValue = curSequenceGroup.getValue();
+        curSequenceGroupValue.setSequenceGroupClosed();
+        sortedSequenceGroups.put(curSequenceGroupValue.getFirstSequenceStart(), curSequenceGroupValue);
+        curSequenceGroup.setValue(new SequenceGroup(curSequenceStartEntryId.longValue(), curSequenceSize.intValue()));
+    }
+
+    private boolean isCurSequenceGroupInitialized() {
+        return curSequenceGroup.getValue() != null;
+    }
+
+    private void initializeCurSequenceGroupWithCurSequence() {
+        curSequenceGroup.setValue(new SequenceGroup(curSequenceStartEntryId.longValue(), curSequenceSize.intValue()));
+    }
+
+    private boolean doesCurSequenceBelongToCurSequenceGroup() {
+        long curSequenceStartEntryIdValue = curSequenceStartEntryId.longValue();
+        int curSequenceSizeValue = curSequenceSize.intValue();
+        boolean belongsToThisSequenceGroup = false;
+        SequenceGroup curSequenceGroupValue = curSequenceGroup.getValue();
+        if ((curSequenceGroupValue.sequenceSize == curSequenceSizeValue)
+                && ((curSequenceGroupValue.getLastSequenceStart() == INVALID_ENTRYID) || ((curSequenceStartEntryIdValue
+                        - curSequenceGroupValue.getLastSequenceStart()) == curSequenceGroupValue
+                                .getSequencePeriod()))) {
+            belongsToThisSequenceGroup = true;
+        }
+        return belongsToThisSequenceGroup;
+    }
+
+    private void appendCurSequenceToCurSequenceGroup() {
+        SequenceGroup curSequenceGroupValue = curSequenceGroup.getValue();
+        curSequenceGroupValue.setLastSequenceStart(curSequenceStartEntryId.longValue());
+        if (curSequenceGroupValue.getSequencePeriod() == 0) {
+            curSequenceGroupValue.setSequencePeriod(
+                    ((int) (curSequenceGroupValue.getLastSequenceStart() - curSequenceGroupValue.firstSequenceStart)));
+        }
+    }
+
+    private void addCurSequenceToSequenceGroup() {
+        if (!isCurSequenceGroupInitialized()) {
+            initializeCurSequenceGroupWithCurSequence();
+        } else if (doesCurSequenceBelongToCurSequenceGroup()) {
+            appendCurSequenceToCurSequenceGroup();
+        } else {
+            createNewSequenceGroupWithCurSequence();
+        }
+    }
+
+    private void addEntryToAvailabileEntriesOfLedger(long entryId) {
+        if (!isCurSequenceInitialized()) {
+            initializeCurSequence(entryId);
+        } else if (isEntryExistingInCurSequence(entryId)) {
+            /* this entry is already added so do nothing */
+        } else if (isEntryAppendableToCurSequence(entryId)) {
+            incrementCurSequenceSize();
+        } else {
+            addCurSequenceToSequenceGroup();
+            initializeCurSequence(entryId);
+        }
+    }
+
+    private void closeStateOfEntriesOfALedger() {
+        if (isCurSequenceInitialized()) {
+            addCurSequenceToSequenceGroup();
+            resetCurSequence();
+        }
+        SequenceGroup curSequenceGroupValue = curSequenceGroup.getValue();
+        if (curSequenceGroupValue != null) {
+            curSequenceGroupValue.setSequenceGroupClosed();
+            sortedSequenceGroups.put(curSequenceGroupValue.getFirstSequenceStart(), curSequenceGroupValue);
+        }
+        setAvailabilityOfEntriesOfLedgerClosed();
+    }
+
+    private boolean isAvailabilityOfEntriesOfLedgerClosed() {
+        return availabilityOfEntriesOfLedgerClosed;
+    }
+
+    private void setAvailabilityOfEntriesOfLedgerClosed() {
+        this.availabilityOfEntriesOfLedgerClosed = true;
+        for (Entry<Long, SequenceGroup> seqGroupEntry : sortedSequenceGroups.entrySet()) {
+            totalNumOfAvailableEntries += seqGroupEntry.getValue().getNumOfEntriesInSequenceGroup();
+        }
+    }
+
+    public byte[] serializeStateOfEntriesOfLedger() {
+        if (!isAvailabilityOfEntriesOfLedgerClosed()) {
+            throw new IllegalStateException("AvailabilityOfEntriesOfLedger is not yet closed,"
+                    + "it is illegal to call serializeStateOfEntriesOfLedger");
+        }
+        byte[] header = new byte[HEADER_SIZE];
+        ByteBuffer headerByteBuf = ByteBuffer.wrap(header);
+        byte[] serializedSequenceGroupByteArray = new byte[SequenceGroup.SEQUENCEGROUP_BYTES];
+        byte[] serializedStateByteArray = new byte[HEADER_SIZE
+                + (sortedSequenceGroups.size() * SequenceGroup.SEQUENCEGROUP_BYTES)];
+        final int numOfSequenceGroups = sortedSequenceGroups.size();
+        headerByteBuf.putInt(CURRENT_HEADER_VERSION);
+        headerByteBuf.putInt(numOfSequenceGroups);
+        System.arraycopy(header, 0, serializedStateByteArray, 0, HEADER_SIZE);
+        int seqNum = 0;
+        for (Entry<Long, SequenceGroup> seqGroupEntry : sortedSequenceGroups.entrySet()) {
+            SequenceGroup seqGroup = seqGroupEntry.getValue();
+            Arrays.fill(serializedSequenceGroupByteArray, (byte) 0);
+            seqGroup.serializeSequenceGroup(serializedSequenceGroupByteArray);
+            System.arraycopy(serializedSequenceGroupByteArray, 0, serializedStateByteArray,
+                    HEADER_SIZE + ((seqNum++) * SequenceGroup.SEQUENCEGROUP_BYTES), SequenceGroup.SEQUENCEGROUP_BYTES);
+        }
+        return serializedStateByteArray;
+    }
+
+    public boolean isEntryAvailable(long entryId) {
+        if (!isAvailabilityOfEntriesOfLedgerClosed()) {
+            throw new IllegalStateException(
+                    "AvailabilityOfEntriesOfLedger is not yet closed, it is illegal to call isEntryAvailable");
+        }
+        Entry<Long, SequenceGroup> seqGroup = sortedSequenceGroups.floorEntry(entryId);
+        if (seqGroup == null) {
+            return false;
+        }
+        return seqGroup.getValue().isEntryAvailable(entryId);
+    }
+
+    public long getTotalNumOfAvailableEntries() {
+        if (!isAvailabilityOfEntriesOfLedgerClosed()) {
+            throw new IllegalStateException("AvailabilityOfEntriesOfLedger is not yet closed,"
+                    + " it is illegal to call getTotalNumOfAvailableEntries");
+        }
+        return totalNumOfAvailableEntries;
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/IteratorUtility.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/IteratorUtility.java
new file mode 100644
index 0000000..701d31a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/IteratorUtility.java
@@ -0,0 +1,171 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.PrimitiveIterator;
+import java.util.PrimitiveIterator.OfLong;
+import java.util.function.ToLongFunction;
+
+/**
+ * Utility class to merge iterators.
+ */
+public class IteratorUtility {
+
+    private static final long INVALID_ELEMENT = -1;
+
+    /**
+     * Merges two long primitive sorted iterators and returns merged iterator.
+     * It expects
+     *  - input iterators to be sorted
+     *  - input iterators to be non-repetitive for merged iterator to be non-repetitive
+     * It removes duplicates from the input iterators.
+     *
+     * @param iter1
+     *            first primitive oflong input iterator
+     * @param iter2
+     *            second primitive oflong input iterator
+     * @return merged primitive oflong iterator.
+     */
+    public static OfLong mergePrimitiveLongIterator(OfLong iter1, OfLong iter2) {
+        return new PrimitiveIterator.OfLong() {
+            private long curIter1Element = INVALID_ELEMENT;
+            private long curIter2Element = INVALID_ELEMENT;
+            private boolean hasToPreFetch = true;
+
+            @Override
+            public boolean hasNext() {
+                if (hasToPreFetch) {
+                    if (curIter1Element == INVALID_ELEMENT) {
+                        curIter1Element = iter1.hasNext() ? iter1.nextLong() : INVALID_ELEMENT;
+                    }
+                    if (curIter2Element == INVALID_ELEMENT) {
+                        curIter2Element = iter2.hasNext() ? iter2.nextLong() : INVALID_ELEMENT;
+                    }
+                }
+                hasToPreFetch = false;
+                return (curIter1Element != INVALID_ELEMENT || curIter2Element != INVALID_ELEMENT);
+            }
+
+            @Override
+            public long nextLong() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+
+                long returnEntryId = INVALID_ELEMENT;
+                if (curIter1Element != INVALID_ELEMENT && curIter2Element != INVALID_ELEMENT) {
+                    if (curIter1Element == curIter2Element) {
+                        returnEntryId = curIter1Element;
+                        curIter1Element = INVALID_ELEMENT;
+                        curIter2Element = INVALID_ELEMENT;
+                    } else if (curIter1Element < curIter2Element) {
+                        returnEntryId = curIter1Element;
+                        curIter1Element = INVALID_ELEMENT;
+                    } else {
+                        returnEntryId = curIter2Element;
+                        curIter2Element = INVALID_ELEMENT;
+                    }
+                } else if (curIter1Element != INVALID_ELEMENT) {
+                    returnEntryId = curIter1Element;
+                    curIter1Element = INVALID_ELEMENT;
+                } else {
+                    returnEntryId = curIter2Element;
+                    curIter2Element = INVALID_ELEMENT;
+                }
+                hasToPreFetch = true;
+                return returnEntryId;
+            }
+        };
+    }
+
+    /**
+     * Merges two sorted iterators and returns merged iterator sorted using
+     * comparator. It uses 'function' to convert T type to long, to return long
+     * iterator.
+     * It expects
+     *  - input iterators to be sorted
+     *  - input iterators to be non-repetitive for merged iterator to be non-repetitive
+     * It removes duplicates from the input iterators.
+     *
+     * @param iter1
+     *          first iterator of type T
+     * @param iter2
+     *          second iterator of type T
+     * @param comparator
+     * @param function
+     * @return
+     */
+    public static <T> OfLong mergeIteratorsForPrimitiveLongIterator(Iterator<T> iter1, Iterator<T> iter2,
+            Comparator<T> comparator, ToLongFunction<T> function) {
+        return new PrimitiveIterator.OfLong() {
+            private T curIter1Entry = null;
+            private T curIter2Entry = null;
+            private boolean hasToPreFetch = true;
+
+            @Override
+            public boolean hasNext() {
+                if (hasToPreFetch) {
+                    if (curIter1Entry == null) {
+                        curIter1Entry = iter1.hasNext() ? iter1.next() : null;
+                    }
+                    if (curIter2Entry == null) {
+                        curIter2Entry = iter2.hasNext() ? iter2.next() : null;
+                    }
+                }
+                hasToPreFetch = false;
+                return (curIter1Entry != null || curIter2Entry != null);
+            }
+
+            @Override
+            public long nextLong() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+
+                T returnEntry = null;
+                if (curIter1Entry != null && curIter2Entry != null) {
+                    int compareValue = comparator.compare(curIter1Entry, curIter2Entry);
+                    if (compareValue == 0) {
+                        returnEntry = curIter1Entry;
+                        curIter1Entry = null;
+                        curIter2Entry = null;
+                    } else if (compareValue < 0) {
+                        returnEntry = curIter1Entry;
+                        curIter1Entry = null;
+                    } else {
+                        returnEntry = curIter2Entry;
+                        curIter2Entry = null;
+                    }
+                } else if (curIter1Entry != null) {
+                    returnEntry = curIter1Entry;
+                    curIter1Entry = null;
+                } else {
+                    returnEntry = curIter2Entry;
+                    curIter2Entry = null;
+                }
+                hasToPreFetch = true;
+                return function.applyAsLong(returnEntry);
+            }
+        };
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
index c12ed91..6416173 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
@@ -23,6 +23,8 @@ package org.apache.bookkeeper.bookie;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_PAGE_RETRIES;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -34,11 +36,15 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import java.util.PrimitiveIterator.OfLong;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.IntStream;
+
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -135,7 +141,9 @@ public class InterleavedLedgerStorageTest {
     TestableEntryLogger entryLogger;
     InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
     final long numWrites = 2000;
+    final long moreNumOfWrites = 3000;
     final long entriesPerWrite = 2;
+    final long numOfLedgers = 5;
 
     @Before
     public void setUp() throws Exception {
@@ -158,7 +166,7 @@ public class InterleavedLedgerStorageTest {
 
         // Insert some ledger & entries in the interleaved storage
         for (long entryId = 0; entryId < numWrites; entryId++) {
-            for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+            for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
                 if (entryId == 0) {
                     interleavedStorage.setMasterKey(ledgerId, ("ledger-" + ledgerId).getBytes());
                     interleavedStorage.setFenced(ledgerId);
@@ -192,6 +200,52 @@ public class InterleavedLedgerStorageTest {
     }
 
     @Test
+    public void testGetListOfEntriesOfLedger() throws IOException {
+        for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
+            OfLong entriesOfLedger = interleavedStorage.getListOfEntriesOfLedger(ledgerId);
+            ArrayList<Long> arrayList = new ArrayList<Long>();
+            Consumer<Long> addMethod = arrayList::add;
+            entriesOfLedger.forEachRemaining(addMethod);
+            assertEquals("Number of entries", numWrites, arrayList.size());
+            assertTrue("Entries of Ledger", IntStream.range(0, arrayList.size()).allMatch(i -> {
+                return arrayList.get(i).longValue() == (i * entriesPerWrite);
+            }));
+        }
+
+        long nonExistingLedger = 456789L;
+        OfLong entriesOfLedger = interleavedStorage.getListOfEntriesOfLedger(nonExistingLedger);
+        assertFalse("There shouldn't be any entry", entriesOfLedger.hasNext());
+    }
+
+    @Test
+    public void testGetListOfEntriesOfLedgerAfterFlush() throws IOException {
+        interleavedStorage.flush();
+
+        // Insert some more ledger & entries in the interleaved storage
+        for (long entryId = numWrites; entryId < moreNumOfWrites; entryId++) {
+            for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
+                ByteBuf entry = Unpooled.buffer(128);
+                entry.writeLong(ledgerId);
+                entry.writeLong(entryId * entriesPerWrite);
+                entry.writeBytes(("entry-" + entryId).getBytes());
+
+                interleavedStorage.addEntry(entry);
+            }
+        }
+
+        for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
+            OfLong entriesOfLedger = interleavedStorage.getListOfEntriesOfLedger(ledgerId);
+            ArrayList<Long> arrayList = new ArrayList<Long>();
+            Consumer<Long> addMethod = arrayList::add;
+            entriesOfLedger.forEachRemaining(addMethod);
+            assertEquals("Number of entries", moreNumOfWrites, arrayList.size());
+            assertTrue("Entries of Ledger", IntStream.range(0, arrayList.size()).allMatch(i -> {
+                return arrayList.get(i).longValue() == (i * entriesPerWrite);
+            }));
+        }
+    }
+
+    @Test
     public void testConsistencyCheckConcurrentGC() throws Exception {
         final long signalDone = -1;
         final List<Exception> asyncErrors = new ArrayList<>();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
index d028f70..76244b7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
@@ -28,7 +28,11 @@ import io.netty.buffer.UnpooledByteBufAllocator;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.PrimitiveIterator.OfLong;
 import java.util.concurrent.CountDownLatch;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
 
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -37,6 +41,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.TestUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
 /**
@@ -277,4 +282,44 @@ public class LedgerStorageTest extends BookKeeperClusterTestCase {
         fi.readHeader();
         return fi;
     }
+
+    @Test
+    public void testGetListOfEntriesOfLedger() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        int numOfBookies = bs.size();
+        int numOfEntries = 5;
+        BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
+        BookKeeper bkc = new BookKeeper(conf);
+        LedgerHandle lh = bkc.createLedger(numOfBookies, numOfBookies, digestType, "testPasswd".getBytes());
+        long lId = lh.getId();
+        for (int i = 0; i < numOfEntries; i++) {
+            lh.addEntry("000".getBytes());
+        }
+
+        ServerConfiguration newBookieConf = new ServerConfiguration(bsConfs.get(0));
+        /*
+         * by reusing bookieServerConfig and setting metadataServiceUri to null
+         * we can create/start new Bookie instance using the same data
+         * (journal/ledger/index) of the existing BookeieServer for our testing
+         * purpose.
+         */
+        newBookieConf.setMetadataServiceUri(null);
+        Bookie newbookie = new Bookie(newBookieConf);
+        /*
+         * since 'newbookie' uses the same data as original Bookie, it should be
+         * able to read journal of the original bookie.
+         */
+        newbookie.readJournal();
+
+        OfLong listOfEntriesItr = newbookie.getListOfEntriesOfLedger(lId);
+        ArrayList<Long> arrayList = new ArrayList<Long>();
+        Consumer<Long> addMethod = arrayList::add;
+        listOfEntriesItr.forEachRemaining(addMethod);
+
+        assertEquals("Num Of Entries", numOfEntries, arrayList.size());
+        Assert.assertTrue("Iterator should be sorted",
+                IntStream.range(0, arrayList.size() - 1).allMatch(k -> arrayList.get(k) <= arrayList.get(k + 1)));
+        bkc.close();
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageTest.java
new file mode 100644
index 0000000..01383e2
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageTest.java
@@ -0,0 +1,194 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.PrimitiveIterator.OfLong;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Testing SortedLedgerStorage.
+ */
+@RunWith(Parameterized.class)
+public class SortedLedgerStorageTest {
+
+    TestStatsProvider statsProvider = new TestStatsProvider();
+    ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+    LedgerDirsManager ledgerDirsManager;
+    SortedLedgerStorage sortedLedgerStorage = new SortedLedgerStorage();
+
+    final long numWrites = 2000;
+    final long moreNumOfWrites = 3000;
+    final long entriesPerWrite = 2;
+    final long numOfLedgers = 5;
+
+    @Parameterized.Parameters
+    public static Iterable<Boolean> elplSetting() {
+        return Arrays.asList(true, false);
+    }
+
+    public SortedLedgerStorageTest(boolean elplSetting) {
+        conf.setEntryLogSizeLimit(2048);
+        conf.setEntryLogPerLedgerEnabled(elplSetting);
+    }
+
+    CheckpointSource checkpointSource = new CheckpointSource() {
+        @Override
+        public Checkpoint newCheckpoint() {
+            return Checkpoint.MAX;
+        }
+
+        @Override
+        public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException {
+        }
+    };
+
+    Checkpointer checkpointer = new Checkpointer() {
+        @Override
+        public void startCheckpoint(Checkpoint checkpoint) {
+            // No-op
+        }
+
+        @Override
+        public void start() {
+            // no-op
+        }
+    };
+
+    @Before
+    public void setUp() throws Exception {
+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        sortedLedgerStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, null, checkpointSource,
+                checkpointer, statsProvider.getStatsLogger(BOOKIE_SCOPE), UnpooledByteBufAllocator.DEFAULT);
+    }
+
+    @Test
+    public void testGetListOfEntriesOfLedger() throws Exception {
+        long nonExistingLedgerId = 123456L;
+        OfLong entriesItr = sortedLedgerStorage.getListOfEntriesOfLedger(nonExistingLedgerId);
+        assertFalse("There shouldn't be any entries for this ledger", entriesItr.hasNext());
+        // Insert some ledger & entries in the interleaved storage
+        for (long entryId = 0; entryId < numWrites; entryId++) {
+            for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
+                if (entryId == 0) {
+                    sortedLedgerStorage.setMasterKey(ledgerId, ("ledger-" + ledgerId).getBytes());
+                    sortedLedgerStorage.setFenced(ledgerId);
+                }
+                ByteBuf entry = Unpooled.buffer(128);
+                entry.writeLong(ledgerId);
+                entry.writeLong(entryId * entriesPerWrite);
+                entry.writeBytes(("entry-" + entryId).getBytes());
+
+                sortedLedgerStorage.addEntry(entry);
+            }
+        }
+
+        for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
+            OfLong entriesOfLedger = sortedLedgerStorage.getListOfEntriesOfLedger(ledgerId);
+            ArrayList<Long> arrayList = new ArrayList<Long>();
+            Consumer<Long> addMethod = arrayList::add;
+            entriesOfLedger.forEachRemaining(addMethod);
+            assertEquals("Number of entries", numWrites, arrayList.size());
+            assertTrue("Entries of Ledger", IntStream.range(0, arrayList.size()).allMatch(i -> {
+                return arrayList.get(i).longValue() == (i * entriesPerWrite);
+            }));
+        }
+
+        nonExistingLedgerId = 456789L;
+        entriesItr = sortedLedgerStorage.getListOfEntriesOfLedger(nonExistingLedgerId);
+        assertFalse("There shouldn't be any entry", entriesItr.hasNext());
+    }
+
+    @Test
+    public void testGetListOfEntriesOfLedgerAfterFlush() throws IOException {
+        // Insert some ledger & entries in the interleaved storage
+        for (long entryId = 0; entryId < numWrites; entryId++) {
+            for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
+                if (entryId == 0) {
+                    sortedLedgerStorage.setMasterKey(ledgerId, ("ledger-" + ledgerId).getBytes());
+                    sortedLedgerStorage.setFenced(ledgerId);
+                }
+                ByteBuf entry = Unpooled.buffer(128);
+                entry.writeLong(ledgerId);
+                entry.writeLong(entryId * entriesPerWrite);
+                entry.writeBytes(("entry-" + entryId).getBytes());
+
+                sortedLedgerStorage.addEntry(entry);
+            }
+        }
+
+        sortedLedgerStorage.flush();
+
+        // Insert some more ledger & entries in the interleaved storage
+        for (long entryId = numWrites; entryId < moreNumOfWrites; entryId++) {
+            for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
+                ByteBuf entry = Unpooled.buffer(128);
+                entry.writeLong(ledgerId);
+                entry.writeLong(entryId * entriesPerWrite);
+                entry.writeBytes(("entry-" + entryId).getBytes());
+
+                sortedLedgerStorage.addEntry(entry);
+            }
+        }
+
+        for (long ledgerId = 0; ledgerId < numOfLedgers; ledgerId++) {
+            OfLong entriesOfLedger = sortedLedgerStorage.getListOfEntriesOfLedger(ledgerId);
+            ArrayList<Long> arrayList = new ArrayList<Long>();
+            Consumer<Long> addMethod = arrayList::add;
+            entriesOfLedger.forEachRemaining(addMethod);
+            assertEquals("Number of entries", moreNumOfWrites, arrayList.size());
+            assertTrue("Entries of Ledger", IntStream.range(0, arrayList.size()).allMatch(i -> {
+                return arrayList.get(i).longValue() == (i * entriesPerWrite);
+            }));
+        }
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
index 68e3eeb..9e6c559 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
@@ -18,6 +18,8 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -32,9 +34,14 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.PrimitiveIterator.OfLong;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -50,6 +57,7 @@ import org.junit.runners.Parameterized.Parameters;
 /**
  * Test the EntryMemTable class.
  */
+@Slf4j
 @RunWith(Parameterized.class)
 public class TestEntryMemTable implements CacheCallback, SkipListFlusher, CheckpointSource {
 
@@ -297,5 +305,151 @@ public class TestEntryMemTable implements CacheCallback, SkipListFlusher, Checkp
         }
 
     }
+
+    @Test
+    public void testGetListOfEntriesOfLedger() throws IOException {
+        Set<EntryKeyValue> flushedKVs = Collections.newSetFromMap(new ConcurrentHashMap<EntryKeyValue, Boolean>());
+        KVFLusher flusher = new KVFLusher(flushedKVs);
+        int numofEntries = 100;
+        int numOfLedgers = 5;
+        byte[] data = new byte[10];
+        for (long entryId = 1; entryId <= numofEntries; entryId++) {
+            for (long ledgerId = 1; ledgerId <= numOfLedgers; ledgerId++) {
+                random.nextBytes(data);
+                assertTrue(ledgerId + ":" + entryId + " is duplicate in mem-table!",
+                        memTable.addEntry(ledgerId, entryId, ByteBuffer.wrap(data), this) != 0);
+            }
+        }
+        for (long ledgerId = 1; ledgerId <= numOfLedgers; ledgerId++) {
+            OfLong entriesItr = memTable.getListOfEntriesOfLedger((random.nextInt((int) ledgerId) + 1));
+            ArrayList<Long> listOfEntries = new ArrayList<Long>();
+            Consumer<Long> addMethod = listOfEntries::add;
+            entriesItr.forEachRemaining(addMethod);
+            assertEquals("Number of Entries", numofEntries, listOfEntries.size());
+            for (int i = 0; i < numofEntries; i++) {
+                assertEquals("listOfEntries should be sorted", Long.valueOf(i + 1), listOfEntries.get(i));
+            }
+        }
+        assertTrue("Snapshot is expected to be empty since snapshot is not done", memTable.snapshot.isEmpty());
+        assertTrue("Take snapshot and returned checkpoint should not be empty", memTable.snapshot() != null);
+        assertFalse("After taking snapshot, snapshot should not be empty ", memTable.snapshot.isEmpty());
+        for (long ledgerId = 1; ledgerId <= numOfLedgers; ledgerId++) {
+            OfLong entriesItr = memTable.getListOfEntriesOfLedger((random.nextInt((int) ledgerId) + 1));
+            ArrayList<Long> listOfEntries = new ArrayList<Long>();
+            Consumer<Long> addMethod = listOfEntries::add;
+            entriesItr.forEachRemaining(addMethod);
+            assertEquals("Number of Entries should be the same even after taking snapshot", numofEntries,
+                    listOfEntries.size());
+            for (int i = 0; i < numofEntries; i++) {
+                assertEquals("listOfEntries should be sorted", Long.valueOf(i + 1), listOfEntries.get(i));
+            }
+        }
+
+        memTable.flush(flusher);
+        for (long ledgerId = 1; ledgerId <= numOfLedgers; ledgerId++) {
+            OfLong entriesItr = memTable.getListOfEntriesOfLedger((random.nextInt((int) ledgerId) + 1));
+            assertFalse("After flushing there shouldn't be entries in memtable", entriesItr.hasNext());
+        }
+    }
+
+    @Test
+    public void testGetListOfEntriesOfLedgerFromBothKVMapAndSnapshot() throws IOException {
+        int numofEntries = 100;
+        int newNumOfEntries = 200;
+        int numOfLedgers = 5;
+        byte[] data = new byte[10];
+        for (long entryId = 1; entryId <= numofEntries; entryId++) {
+            for (long ledgerId = 1; ledgerId <= numOfLedgers; ledgerId++) {
+                random.nextBytes(data);
+                assertTrue(ledgerId + ":" + entryId + " is duplicate in mem-table!",
+                        memTable.addEntry(ledgerId, entryId, ByteBuffer.wrap(data), this) != 0);
+            }
+        }
+
+        assertTrue("Snapshot is expected to be empty since snapshot is not done", memTable.snapshot.isEmpty());
+        assertTrue("Take snapshot and returned checkpoint should not be empty", memTable.snapshot() != null);
+        assertFalse("After taking snapshot, snapshot should not be empty ", memTable.snapshot.isEmpty());
+
+        for (long entryId = numofEntries + 1; entryId <= newNumOfEntries; entryId++) {
+            for (long ledgerId = 1; ledgerId <= numOfLedgers; ledgerId++) {
+                random.nextBytes(data);
+                assertTrue(ledgerId + ":" + entryId + " is duplicate in mem-table!",
+                        memTable.addEntry(ledgerId, entryId, ByteBuffer.wrap(data), this) != 0);
+            }
+        }
+
+        for (long ledgerId = 1; ledgerId <= numOfLedgers; ledgerId++) {
+            OfLong entriesItr = memTable.getListOfEntriesOfLedger((random.nextInt((int) ledgerId) + 1));
+            ArrayList<Long> listOfEntries = new ArrayList<Long>();
+            Consumer<Long> addMethod = listOfEntries::add;
+            entriesItr.forEachRemaining(addMethod);
+            assertEquals("Number of Entries should be the same", newNumOfEntries, listOfEntries.size());
+            for (int i = 0; i < newNumOfEntries; i++) {
+                assertEquals("listOfEntries should be sorted", Long.valueOf(i + 1), listOfEntries.get(i));
+            }
+        }
+    }
+
+    @Test
+    public void testGetListOfEntriesOfLedgerWhileAddingConcurrently() throws IOException, InterruptedException {
+        final int numofEntries = 100;
+        final int newNumOfEntries = 200;
+        final int concurrentAddOfEntries = 300;
+        long ledgerId = 5;
+        byte[] data = new byte[10];
+        for (long entryId = 1; entryId <= numofEntries; entryId++) {
+            random.nextBytes(data);
+            assertTrue(ledgerId + ":" + entryId + " is duplicate in mem-table!",
+                    memTable.addEntry(ledgerId, entryId, ByteBuffer.wrap(data), this) != 0);
+        }
+
+        assertTrue("Snapshot is expected to be empty since snapshot is not done", memTable.snapshot.isEmpty());
+        assertTrue("Take snapshot and returned checkpoint should not be empty", memTable.snapshot() != null);
+        assertFalse("After taking snapshot, snapshot should not be empty ", memTable.snapshot.isEmpty());
+
+        for (long entryId = numofEntries + 1; entryId <= newNumOfEntries; entryId++) {
+            random.nextBytes(data);
+            assertTrue(ledgerId + ":" + entryId + " is duplicate in mem-table!",
+                    memTable.addEntry(ledgerId, entryId, ByteBuffer.wrap(data), this) != 0);
+        }
+
+        AtomicBoolean successfullyAdded = new AtomicBoolean(true);
+
+        Thread threadToAdd = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    for (long entryId = newNumOfEntries + 1; entryId <= concurrentAddOfEntries; entryId++) {
+                        random.nextBytes(data);
+                        boolean thisEntryAddedSuccessfully = (memTable.addEntry(ledgerId, entryId,
+                                ByteBuffer.wrap(data), TestEntryMemTable.this) != 0);
+                        successfullyAdded.set(successfullyAdded.get() && thisEntryAddedSuccessfully);
+                        Thread.sleep(10);
+                    }
+                } catch (IOException e) {
+                    log.error("Got Unexpected exception while adding entries");
+                    successfullyAdded.set(false);
+                } catch (InterruptedException e) {
+                    log.error("Got InterruptedException while waiting");
+                    successfullyAdded.set(false);
+                }
+            }
+        });
+        threadToAdd.start();
+
+        Thread.sleep(200);
+        OfLong entriesItr = memTable.getListOfEntriesOfLedger(ledgerId);
+        ArrayList<Long> listOfEntries = new ArrayList<Long>();
+        while (entriesItr.hasNext()) {
+            listOfEntries.add(entriesItr.next());
+            Thread.sleep(5);
+        }
+        threadToAdd.join(5000);
+        assertTrue("Entries should be added successfully in the spawned thread", successfullyAdded.get());
+
+        for (int i = 0; i < newNumOfEntries; i++) {
+            assertEquals("listOfEntries should be sorted", Long.valueOf(i + 1), listOfEntries.get(i));
+        }
+    }
 }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index f57d388..22535f3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -29,6 +29,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 
 import java.io.IOException;
+import java.util.PrimitiveIterator.OfLong;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -369,6 +370,11 @@ public class TestSyncThread {
         @Override
         public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
         }
+
+        @Override
+        public OfLong getListOfEntriesOfLedger(long ledgerId) {
+            return null;
+        }
     }
 
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
index 6e8cf32..2f419b5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
@@ -24,6 +24,7 @@ import static com.google.common.base.Charsets.UTF_8;
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -33,6 +34,9 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -43,6 +47,7 @@ import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.commons.io.FileUtils;
 import org.apache.zookeeper.CreateMode;
@@ -399,4 +404,97 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
         }
         bk.close();
     }
+
+    @Test
+    public void testGetListOfEntriesOfClosedLedger() throws Exception {
+        testGetListOfEntriesOfLedger(true);
+    }
+
+    @Test
+    public void testGetListOfEntriesOfNotClosedLedger() throws Exception {
+        testGetListOfEntriesOfLedger(false);
+    }
+
+    @Test
+    public void testGetListOfEntriesOfNonExistingLedger() throws Exception {
+        long nonExistingLedgerId = 56789L;
+        try (BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString())) {
+            for (int i = 0; i < bs.size(); i++) {
+                CompletableFuture<AvailabilityOfEntriesOfLedger> futureResult = bkAdmin
+                        .asyncGetListOfEntriesOfLedger(bs.get(i).getLocalAddress(), nonExistingLedgerId);
+                try {
+                    futureResult.get();
+                    fail("asyncGetListOfEntriesOfLedger is supposed to be failed with NoSuchLedgerExistsException");
+                } catch (ExecutionException ee) {
+                    assertTrue(ee.getCause() instanceof BKException);
+                    BKException e = (BKException) ee.getCause();
+                    assertEquals(e.getCode(), BKException.Code.NoSuchLedgerExistsException);
+                }
+            }
+        }
+    }
+
+    public void testGetListOfEntriesOfLedger(boolean isLedgerClosed) throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        int numOfEntries = 6;
+        BookKeeper bkc = new BookKeeper(conf);
+        LedgerHandle lh = bkc.createLedger(numOfBookies, numOfBookies, digestType, "testPasswd".getBytes());
+        long lId = lh.getId();
+        for (int i = 0; i < numOfEntries; i++) {
+            lh.addEntry("000".getBytes());
+        }
+        if (isLedgerClosed) {
+            lh.close();
+        }
+        try (BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString())) {
+            for (int i = 0; i < bs.size(); i++) {
+                CompletableFuture<AvailabilityOfEntriesOfLedger> futureResult = bkAdmin
+                        .asyncGetListOfEntriesOfLedger(bs.get(i).getLocalAddress(), lId);
+                AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = futureResult.get();
+                assertEquals("Number of entries", numOfEntries,
+                        availabilityOfEntriesOfLedger.getTotalNumOfAvailableEntries());
+                for (int j = 0; j < numOfEntries; j++) {
+                    assertTrue("Entry should be available: " + j, availabilityOfEntriesOfLedger.isEntryAvailable(j));
+                }
+                assertFalse("Entry should not be available: " + numOfEntries,
+                        availabilityOfEntriesOfLedger.isEntryAvailable(numOfEntries));
+            }
+        }
+        bkc.close();
+    }
+
+    @Test
+    public void testGetListOfEntriesOfLedgerWithJustOneBookieInWriteQuorum() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        int numOfEntries = 6;
+        BookKeeper bkc = new BookKeeper(conf);
+        /*
+         * in this testsuite there are going to be 2 (numOfBookies) and if
+         * writeQuorum is 1 then it will stripe entries to those two bookies.
+         */
+        LedgerHandle lh = bkc.createLedger(2, 1, digestType, "testPasswd".getBytes());
+        long lId = lh.getId();
+        for (int i = 0; i < numOfEntries; i++) {
+            lh.addEntry("000".getBytes());
+        }
+
+        try (BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString())) {
+            for (int i = 0; i < bs.size(); i++) {
+                CompletableFuture<AvailabilityOfEntriesOfLedger> futureResult = bkAdmin
+                        .asyncGetListOfEntriesOfLedger(bs.get(i).getLocalAddress(), lId);
+                AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = futureResult.get();
+                /*
+                 * since num of bookies in the ensemble is 2 and
+                 * writeQuorum/ackQuorum is 1, it will stripe to these two
+                 * bookies and hence in each bookie there will be only
+                 * numOfEntries/2 entries.
+                 */
+                assertEquals("Number of entries", numOfEntries / 2,
+                        availabilityOfEntriesOfLedger.getTotalNumOfAvailableEntries());
+            }
+        }
+        bkc.close();
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 10f7048..30121b9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -40,6 +40,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.PrimitiveIterator.OfLong;
 import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
@@ -685,5 +686,10 @@ public class GcLedgersTest extends LedgerManagerTestCase {
                                                         Watcher<LastAddConfirmedUpdateNotification> watcher)
                 throws IOException {
         }
+
+        @Override
+        public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
+            return null;
+        }
     }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index a673d8c..4acc3e7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -31,6 +31,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Optional;
+import java.util.PrimitiveIterator.OfLong;
 
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
@@ -288,5 +289,10 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
         public ByteBuf getExplicitLac(long ledgerId) {
             return null;
         }
+
+        @Override
+        public OfLong getListOfEntriesOfLedger(long ledgerId) {
+            return null;
+        }
     }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
index 2c349a0..1cbddd3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
@@ -42,6 +42,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.FutureGetListOfEntriesOfLedger;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
@@ -49,6 +50,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
 import org.apache.bookkeeper.util.ByteBufList;
 
 import org.slf4j.Logger;
@@ -257,6 +259,17 @@ public class MockBookieClient implements BookieClient {
     }
 
     @Override
+    public CompletableFuture<AvailabilityOfEntriesOfLedger> getListOfEntriesOfLedger(BookieSocketAddress address,
+            long ledgerId) {
+        FutureGetListOfEntriesOfLedger futureResult = new FutureGetListOfEntriesOfLedger(ledgerId);
+        executor.executeOrdered(address, safeRun(() -> {
+            futureResult
+                    .completeExceptionally(BKException.create(BKException.Code.IllegalOpException).fillInStackTrace());
+        }));
+        return futureResult;
+    }
+
+    @Override
     public boolean isClosed() {
         return false;
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedgerTest.java
new file mode 100644
index 0000000..4368f89
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedgerTest.java
@@ -0,0 +1,192 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.PrimitiveIterator;
+import java.util.Set;
+
+import org.junit.Test;
+
+/**
+ * Testsuite for AvailabilityOfEntriesOfLedger.
+ */
+public class AvailabilityOfEntriesOfLedgerTest {
+    @Test
+    public void testWithItrConstructor() {
+        long[][] arrays = {
+                { 0, 1, 2 },
+                { 1, 2},
+                { 1, 2, 3, 5, 6, 7, 8 },
+                { 0, 1, 5 },
+                { 3 },
+                { 1, 2, 4, 5, 7, 8 },
+                {},
+                {0},
+                { 1, 2, 3, 5, 6, 11, 12, 13, 14, 15, 16, 17, 100, 1000, 1001, 10000, 20000, 20001 }
+        };
+        for (int i = 0; i < arrays.length; i++) {
+            long[] tempArray = arrays[i];
+            PrimitiveIterator.OfLong primitiveIterator = Arrays.stream(tempArray).iterator();
+            AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
+                    primitiveIterator);
+            assertEquals("Expected total number of entries", tempArray.length,
+                    availabilityOfEntriesOfLedger.getTotalNumOfAvailableEntries());
+            for (int j = 0; j < tempArray.length; j++) {
+                assertTrue(tempArray[j] + " is supposed to be available",
+                        availabilityOfEntriesOfLedger.isEntryAvailable(tempArray[j]));
+            }
+        }
+    }
+
+    @Test
+    public void testWithItrConstructorWithDuplicates() {
+        long[][] arrays = {
+                { 1, 2, 2, 3 },
+                { 1, 2, 3, 5, 5, 6, 7, 8, 8 },
+                { 1, 1, 5, 5 },
+                { 3, 3 },
+                { 1, 1, 2, 4, 5, 8, 9, 9, 9, 9, 9 },
+                {},
+                { 1, 2, 3, 5, 6, 11, 12, 13, 14, 15, 16, 17, 17, 100, 1000, 1000, 1001, 10000, 10000, 20000, 20001 }
+        };
+        for (int i = 0; i < arrays.length; i++) {
+            long[] tempArray = arrays[i];
+            Set<Long> tempSet = new HashSet<Long>();
+            for (int k = 0; k < tempArray.length; k++) {
+                tempSet.add(tempArray[k]);
+            }
+            PrimitiveIterator.OfLong primitiveIterator = Arrays.stream(tempArray).iterator();
+            AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
+                    primitiveIterator);
+            assertEquals("Expected total number of entries", tempSet.size(),
+                    availabilityOfEntriesOfLedger.getTotalNumOfAvailableEntries());
+            for (int j = 0; j < tempArray.length; j++) {
+                assertTrue(tempArray[j] + " is supposed to be available",
+                        availabilityOfEntriesOfLedger.isEntryAvailable(tempArray[j]));
+            }
+        }
+    }
+
+    @Test
+    public void testSerializeDeserialize() {
+        long[][] arrays = {
+                { 0, 1, 2 },
+                { 1, 2 },
+                { 1, 2, 3, 5, 6, 7, 8 },
+                { 0, 1, 5 },
+                { 3 },
+                { 1, 2, 4, 5, 7, 8 },
+                { },
+                { 0 },
+                { 1, 2, 3, 5, 6, 11, 12, 13, 14, 15, 16, 17, 100, 1000, 1001, 10000, 20000, 20001 }
+        };
+        for (int i = 0; i < arrays.length; i++) {
+            long[] tempArray = arrays[i];
+            PrimitiveIterator.OfLong primitiveIterator = Arrays.stream(tempArray).iterator();
+            AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
+                    primitiveIterator);
+            byte[] serializedState = availabilityOfEntriesOfLedger.serializeStateOfEntriesOfLedger();
+            AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedgerUsingSer = new AvailabilityOfEntriesOfLedger(
+                    serializedState);
+            assertEquals("Expected total number of entries", tempArray.length,
+                    availabilityOfEntriesOfLedgerUsingSer.getTotalNumOfAvailableEntries());
+            for (int j = 0; j < tempArray.length; j++) {
+                assertTrue(tempArray[j] + " is supposed to be available",
+                        availabilityOfEntriesOfLedgerUsingSer.isEntryAvailable(tempArray[j]));
+            }
+        }
+    }
+
+    @Test
+    public void testSerializeDeserializeWithItrConstructorWithDuplicates() {
+        long[][] arrays = {
+                { 1, 2, 2, 3 },
+                { 1, 2, 3, 5, 5, 6, 7, 8, 8 },
+                { 1, 1, 5, 5 },
+                { 3, 3 },
+                { 1, 1, 2, 4, 5, 8, 9, 9, 9, 9, 9 },
+                {},
+                { 1, 2, 3, 5, 6, 11, 12, 13, 14, 15, 16, 17, 17, 100, 1000, 1000, 1001, 10000, 10000, 20000, 20001 }
+        };
+        for (int i = 0; i < arrays.length; i++) {
+            long[] tempArray = arrays[i];
+            Set<Long> tempSet = new HashSet<Long>();
+            for (int k = 0; k < tempArray.length; k++) {
+                tempSet.add(tempArray[k]);
+            }
+            PrimitiveIterator.OfLong primitiveIterator = Arrays.stream(tempArray).iterator();
+            AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
+                    primitiveIterator);
+            byte[] serializedState = availabilityOfEntriesOfLedger.serializeStateOfEntriesOfLedger();
+            AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedgerUsingSer = new AvailabilityOfEntriesOfLedger(
+                    serializedState);
+            assertEquals("Expected total number of entries", tempSet.size(),
+                    availabilityOfEntriesOfLedgerUsingSer.getTotalNumOfAvailableEntries());
+            for (int j = 0; j < tempArray.length; j++) {
+                assertTrue(tempArray[j] + " is supposed to be available",
+                        availabilityOfEntriesOfLedgerUsingSer.isEntryAvailable(tempArray[j]));
+            }
+        }
+    }
+
+    @Test
+    public void testNonExistingEntries() {
+        long[][] arrays = {
+                { 0, 1, 2 },
+                { 1, 2, 3, 5, 6, 7, 8 },
+                { 1, 5 },
+                { 3 },
+                { 1, 2, 4, 5, 7, 8 },
+                {},
+                { 1, 2, 3, 5, 6, 11, 12, 13, 14, 15, 16, 17, 100, 1000, 1001, 10000, 20000, 20001 }
+        };
+        /**
+         * corresponding non-existing entries for 'arrays'
+         */
+        long[][] nonExistingEntries = {
+                { 3 },
+                { 0, 4, 9, 100, 101 },
+                { 0, 2, 3, 6, 9 },
+                { 0, 1, 2, 4, 5, 6 },
+                { 0, 3, 9, 10, 11, 100, 1000 },
+                { 0, 1, 2, 3, 4, 5 },
+                { 4, 18, 1002, 19999, 20003 }
+        };
+        for (int i = 0; i < arrays.length; i++) {
+            long[] tempArray = arrays[i];
+            long[] nonExistingElementsTempArray = nonExistingEntries[i];
+            PrimitiveIterator.OfLong primitiveIterator = Arrays.stream(tempArray).iterator();
+            AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
+                    primitiveIterator);
+
+            for (int j = 0; j < nonExistingElementsTempArray.length; j++) {
+                assertFalse(nonExistingElementsTempArray[j] + " is not supposed to be available",
+                        availabilityOfEntriesOfLedger.isEntryAvailable(nonExistingElementsTempArray[j]));
+            }
+        }
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/IteratorUtilityTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/IteratorUtilityTest.java
new file mode 100644
index 0000000..55c1db3
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/IteratorUtilityTest.java
@@ -0,0 +1,141 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.PrimitiveIterator;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Testsuite for IteratorUtility methods.
+ */
+public class IteratorUtilityTest {
+
+    @Test
+    public void testWithPrimitiveItrMerge() {
+        long[][] arrays = {
+                { 0, 1, 2 },
+                { 0, 1 },
+                { 1, 2 },
+                { 1, 2, 3, 5, 6, 7, 8 },
+                { 1, 2, 3, 5, 6, 7, 8 },
+                { 0, 1, 5 },
+                { 3 },
+                { 1, 2, 4, 5, 7, 8 },
+                {},
+                {},
+                { 0 },
+                { 1, 2, 3, 5, 6, 11, 12, 13, 14, 15, 16, 17, 100, 1000, 1001, 10000, 20000, 20001 },
+                { 201, 202, 203, 205, 206, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 20100, 201000,
+                    201001, 2010000, 2020000, 2020001 }
+        };
+        for (int i = 0; i < arrays.length; i++) {
+            for (int j = i + 1; j < arrays.length; j++) {
+                long[] tempArray1 = arrays[i];
+                long[] tempArray2 = arrays[j];
+                HashSet<Long> unionSet = new HashSet<Long>();
+                for (int k = 0; k < tempArray1.length; k++) {
+                    unionSet.add(tempArray1[k]);
+                }
+                for (int k = 0; k < tempArray2.length; k++) {
+                    unionSet.add(tempArray2[k]);
+                }
+
+                PrimitiveIterator.OfLong primitiveIterator1 = Arrays.stream(tempArray1).iterator();
+                PrimitiveIterator.OfLong primitiveIterator2 = Arrays.stream(tempArray2).iterator();
+
+                PrimitiveIterator.OfLong mergedItr = IteratorUtility.mergePrimitiveLongIterator(primitiveIterator1,
+                        primitiveIterator2);
+                ArrayList<Long> mergedArrayList = new ArrayList<Long>();
+                Consumer<Long> addMethod = mergedArrayList::add;
+                mergedItr.forEachRemaining(addMethod);
+                int mergedListSize = mergedArrayList.size();
+                Assert.assertEquals("Size of the mergedArrayList", unionSet.size(), mergedArrayList.size());
+                Assert.assertTrue("mergedArrayList should contain all elements in unionSet",
+                        mergedArrayList.containsAll(unionSet));
+                Assert.assertTrue("Merged Iterator should be sorted", IntStream.range(0, mergedListSize - 1)
+                        .allMatch(k -> mergedArrayList.get(k) <= mergedArrayList.get(k + 1)));
+                Assert.assertTrue("All elements of tempArray1 should be in mergedArrayList",
+                        IntStream.range(0, tempArray1.length).allMatch(k -> mergedArrayList.contains(tempArray1[k])));
+                Assert.assertTrue("All elements of tempArray2 should be in mergedArrayList",
+                        IntStream.range(0, tempArray2.length).allMatch(k -> mergedArrayList.contains(tempArray2[k])));
+            }
+        }
+    }
+
+    @Test
+    public void testWithItrMerge() {
+        long[][] arrays = {
+                { 0, 1, 2 },
+                { 0, 1 },
+                { 1, 2 },
+                { 1, 2, 3, 5, 6, 7, 8 },
+                { 1, 2, 3, 5, 6, 7, 8 },
+                { 0, 1, 5 },
+                { 3 },
+                { 1, 2, 4, 5, 7, 8 },
+                {},
+                {},
+                { 0 },
+                { 1, 2, 3, 5, 6, 11, 12, 13, 14, 15, 16, 17, 100, 1000, 1001, 10000, 20000, 20001 },
+                { 201, 202, 203, 205, 206, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 20100, 201000,
+                    201001, 2010000, 2020000, 2020001 }
+        };
+        for (int i = 0; i < arrays.length; i++) {
+            for (int j = i + 1; j < arrays.length; j++) {
+                long[] tempArray1 = arrays[i];
+                ArrayList<Long> tempArrayList1 = new ArrayList<Long>();
+                IntStream.range(0, tempArray1.length).forEach((k) -> tempArrayList1.add(tempArray1[k]));
+                long[] tempArray2 = arrays[j];
+                ArrayList<Long> tempArrayList2 = new ArrayList<Long>();
+                IntStream.range(0, tempArray2.length).forEach((k) -> tempArrayList2.add(tempArray2[k]));
+                HashSet<Long> unionSet = new HashSet<Long>();
+                unionSet.addAll(tempArrayList1);
+                unionSet.addAll(tempArrayList2);
+
+                Iterator<Long> itr1 = tempArrayList1.iterator();
+                Iterator<Long> itr2 = tempArrayList2.iterator();
+
+                Iterator<Long> mergedItr = IteratorUtility.mergeIteratorsForPrimitiveLongIterator(itr1, itr2,
+                        Long::compare, (l) -> l);
+                ArrayList<Long> mergedArrayList = new ArrayList<Long>();
+                Consumer<Long> addMethod = mergedArrayList::add;
+                mergedItr.forEachRemaining(addMethod);
+                int mergedListSize = mergedArrayList.size();
+                Assert.assertEquals("Size of the mergedArrayList", unionSet.size(), mergedArrayList.size());
+                Assert.assertTrue("mergedArrayList should contain all elements in unionSet",
+                        mergedArrayList.containsAll(unionSet));
+                Assert.assertTrue("Merged Iterator should be sorted", IntStream.range(0, mergedListSize - 1)
+                        .allMatch(k -> mergedArrayList.get(k) <= mergedArrayList.get(k + 1)));
+                Assert.assertTrue("All elements of tempArray1 should be in mergedArrayList",
+                        IntStream.range(0, tempArray1.length).allMatch(k -> mergedArrayList.contains(tempArray1[k])));
+                Assert.assertTrue("All elements of tempArray2 should be in mergedArrayList",
+                        IntStream.range(0, tempArray2.length).allMatch(k -> mergedArrayList.contains(tempArray2[k])));
+            }
+        }
+    }
+}