You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/02/28 21:42:21 UTC

[bookkeeper] branch master updated: Support LacPiggyback, LongPoll and ExplicitLac in db ledger storage

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b8c551  Support LacPiggyback, LongPoll and ExplicitLac in db ledger storage
8b8c551 is described below

commit 8b8c5515e041347e0fdd791534fa0a694c96c6ae
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed Feb 28 13:42:14 2018 -0800

    Support LacPiggyback, LongPoll and ExplicitLac in db ledger storage
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    DbLedgerStorage was developed at Yahoo and was only recently merged to master. This implementation only works with the v2 protocol.
    It doesn't work with the v3 protocol. When v3 protocol clients (e.g. the dlog library) talk to bookies that use DbLedgerStorage,
    the clients hang waiting for responses, because a few methods in DbLedgerStorage throw UnsupportedOperationException.
    
    *Solution*
    
    This change focuses on adding the implementations for those methods, in order to support LacPiggyback, LongPoll and ExplicitLac
    for v3 protocol. The implementation follows what we had in FileInfoCache but does it in a much simpler way, since all the persistent
    information is already managed by db ledger storage itself. All the information required by `LacPiggyback`, `LongPoll` and `ExplicitLac`
    is transient.
    
    This change doesn't introduce any new test cases. It reuses existing test cases to test those features using DbLedgerStorage. These
    tests are:
    
    - ExplicitLacTest: for testing explicit lac
    - TestPiggybackLAC: for testing piggyback lac
    - TestReadLastConfirmedLongPoll: for testing long polling lac
    - TestReadLastConfirmedAndEntry: for testing entry piggyback
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Ivan Kelly <iv...@apache.org>, Matteo Merli <mm...@apache.org>
    
    This closes #1218 from sijie/support_long_polls
---
 .../org/apache/bookkeeper/bookie/FileInfo.java     |   5 +-
 .../bookie/InterleavedLedgerStorage.java           |  11 +-
 .../bookie/storage/ldb/DbLedgerStorage.java        | 193 +++++++++++++++++++-
 .../bookie/storage/ldb/DbLedgerStorageTest.java    |  11 ++
 .../apache/bookkeeper/client/BookKeeperTest.java   | 124 -------------
 .../apache/bookkeeper/client/ExplicitLacTest.java  | 194 +++++++++++++++++++++
 .../apache/bookkeeper/client/TestPiggybackLAC.java |  26 ++-
 .../client/TestReadLastConfirmedAndEntry.java      |  22 ++-
 .../client/TestReadLastConfirmedLongPoll.java      |  22 ++-
 .../bookkeeper/test/BookKeeperClusterTestCase.java |   1 +
 .../java/org/apache/bookkeeper/util/TestUtils.java |  27 ++-
 11 files changed, 494 insertions(+), 142 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index 296acb9..eb6d9e1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -167,6 +167,7 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
     }
 
     public void setExplicitLac(ByteBuf lac) {
+        long explicitLacValue;
         synchronized (this) {
             if (explicitLac == null) {
                 explicitLac = ByteBuffer.allocate(lac.capacity());
@@ -176,13 +177,13 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
 
             // skip the ledger id
             explicitLac.getLong();
-            long explicitLacValue = explicitLac.getLong();
-            setLastAddConfirmed(explicitLacValue);
+            explicitLacValue = explicitLac.getLong();
             explicitLac.rewind();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("fileInfo:SetLac: {}", explicitLac);
             }
         }
+        setLastAddConfirmed(explicitLacValue);
     }
 
     public synchronized void readHeader() throws IOException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index a6d8361..fdfef0a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -242,10 +242,13 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
             if (null == bb) {
                 return BookieProtocol.INVALID_ENTRY_ID;
             } else {
-                bb.readLong(); // ledger id
-                bb.readLong(); // entry id
-                lac = bb.readLong();
-                lac = ledgerCache.updateLastAddConfirmed(ledgerId, lac);
+                try {
+                    bb.skipBytes(2 * Long.BYTES); // skip ledger & entry id
+                    lac = bb.readLong();
+                    lac = ledgerCache.updateLastAddConfirmed(ledgerId, lac);
+                } finally {
+                    bb.release();
+                }
             }
         }
         return lac;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index b2e9e3d..3a9c805 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -22,13 +22,21 @@ package org.apache.bookkeeper.bookie.storage.ldb;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.SortedMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
@@ -55,6 +63,7 @@ import org.apache.bookkeeper.bookie.StateManager;
 import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.common.util.Watchable;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
@@ -74,10 +83,121 @@ import org.slf4j.LoggerFactory;
  */
 public class DbLedgerStorage implements CompactableLedgerStorage {
 
+    /**
+     * This class borrows the logic from FileInfo.
+     *
+     * <p>This class is used for holding all the transient states for a given ledger.
+     */
+    private static class TransientLedgerInfo
+            extends Watchable<LastAddConfirmedUpdateNotification>
+            implements AutoCloseable {
+
+        // lac
+        private Long lac = null;
+        // request from explicit lac requests
+        private ByteBuffer explicitLac = null;
+        // is the ledger info closed?
+        private boolean isClosed;
+
+        private final long ledgerId;
+        // reference to LedgerMetadataIndex
+        private final LedgerMetadataIndex ledgerIndex;
+
+        /**
+         * Construct an Watchable with zero watchers.
+         */
+        public TransientLedgerInfo(long ledgerId, LedgerMetadataIndex ledgerIndex) {
+            super(WATCHER_RECYCLER);
+            this.ledgerId = ledgerId;
+            this.ledgerIndex = ledgerIndex;
+        }
+
+        synchronized Long getLastAddConfirmed() {
+            return lac;
+        }
+
+        Long setLastAddConfirmed(long lac) {
+            long lacToReturn;
+            boolean changed = false;
+            synchronized (this) {
+                if (null == this.lac || this.lac < lac) {
+                    this.lac = lac;
+                    changed = true;
+                }
+                lacToReturn = this.lac;
+            }
+            if (changed) {
+                notifyWatchers(lacToReturn);
+            }
+            return lacToReturn;
+        }
+
+        synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
+                                                           Watcher<LastAddConfirmedUpdateNotification> watcher)
+                throws IOException {
+            if ((null != lac && lac > previousLAC)
+                    || isClosed || ledgerIndex.get(ledgerId).getFenced()) {
+                return false;
+            }
+
+            addWatcher(watcher);
+            return true;
+        }
+
+        public ByteBuf getExplicitLac() {
+            ByteBuf retLac = null;
+            synchronized (this) {
+                if (explicitLac != null) {
+                    retLac = Unpooled.buffer(explicitLac.capacity());
+                    explicitLac.rewind(); //copy from the beginning
+                    retLac.writeBytes(explicitLac);
+                    explicitLac.rewind();
+                    return retLac;
+                }
+            }
+            return retLac;
+        }
+
+        public void setExplicitLac(ByteBuf lac) {
+            long explicitLacValue;
+            synchronized (this) {
+                if (explicitLac == null) {
+                    explicitLac = ByteBuffer.allocate(lac.capacity());
+                }
+                lac.readBytes(explicitLac);
+                explicitLac.rewind();
+
+                // skip the ledger id
+                explicitLac.getLong();
+                explicitLacValue = explicitLac.getLong();
+                explicitLac.rewind();
+            }
+            setLastAddConfirmed(explicitLacValue);
+        }
+
+        void notifyWatchers(long lastAddConfirmed) {
+            notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, lastAddConfirmed);
+        }
+
+        @Override
+        public void close() {
+           synchronized (this) {
+               if (isClosed) {
+                   return;
+               }
+               isClosed = true;
+           }
+           // notify watchers
+            notifyWatchers(Long.MAX_VALUE);
+        }
+
+    }
+
     private EntryLogger entryLogger;
 
     private LedgerMetadataIndex ledgerIndex;
     private EntryLocationIndex entryLocationIndex;
+    private LoadingCache<Long, TransientLedgerInfo> transientLedgerInfoCache;
 
     private GarbageCollectorThread gcThread;
 
@@ -167,12 +287,41 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
         ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
         entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
 
+        // build the ledger info cache
+        int concurrencyLevel = Math.max(1, Math.max(conf.getNumAddWorkerThreads(), conf.getNumReadWorkerThreads()));
+        RemovalListener<Long, TransientLedgerInfo> ledgerInfoRemovalListener = this::handleLedgerEviction;
+        CacheBuilder<Long, TransientLedgerInfo> builder = CacheBuilder.newBuilder()
+            .initialCapacity(conf.getFileInfoCacheInitialCapacity())
+            .maximumSize(conf.getOpenFileLimit())
+            .concurrencyLevel(concurrencyLevel)
+            .removalListener(ledgerInfoRemovalListener);
+        if (conf.getFileInfoMaxIdleTime() > 0) {
+            builder.expireAfterAccess(conf.getFileInfoMaxIdleTime(), TimeUnit.SECONDS);
+        }
+        transientLedgerInfoCache = builder.build(new CacheLoader<Long, TransientLedgerInfo>() {
+            @Override
+            public TransientLedgerInfo load(Long key) throws Exception {
+                return new TransientLedgerInfo(key, ledgerIndex);
+            }
+        });
+
         entryLogger = new EntryLogger(conf, ledgerDirsManager);
         gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger);
 
         registerStats();
     }
 
+    /**
+     * When a ledger is evicted from transient ledger info cache, we can just simply discard the object.
+     */
+    private void handleLedgerEviction(RemovalNotification<Long, TransientLedgerInfo> notification) {
+        TransientLedgerInfo ledgerInfo = notification.getValue();
+        if (null == ledgerInfo || null == notification.getKey()) {
+            return;
+        }
+        ledgerInfo.close();
+    }
+
     public void registerStats() {
         stats.registerGauge("write-cache-size", new Gauge<Long>() {
             @Override
@@ -285,7 +434,15 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
         if (log.isDebugEnabled()) {
             log.debug("Set fenced. ledger: {}", ledgerId);
         }
-        return ledgerIndex.setFenced(ledgerId);
+        boolean changed = ledgerIndex.setFenced(ledgerId);
+        if (changed) {
+            // notify all the watchers if a ledger is fenced
+            TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.getIfPresent(ledgerId);
+            if (null != ledgerInfo) {
+                ledgerInfo.notifyWatchers(Long.MAX_VALUE);
+            }
+        }
+        return changed;
     }
 
     @Override
@@ -310,9 +467,10 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
 
         long ledgerId = entry.getLong(entry.readerIndex());
         long entryId = entry.getLong(entry.readerIndex() + 8);
+        long lac = entry.getLong(entry.readerIndex() + 16);
 
         if (log.isDebugEnabled()) {
-            log.debug("Add entry. {}@{}", ledgerId, entryId);
+            log.debug("Add entry. {}@{}, lac = {}", ledgerId, entryId, lac);
         }
 
         // Waits if the write cache is being switched for a flush
@@ -328,6 +486,10 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
             triggerFlushAndAddEntry(ledgerId, entryId, entry);
         }
 
+        // after successfully insert the entry, update LAC and notify the watchers
+        transientLedgerInfoCache.getUnchecked(ledgerId)
+            .setLastAddConfirmed(lac);
+
         recordSuccessfulEvent(addEntryStats, startTime);
         return entryId;
     }
@@ -712,23 +874,42 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
 
     @Override
     public long getLastAddConfirmed(long ledgerId) throws IOException {
-        throw new UnsupportedOperationException();
+        TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.getIfPresent(ledgerId);
+        Long lac = null != ledgerInfo ? ledgerInfo.getLastAddConfirmed() : null;
+        if (null == lac) {
+            ByteBuf bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
+            try {
+                bb.skipBytes(2 * Long.BYTES); // skip ledger id and entry id
+                lac = bb.readLong();
+                lac = transientLedgerInfoCache.getUnchecked(ledgerId).setLastAddConfirmed(lac);
+            } finally {
+                bb.release();
+            }
+        }
+        return lac;
     }
 
     @Override
     public boolean waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC,
             Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
-        throw new UnsupportedOperationException();
+        return transientLedgerInfoCache.getUnchecked(ledgerId)
+            .waitForLastAddConfirmedUpdate(previousLAC, watcher);
     }
 
     @Override
     public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
-        throw new UnsupportedOperationException();
+        transientLedgerInfoCache.getUnchecked(ledgerId)
+            .setExplicitLac(lac);
     }
 
     @Override
     public ByteBuf getExplicitLac(long ledgerId) {
-        throw new UnsupportedOperationException();
+        TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.getIfPresent(ledgerId);
+        if (null == ledgerInfo) {
+            return null;
+        } else {
+            return ledgerInfo.getExplicitLac();
+        }
     }
 
     @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
index 4b61a65..f4d67f2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -122,6 +122,7 @@ public class DbLedgerStorageTest {
         ByteBuf entry = Unpooled.buffer(1024);
         entry.writeLong(4); // ledger id
         entry.writeLong(1); // entry id
+        entry.writeLong(0); // lac
         entry.writeBytes("entry-1".getBytes());
 
         assertEquals(false, ((DbLedgerStorage) storage).isFlushRequired());
@@ -152,6 +153,7 @@ public class DbLedgerStorageTest {
         ByteBuf entry2 = Unpooled.buffer(1024);
         entry2.writeLong(4); // ledger id
         entry2.writeLong(2); // entry id
+        entry2.writeLong(1); // lac
         entry2.writeBytes("entry-2".getBytes());
 
         storage.addEntry(entry2);
@@ -160,21 +162,28 @@ public class DbLedgerStorageTest {
         res = storage.getEntry(4, BookieProtocol.LAST_ADD_CONFIRMED);
         assertEquals(entry2, res);
 
+        // Read last add confirmed in ledger
+        assertEquals(1L, storage.getLastAddConfirmed(4));
+
         ByteBuf entry3 = Unpooled.buffer(1024);
         entry3.writeLong(4); // ledger id
         entry3.writeLong(3); // entry id
+        entry3.writeLong(2); // lac
         entry3.writeBytes("entry-3".getBytes());
         storage.addEntry(entry3);
 
         ByteBuf entry4 = Unpooled.buffer(1024);
         entry4.writeLong(4); // ledger id
         entry4.writeLong(4); // entry id
+        entry4.writeLong(3); // lac
         entry4.writeBytes("entry-4".getBytes());
         storage.addEntry(entry4);
 
         res = storage.getEntry(4, 4);
         assertEquals(entry4, res);
 
+        assertEquals(3, storage.getLastAddConfirmed(4));
+
         // Delete
         assertEquals(true, storage.ledgerExists(4));
         storage.deleteLedger(4);
@@ -182,10 +191,12 @@ public class DbLedgerStorageTest {
 
         // Should not throw exception event if the ledger was deleted
         storage.getEntry(4, 4);
+        assertEquals(3, storage.getLastAddConfirmed(4));
 
         storage.addEntry(Unpooled.wrappedBuffer(entry2));
         res = storage.getEntry(4, BookieProtocol.LAST_ADD_CONFIRMED);
         assertEquals(entry4, res);
+        assertEquals(3, storage.getLastAddConfirmed(4));
 
         // Get last entry from storage
         storage.flush();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 9922016..344f5ec 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -312,130 +312,6 @@ public class BookKeeperTest extends BookKeeperClusterTestCase {
     }
 
     @Test
-    public void testReadHandleWithNoExplicitLAC() throws Exception {
-        ClientConfiguration confWithNoExplicitLAC = new ClientConfiguration();
-        confWithNoExplicitLAC.setZkServers(zkUtil.getZooKeeperConnectString());
-        confWithNoExplicitLAC.setExplictLacInterval(0);
-
-        BookKeeper bkcWithNoExplicitLAC = new BookKeeper(confWithNoExplicitLAC);
-
-        LedgerHandle wlh = bkcWithNoExplicitLAC.createLedger(digestType, "testPasswd".getBytes());
-        long ledgerId = wlh.getId();
-        int numOfEntries = 5;
-        for (int i = 0; i < numOfEntries; i++) {
-            wlh.addEntry(("foobar" + i).getBytes());
-        }
-
-        LedgerHandle rlh = bkcWithNoExplicitLAC.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());
-        assertTrue(
-                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
-                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
-
-        Enumeration<LedgerEntry> entries = rlh.readEntries(0, numOfEntries - 2);
-        int entryId = 0;
-        while (entries.hasMoreElements()) {
-            LedgerEntry entry = entries.nextElement();
-            String entryString = new String(entry.getEntry());
-            assertTrue("Expected entry String: " + ("foobar" + entryId) + " actual entry String: " + entryString,
-                    entryString.equals("foobar" + entryId));
-            entryId++;
-        }
-
-        for (int i = numOfEntries; i < 2 * numOfEntries; i++) {
-            wlh.addEntry(("foobar" + i).getBytes());
-        }
-
-        Thread.sleep(3000);
-        assertTrue(
-                "Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of rlh: " + wlh.getLastAddConfirmed(),
-                (wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
-        assertTrue(
-                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
-                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
-
-        // since explicitlacflush policy is not enabled for writeledgerhandle, when we try
-        // to read explicitlac for rlh, it will be reading up to the piggyback value.
-        long explicitlac = rlh.readExplicitLastConfirmed();
-        assertTrue(
-                "Expected Explicit LAC of rlh: " + (numOfEntries - 2) + " actual ExplicitLAC of rlh: " + explicitlac,
-                (explicitlac == (2 * numOfEntries - 2)));
-
-        try {
-            rlh.readEntries(2 * numOfEntries - 1, 2 * numOfEntries - 1);
-            fail("rlh readEntries beyond " + (2 * numOfEntries - 2) + " should fail with ReadException");
-        } catch (BKException.BKReadException readException) {
-        }
-
-        rlh.close();
-        wlh.close();
-        bkcWithNoExplicitLAC.close();
-    }
-
-    @Test
-    public void testReadHandleWithExplicitLAC() throws Exception {
-        ClientConfiguration confWithExplicitLAC = new ClientConfiguration();
-        confWithExplicitLAC.setZkServers(zkUtil.getZooKeeperConnectString());
-        int explicitLacIntervalMillis = 1000;
-        confWithExplicitLAC.setExplictLacInterval(explicitLacIntervalMillis);
-
-        BookKeeper bkcWithExplicitLAC = new BookKeeper(confWithExplicitLAC);
-
-        LedgerHandle wlh = bkcWithExplicitLAC.createLedger(digestType, "testPasswd".getBytes());
-        long ledgerId = wlh.getId();
-        int numOfEntries = 5;
-        for (int i = 0; i < numOfEntries; i++) {
-            wlh.addEntry(("foobar" + i).getBytes());
-        }
-
-        LedgerHandle rlh = bkcWithExplicitLAC.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());
-
-        assertTrue(
-                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
-                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
-
-        for (int i = numOfEntries; i < 2 * numOfEntries; i++) {
-            wlh.addEntry(("foobar" + i).getBytes());
-        }
-
-        // we need to wait for atleast 2 explicitlacintervals,
-        // since in writehandle for the first call
-        // lh.getExplicitLastAddConfirmed() will be <
-        // lh.getPiggyBackedLastAddConfirmed(),
-        // so it wont make explicit writelac in the first run
-        Thread.sleep((2 * explicitLacIntervalMillis / 1000 + 1) * 1000);
-        assertTrue(
-                "Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of wlh: " + wlh.getLastAddConfirmed(),
-                (wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
-        // readhandle's lastaddconfirmed wont be updated until readExplicitLastConfirmed call is made
-        assertTrue(
-                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
-                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
-
-        long explicitlac = rlh.readExplicitLastConfirmed();
-        assertTrue("Expected Explicit LAC of rlh: " + (2 * numOfEntries - 1)
-                + " actual ExplicitLAC of rlh: " + explicitlac,
-                (explicitlac == (2 * numOfEntries - 1)));
-        // readExplicitLastConfirmed updates the lac of rlh.
-        assertTrue(
-                "Expected LAC of rlh: " + (2 * numOfEntries - 1) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
-                (rlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
-
-        Enumeration<LedgerEntry> entries = rlh.readEntries(numOfEntries, 2 * numOfEntries - 1);
-        int entryId = numOfEntries;
-        while (entries.hasMoreElements()) {
-            LedgerEntry entry = entries.nextElement();
-            String entryString = new String(entry.getEntry());
-            assertTrue("Expected entry String: " + ("foobar" + entryId) + " actual entry String: " + entryString,
-                    entryString.equals("foobar" + entryId));
-            entryId++;
-        }
-
-        rlh.close();
-        wlh.close();
-        bkcWithExplicitLAC.close();
-    }
-
-    @Test
     public void testReadAfterLastAddConfirmed() throws Exception {
 
         ClientConfiguration clientConfiguration = new ClientConfiguration();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
new file mode 100644
index 0000000..745be57
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.SortedLedgerStorage;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Test cases for `Explicit Lac` feature.
+ */
+@RunWith(Parameterized.class)
+public class ExplicitLacTest extends BookKeeperClusterTestCase {
+
+    private final DigestType digestType;
+
+    public ExplicitLacTest(Class<? extends LedgerStorage> storageClass) {
+        super(1);
+        this.digestType = DigestType.CRC32;
+        baseConf.setLedgerStorageClass(storageClass.getName());
+    }
+
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] {
+            { InterleavedLedgerStorage.class },
+            { SortedLedgerStorage.class },
+            { DbLedgerStorage.class },
+        });
+    }
+
+    @Test
+    public void testReadHandleWithNoExplicitLAC() throws Exception {
+        ClientConfiguration confWithNoExplicitLAC = new ClientConfiguration();
+        confWithNoExplicitLAC.setZkServers(zkUtil.getZooKeeperConnectString());
+        confWithNoExplicitLAC.setExplictLacInterval(0);
+
+        BookKeeper bkcWithNoExplicitLAC = new BookKeeper(confWithNoExplicitLAC);
+
+        LedgerHandle wlh = bkcWithNoExplicitLAC.createLedger(
+            1, 1, 1,
+            digestType, "testPasswd".getBytes());
+        long ledgerId = wlh.getId();
+        int numOfEntries = 5;
+        for (int i = 0; i < numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        LedgerHandle rlh = bkcWithNoExplicitLAC.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());
+        assertTrue(
+                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+        Enumeration<LedgerEntry> entries = rlh.readEntries(0, numOfEntries - 2);
+        int entryId = 0;
+        while (entries.hasMoreElements()) {
+            LedgerEntry entry = entries.nextElement();
+            String entryString = new String(entry.getEntry());
+            assertTrue("Expected entry String: " + ("foobar" + entryId) + " actual entry String: " + entryString,
+                    entryString.equals("foobar" + entryId));
+            entryId++;
+        }
+
+        for (int i = numOfEntries; i < 2 * numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        TestUtils.waitUntilLacUpdated(rlh, numOfEntries - 2);
+
+        assertTrue(
+                "Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of rlh: " + wlh.getLastAddConfirmed(),
+                (wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
+        assertTrue(
+                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+        // since explicitlacflush policy is not enabled for writeledgerhandle, when we try
+        // to read explicitlac for rlh, it will be reading up to the piggyback value.
+        long explicitlac = rlh.readExplicitLastConfirmed();
+        assertTrue(
+                "Expected Explicit LAC of rlh: " + (numOfEntries - 2) + " actual ExplicitLAC of rlh: " + explicitlac,
+                (explicitlac == (2 * numOfEntries - 2)));
+
+        try {
+            rlh.readEntries(2 * numOfEntries - 1, 2 * numOfEntries - 1);
+            fail("rlh readEntries beyond " + (2 * numOfEntries - 2) + " should fail with ReadException");
+        } catch (BKException.BKReadException readException) {
+        }
+
+        rlh.close();
+        wlh.close();
+        bkcWithNoExplicitLAC.close();
+    }
+
+    @Test
+    public void testReadHandleWithExplicitLAC() throws Exception {
+        ClientConfiguration confWithExplicitLAC = new ClientConfiguration();
+        confWithExplicitLAC.setZkServers(zkUtil.getZooKeeperConnectString());
+        int explicitLacIntervalMillis = 1000;
+        confWithExplicitLAC.setExplictLacInterval(explicitLacIntervalMillis);
+
+        BookKeeper bkcWithExplicitLAC = new BookKeeper(confWithExplicitLAC);
+
+        LedgerHandle wlh = bkcWithExplicitLAC.createLedger(
+            1, 1, 1,
+            digestType, "testPasswd".getBytes());
+        long ledgerId = wlh.getId();
+        int numOfEntries = 5;
+        for (int i = 0; i < numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        LedgerHandle rlh = bkcWithExplicitLAC.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());
+
+        assertTrue(
+                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+        for (int i = numOfEntries; i < 2 * numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        // we need to wait for atleast 2 explicitlacintervals,
+        // since in writehandle for the first call
+        // lh.getExplicitLastAddConfirmed() will be <
+        // lh.getPiggyBackedLastAddConfirmed(),
+        // so it wont make explicit writelac in the first run
+        TestUtils.waitUntilLacUpdated(rlh, 2 * numOfEntries - 2);
+
+        assertTrue(
+                "Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of wlh: " + wlh.getLastAddConfirmed(),
+                (wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
+        // readhandle's lastaddconfirmed wont be updated until readExplicitLastConfirmed call is made
+        assertTrue(
+                "Expected LAC of rlh: " + (2 * numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+                (rlh.getLastAddConfirmed() == (2 * numOfEntries - 2)));
+
+        long explicitlac = TestUtils.waitUntilExplicitLacUpdated(rlh, 2 * numOfEntries - 1);
+        assertTrue("Expected Explicit LAC of rlh: " + (2 * numOfEntries - 1)
+                + " actual ExplicitLAC of rlh: " + explicitlac,
+                (explicitlac == (2 * numOfEntries - 1)));
+        // readExplicitLastConfirmed updates the lac of rlh.
+        assertTrue(
+                "Expected LAC of rlh: " + (2 * numOfEntries - 1) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+                (rlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
+
+        Enumeration<LedgerEntry> entries = rlh.readEntries(numOfEntries, 2 * numOfEntries - 1);
+        int entryId = numOfEntries;
+        while (entries.hasMoreElements()) {
+            LedgerEntry entry = entries.nextElement();
+            String entryString = new String(entry.getEntry());
+            assertTrue("Expected entry String: " + ("foobar" + entryId) + " actual entry String: " + entryString,
+                    entryString.equals("foobar" + entryId));
+            entryId++;
+        }
+
+        rlh.close();
+        wlh.close();
+        bkcWithExplicitLAC.close();
+    }
+
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPiggybackLAC.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPiggybackLAC.java
index 616b9fb..d8be69c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPiggybackLAC.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPiggybackLAC.java
@@ -22,32 +22,52 @@ package org.apache.bookkeeper.client;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Enumeration;
 
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.SortedLedgerStorage;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Test a piggyback LAC.
  */
+@RunWith(Parameterized.class)
 public class TestPiggybackLAC extends BookKeeperClusterTestCase {
 
     private static final Logger LOG = LoggerFactory.getLogger(TestPiggybackLAC.class);
 
     final DigestType digestType;
 
-    public TestPiggybackLAC() {
-        super(3);
+    public TestPiggybackLAC(Class<? extends LedgerStorage> storageClass) {
+        super(1); // NOTE(review): presumably the bookie count for the test cluster - confirm in the base class
         this.digestType = DigestType.CRC32;
+        baseConf.setLedgerStorageClass(storageClass.getName()); // storage implementation under test
+    }
+
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] { // one row per run; each row holds the single constructor argument
+            { InterleavedLedgerStorage.class },
+            { SortedLedgerStorage.class },
+            { DbLedgerStorage.class },
+        });
     }
 
     @Test
     public void testPiggybackLAC() throws Exception {
         int numEntries = 10;
-        LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, "".getBytes());
+        LedgerHandle lh = bkc.createLedger(1, 1, 1, digestType, "".getBytes());
         // tried to add entries
         for (int i = 0; i < numEntries; i++) {
             lh.addEntry(("data" + i).getBytes());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java
index d78b228..7c2cf5b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java
@@ -27,6 +27,8 @@ import static org.junit.Assert.assertNull;
 import io.netty.buffer.ByteBuf;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Enumeration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -35,27 +37,45 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.SortedLedgerStorage;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.zookeeper.KeeperException;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Test reading the last confirmed and entry.
  */
+@RunWith(Parameterized.class)
 public class TestReadLastConfirmedAndEntry extends BookKeeperClusterTestCase {
 
     private static final Logger logger = LoggerFactory.getLogger(TestReadLastConfirmedAndEntry.class);
 
     final BookKeeper.DigestType digestType;
 
-    public TestReadLastConfirmedAndEntry() {
+    public TestReadLastConfirmedAndEntry(Class<? extends LedgerStorage> storageClass) {
         super(3);
         this.digestType = BookKeeper.DigestType.CRC32;
         this.baseConf.setAllowEphemeralPorts(false);
+        this.baseConf.setLedgerStorageClass(storageClass.getName()); // storage implementation under test
+    }
+
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] { // one row per run; each row holds the single constructor argument
+            { InterleavedLedgerStorage.class },
+            { SortedLedgerStorage.class },
+            { DbLedgerStorage.class },
+        });
     }
 
     static class FakeBookie extends Bookie {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
index 414c063..f754f6a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
@@ -20,24 +20,44 @@ package org.apache.bookkeeper.client;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.SortedLedgerStorage;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * Test read last confirmed long by polling.
  */
+@RunWith(Parameterized.class)
 public class TestReadLastConfirmedLongPoll extends BookKeeperClusterTestCase {
     final DigestType digestType;
 
-    public TestReadLastConfirmedLongPoll() {
+    public TestReadLastConfirmedLongPoll(Class<? extends LedgerStorage> storageClass) {
         super(6);
         this.digestType = DigestType.CRC32;
+        baseConf.setLedgerStorageClass(storageClass.getName()); // storage implementation under test
+    }
+
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] { // one row per run; each row holds the single constructor argument
+            { InterleavedLedgerStorage.class },
+            { SortedLedgerStorage.class },
+            { DbLedgerStorage.class },
+        });
     }
 
     @Test
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 04e253d..0147c06 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -793,4 +793,5 @@ public abstract class BookKeeperClusterTestCase {
     public TestStatsProvider getStatsProvider(int index) throws Exception {
         return getStatsProvider(bs.get(index).getLocalAddress());
     }
+
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
index 222de5b..c3f1b89 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
@@ -25,12 +25,20 @@ import java.io.File;
 import java.util.HashSet;
 import java.util.Set;
 
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.ReadHandle;
 
 /**
  * Test utilities.
  */
-public class TestUtils {
+@Slf4j
+public final class TestUtils {
+
+    private TestUtils() {}
+
     public static boolean hasLogFiles(File ledgerDirectory, boolean partial, Integer... logsId) {
         boolean result = partial ? false : true;
         Set<Integer> logs = new HashSet<Integer>();
@@ -52,4 +60,21 @@ public class TestUtils {
         }
         return result;
     }
+
+    public static void waitUntilLacUpdated(ReadHandle rh, long newLac) throws Exception {
+        long lac = rh.getLastAddConfirmed(); // seed with the value the handle currently reports
+        while (lac < newLac) { // poll until a read yields LAC >= newLac; there is no timeout or retry cap
+            TimeUnit.MILLISECONDS.sleep(20); // wait 20 ms before asking again
+            lac = rh.readLastAddConfirmed().get(); // re-read the LAC, waiting for the result via get()
+        }
+    }
+
+    public static long waitUntilExplicitLacUpdated(LedgerHandle rh, long newLac) throws Exception {
+        long lac; // latest value returned by readExplicitLastConfirmed()
+        while ((lac = rh.readExplicitLastConfirmed()) < newLac) { // poll until >= newLac; no timeout or retry cap
+            TimeUnit.MILLISECONDS.sleep(20); // wait 20 ms before the next read
+        }
+        return lac; // the first value read that was >= newLac
+    }
+
 }

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.