You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/02/28 21:42:23 UTC

[GitHub] sijie closed pull request #1218: Support LacPiggyback, LongPoll and ExplicitLac in db ledger storage

sijie closed pull request #1218: Support LacPiggyback, LongPoll and ExplicitLac in db ledger storage
URL: https://github.com/apache/bookkeeper/pull/1218
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index 296acb93c..eb6d9e122 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -167,6 +167,7 @@ public ByteBuf getExplicitLac() {
     }
 
     public void setExplicitLac(ByteBuf lac) {
+        long explicitLacValue;
         synchronized (this) {
             if (explicitLac == null) {
                 explicitLac = ByteBuffer.allocate(lac.capacity());
@@ -176,13 +177,13 @@ public void setExplicitLac(ByteBuf lac) {
 
             // skip the ledger id
             explicitLac.getLong();
-            long explicitLacValue = explicitLac.getLong();
-            setLastAddConfirmed(explicitLacValue);
+            explicitLacValue = explicitLac.getLong();
             explicitLac.rewind();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("fileInfo:SetLac: {}", explicitLac);
             }
         }
+        setLastAddConfirmed(explicitLacValue);
     }
 
     public synchronized void readHeader() throws IOException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index a6d8361f8..fdfef0ab7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -242,10 +242,13 @@ public long getLastAddConfirmed(long ledgerId) throws IOException {
             if (null == bb) {
                 return BookieProtocol.INVALID_ENTRY_ID;
             } else {
-                bb.readLong(); // ledger id
-                bb.readLong(); // entry id
-                lac = bb.readLong();
-                lac = ledgerCache.updateLastAddConfirmed(ledgerId, lac);
+                try {
+                    bb.skipBytes(2 * Long.BYTES); // skip ledger & entry id
+                    lac = bb.readLong();
+                    lac = ledgerCache.updateLastAddConfirmed(ledgerId, lac);
+                } finally {
+                    bb.release();
+                }
             }
         }
         return lac;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index b2e9e3d70..3a9c805e3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -22,13 +22,21 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.SortedMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
@@ -55,6 +63,7 @@
 import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.common.util.Watchable;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
@@ -74,10 +83,121 @@
  */
 public class DbLedgerStorage implements CompactableLedgerStorage {
 
+    /**
+     * This class borrows the logic from FileInfo.
+     *
+     * <p>This class is used for holding all the transient states for a given ledger.
+     */
+    private static class TransientLedgerInfo
+            extends Watchable<LastAddConfirmedUpdateNotification>
+            implements AutoCloseable {
+
+        // lac
+        private Long lac = null;
+        // request from explicit lac requests
+        private ByteBuffer explicitLac = null;
+        // is the ledger info closed?
+        private boolean isClosed;
+
+        private final long ledgerId;
+        // reference to LedgerMetadataIndex
+        private final LedgerMetadataIndex ledgerIndex;
+
+        /**
+         * Construct a Watchable with zero watchers.
+         */
+        public TransientLedgerInfo(long ledgerId, LedgerMetadataIndex ledgerIndex) {
+            super(WATCHER_RECYCLER);
+            this.ledgerId = ledgerId;
+            this.ledgerIndex = ledgerIndex;
+        }
+
+        synchronized Long getLastAddConfirmed() {
+            return lac;
+        }
+
+        Long setLastAddConfirmed(long lac) {
+            long lacToReturn;
+            boolean changed = false;
+            synchronized (this) {
+                if (null == this.lac || this.lac < lac) {
+                    this.lac = lac;
+                    changed = true;
+                }
+                lacToReturn = this.lac;
+            }
+            if (changed) {
+                notifyWatchers(lacToReturn);
+            }
+            return lacToReturn;
+        }
+
+        synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
+                                                           Watcher<LastAddConfirmedUpdateNotification> watcher)
+                throws IOException {
+            if ((null != lac && lac > previousLAC)
+                    || isClosed || ledgerIndex.get(ledgerId).getFenced()) {
+                return false;
+            }
+
+            addWatcher(watcher);
+            return true;
+        }
+
+        public ByteBuf getExplicitLac() {
+            ByteBuf retLac = null;
+            synchronized (this) {
+                if (explicitLac != null) {
+                    retLac = Unpooled.buffer(explicitLac.capacity());
+                    explicitLac.rewind(); //copy from the beginning
+                    retLac.writeBytes(explicitLac);
+                    explicitLac.rewind();
+                    return retLac;
+                }
+            }
+            return retLac;
+        }
+
+        public void setExplicitLac(ByteBuf lac) {
+            long explicitLacValue;
+            synchronized (this) {
+                if (explicitLac == null) {
+                    explicitLac = ByteBuffer.allocate(lac.capacity());
+                }
+                lac.readBytes(explicitLac);
+                explicitLac.rewind();
+
+                // skip the ledger id
+                explicitLac.getLong();
+                explicitLacValue = explicitLac.getLong();
+                explicitLac.rewind();
+            }
+            setLastAddConfirmed(explicitLacValue);
+        }
+
+        void notifyWatchers(long lastAddConfirmed) {
+            notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, lastAddConfirmed);
+        }
+
+        @Override
+        public void close() {
+           synchronized (this) {
+               if (isClosed) {
+                   return;
+               }
+               isClosed = true;
+           }
+           // notify watchers
+            notifyWatchers(Long.MAX_VALUE);
+        }
+
+    }
+
     private EntryLogger entryLogger;
 
     private LedgerMetadataIndex ledgerIndex;
     private EntryLocationIndex entryLocationIndex;
+    private LoadingCache<Long, TransientLedgerInfo> transientLedgerInfoCache;
 
     private GarbageCollectorThread gcThread;
 
@@ -167,12 +287,41 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
         ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
         entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
 
+        // build the ledger info cache
+        int concurrencyLevel = Math.max(1, Math.max(conf.getNumAddWorkerThreads(), conf.getNumReadWorkerThreads()));
+        RemovalListener<Long, TransientLedgerInfo> ledgerInfoRemovalListener = this::handleLedgerEviction;
+        CacheBuilder<Long, TransientLedgerInfo> builder = CacheBuilder.newBuilder()
+            .initialCapacity(conf.getFileInfoCacheInitialCapacity())
+            .maximumSize(conf.getOpenFileLimit())
+            .concurrencyLevel(concurrencyLevel)
+            .removalListener(ledgerInfoRemovalListener);
+        if (conf.getFileInfoMaxIdleTime() > 0) {
+            builder.expireAfterAccess(conf.getFileInfoMaxIdleTime(), TimeUnit.SECONDS);
+        }
+        transientLedgerInfoCache = builder.build(new CacheLoader<Long, TransientLedgerInfo>() {
+            @Override
+            public TransientLedgerInfo load(Long key) throws Exception {
+                return new TransientLedgerInfo(key, ledgerIndex);
+            }
+        });
+
         entryLogger = new EntryLogger(conf, ledgerDirsManager);
         gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger);
 
         registerStats();
     }
 
+    /**
+     * When a ledger is evicted from the transient ledger info cache, close it so that its watchers are notified.
+     */
+    private void handleLedgerEviction(RemovalNotification<Long, TransientLedgerInfo> notification) {
+        TransientLedgerInfo ledgerInfo = notification.getValue();
+        if (null == ledgerInfo || null == notification.getKey()) {
+            return;
+        }
+        ledgerInfo.close();
+    }
+
     public void registerStats() {
         stats.registerGauge("write-cache-size", new Gauge<Long>() {
             @Override
@@ -285,7 +434,15 @@ public boolean setFenced(long ledgerId) throws IOException {
         if (log.isDebugEnabled()) {
             log.debug("Set fenced. ledger: {}", ledgerId);
         }
-        return ledgerIndex.setFenced(ledgerId);
+        boolean changed = ledgerIndex.setFenced(ledgerId);
+        if (changed) {
+            // notify all the watchers if a ledger is fenced
+            TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.getIfPresent(ledgerId);
+            if (null != ledgerInfo) {
+                ledgerInfo.notifyWatchers(Long.MAX_VALUE);
+            }
+        }
+        return changed;
     }
 
     @Override
@@ -310,9 +467,10 @@ public long addEntry(ByteBuf entry) throws IOException {
 
         long ledgerId = entry.getLong(entry.readerIndex());
         long entryId = entry.getLong(entry.readerIndex() + 8);
+        long lac = entry.getLong(entry.readerIndex() + 16);
 
         if (log.isDebugEnabled()) {
-            log.debug("Add entry. {}@{}", ledgerId, entryId);
+            log.debug("Add entry. {}@{}, lac = {}", ledgerId, entryId, lac);
         }
 
         // Waits if the write cache is being switched for a flush
@@ -328,6 +486,10 @@ public long addEntry(ByteBuf entry) throws IOException {
             triggerFlushAndAddEntry(ledgerId, entryId, entry);
         }
 
+        // after successfully inserting the entry, update LAC and notify the watchers
+        transientLedgerInfoCache.getUnchecked(ledgerId)
+            .setLastAddConfirmed(lac);
+
         recordSuccessfulEvent(addEntryStats, startTime);
         return entryId;
     }
@@ -712,23 +874,42 @@ public EntryLogger getEntryLogger() {
 
     @Override
     public long getLastAddConfirmed(long ledgerId) throws IOException {
-        throw new UnsupportedOperationException();
+        TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.getIfPresent(ledgerId);
+        Long lac = null != ledgerInfo ? ledgerInfo.getLastAddConfirmed() : null;
+        if (null == lac) {
+            ByteBuf bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
+            try {
+                bb.skipBytes(2 * Long.BYTES); // skip ledger id and entry id
+                lac = bb.readLong();
+                lac = transientLedgerInfoCache.getUnchecked(ledgerId).setLastAddConfirmed(lac);
+            } finally {
+                bb.release();
+            }
+        }
+        return lac;
     }
 
     @Override
     public boolean waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC,
             Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
-        throw new UnsupportedOperationException();
+        return transientLedgerInfoCache.getUnchecked(ledgerId)
+            .waitForLastAddConfirmedUpdate(previousLAC, watcher);
     }
 
     @Override
     public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
-        throw new UnsupportedOperationException();
+        transientLedgerInfoCache.getUnchecked(ledgerId)
+            .setExplicitLac(lac);
     }
 
     @Override
     public ByteBuf getExplicitLac(long ledgerId) {
-        throw new UnsupportedOperationException();
+        TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.getIfPresent(ledgerId);
+        if (null == ledgerInfo) {
+            return null;
+        } else {
+            return ledgerInfo.getExplicitLac();
+        }
     }
 
     @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
index 4b61a657f..f4d67f277 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -122,6 +122,7 @@ public void simple() throws Exception {
         ByteBuf entry = Unpooled.buffer(1024);
         entry.writeLong(4); // ledger id
         entry.writeLong(1); // entry id
+        entry.writeLong(0); // lac
         entry.writeBytes("entry-1".getBytes());
 
         assertEquals(false, ((DbLedgerStorage) storage).isFlushRequired());
@@ -152,6 +153,7 @@ public void simple() throws Exception {
         ByteBuf entry2 = Unpooled.buffer(1024);
         entry2.writeLong(4); // ledger id
         entry2.writeLong(2); // entry id
+        entry2.writeLong(1); // lac
         entry2.writeBytes("entry-2".getBytes());
 
         storage.addEntry(entry2);
@@ -160,21 +162,28 @@ public void simple() throws Exception {
         res = storage.getEntry(4, BookieProtocol.LAST_ADD_CONFIRMED);
         assertEquals(entry2, res);
 
+        // Read last add confirmed in ledger
+        assertEquals(1L, storage.getLastAddConfirmed(4));
+
         ByteBuf entry3 = Unpooled.buffer(1024);
         entry3.writeLong(4); // ledger id
         entry3.writeLong(3); // entry id
+        entry3.writeLong(2); // lac
         entry3.writeBytes("entry-3".getBytes());
         storage.addEntry(entry3);
 
         ByteBuf entry4 = Unpooled.buffer(1024);
         entry4.writeLong(4); // ledger id
         entry4.writeLong(4); // entry id
+        entry4.writeLong(3); // lac
         entry4.writeBytes("entry-4".getBytes());
         storage.addEntry(entry4);
 
         res = storage.getEntry(4, 4);
         assertEquals(entry4, res);
 
+        assertEquals(3, storage.getLastAddConfirmed(4));
+
         // Delete
         assertEquals(true, storage.ledgerExists(4));
         storage.deleteLedger(4);
@@ -182,10 +191,12 @@ public void simple() throws Exception {
 
         // Should not throw exception event if the ledger was deleted
         storage.getEntry(4, 4);
+        assertEquals(3, storage.getLastAddConfirmed(4));
 
         storage.addEntry(Unpooled.wrappedBuffer(entry2));
         res = storage.getEntry(4, BookieProtocol.LAST_ADD_CONFIRMED);
         assertEquals(entry4, res);
+        assertEquals(3, storage.getLastAddConfirmed(4));
 
         // Get last entry from storage
         storage.flush();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 992201694..344f5ec82 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -311,130 +311,6 @@ public void testAutoCloseableBookKeeper() throws Exception {
         assertTrue("BookKeeper should be closed!", bkc2.closed);
     }
 
-    @Test
-    public void testReadHandleWithNoExplicitLAC() throws Exception {
-        ClientConfiguration confWithNoExplicitLAC = new ClientConfiguration();
-        confWithNoExplicitLAC.setZkServers(zkUtil.getZooKeeperConnectString());
-        confWithNoExplicitLAC.setExplictLacInterval(0);
-
-        BookKeeper bkcWithNoExplicitLAC = new BookKeeper(confWithNoExplicitLAC);
-
-        LedgerHandle wlh = bkcWithNoExplicitLAC.createLedger(digestType, "testPasswd".getBytes());
-        long ledgerId = wlh.getId();
-        int numOfEntries = 5;
-        for (int i = 0; i < numOfEntries; i++) {
-            wlh.addEntry(("foobar" + i).getBytes());
-        }
-
-        LedgerHandle rlh = bkcWithNoExplicitLAC.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());
-        assertTrue(
-                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
-                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
-
-        Enumeration<LedgerEntry> entries = rlh.readEntries(0, numOfEntries - 2);
-        int entryId = 0;
-        while (entries.hasMoreElements()) {
-            LedgerEntry entry = entries.nextElement();
-            String entryString = new String(entry.getEntry());
-            assertTrue("Expected entry String: " + ("foobar" + entryId) + " actual entry String: " + entryString,
-                    entryString.equals("foobar" + entryId));
-            entryId++;
-        }
-
-        for (int i = numOfEntries; i < 2 * numOfEntries; i++) {
-            wlh.addEntry(("foobar" + i).getBytes());
-        }
-
-        Thread.sleep(3000);
-        assertTrue(
-                "Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of rlh: " + wlh.getLastAddConfirmed(),
-                (wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
-        assertTrue(
-                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
-                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
-
-        // since explicitlacflush policy is not enabled for writeledgerhandle, when we try
-        // to read explicitlac for rlh, it will be reading up to the piggyback value.
-        long explicitlac = rlh.readExplicitLastConfirmed();
-        assertTrue(
-                "Expected Explicit LAC of rlh: " + (numOfEntries - 2) + " actual ExplicitLAC of rlh: " + explicitlac,
-                (explicitlac == (2 * numOfEntries - 2)));
-
-        try {
-            rlh.readEntries(2 * numOfEntries - 1, 2 * numOfEntries - 1);
-            fail("rlh readEntries beyond " + (2 * numOfEntries - 2) + " should fail with ReadException");
-        } catch (BKException.BKReadException readException) {
-        }
-
-        rlh.close();
-        wlh.close();
-        bkcWithNoExplicitLAC.close();
-    }
-
-    @Test
-    public void testReadHandleWithExplicitLAC() throws Exception {
-        ClientConfiguration confWithExplicitLAC = new ClientConfiguration();
-        confWithExplicitLAC.setZkServers(zkUtil.getZooKeeperConnectString());
-        int explicitLacIntervalMillis = 1000;
-        confWithExplicitLAC.setExplictLacInterval(explicitLacIntervalMillis);
-
-        BookKeeper bkcWithExplicitLAC = new BookKeeper(confWithExplicitLAC);
-
-        LedgerHandle wlh = bkcWithExplicitLAC.createLedger(digestType, "testPasswd".getBytes());
-        long ledgerId = wlh.getId();
-        int numOfEntries = 5;
-        for (int i = 0; i < numOfEntries; i++) {
-            wlh.addEntry(("foobar" + i).getBytes());
-        }
-
-        LedgerHandle rlh = bkcWithExplicitLAC.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());
-
-        assertTrue(
-                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
-                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
-
-        for (int i = numOfEntries; i < 2 * numOfEntries; i++) {
-            wlh.addEntry(("foobar" + i).getBytes());
-        }
-
-        // we need to wait for atleast 2 explicitlacintervals,
-        // since in writehandle for the first call
-        // lh.getExplicitLastAddConfirmed() will be <
-        // lh.getPiggyBackedLastAddConfirmed(),
-        // so it wont make explicit writelac in the first run
-        Thread.sleep((2 * explicitLacIntervalMillis / 1000 + 1) * 1000);
-        assertTrue(
-                "Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of wlh: " + wlh.getLastAddConfirmed(),
-                (wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
-        // readhandle's lastaddconfirmed wont be updated until readExplicitLastConfirmed call is made
-        assertTrue(
-                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
-                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
-
-        long explicitlac = rlh.readExplicitLastConfirmed();
-        assertTrue("Expected Explicit LAC of rlh: " + (2 * numOfEntries - 1)
-                + " actual ExplicitLAC of rlh: " + explicitlac,
-                (explicitlac == (2 * numOfEntries - 1)));
-        // readExplicitLastConfirmed updates the lac of rlh.
-        assertTrue(
-                "Expected LAC of rlh: " + (2 * numOfEntries - 1) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
-                (rlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
-
-        Enumeration<LedgerEntry> entries = rlh.readEntries(numOfEntries, 2 * numOfEntries - 1);
-        int entryId = numOfEntries;
-        while (entries.hasMoreElements()) {
-            LedgerEntry entry = entries.nextElement();
-            String entryString = new String(entry.getEntry());
-            assertTrue("Expected entry String: " + ("foobar" + entryId) + " actual entry String: " + entryString,
-                    entryString.equals("foobar" + entryId));
-            entryId++;
-        }
-
-        rlh.close();
-        wlh.close();
-        bkcWithExplicitLAC.close();
-    }
-
     @Test
     public void testReadAfterLastAddConfirmed() throws Exception {
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
new file mode 100644
index 000000000..745be578f
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.SortedLedgerStorage;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Test cases for `Explicit Lac` feature.
+ */
+@RunWith(Parameterized.class)
+public class ExplicitLacTest extends BookKeeperClusterTestCase {
+
+    private final DigestType digestType;
+
+    public ExplicitLacTest(Class<? extends LedgerStorage> storageClass) {
+        super(1);
+        this.digestType = DigestType.CRC32;
+        baseConf.setLedgerStorageClass(storageClass.getName());
+    }
+
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] {
+            { InterleavedLedgerStorage.class },
+            { SortedLedgerStorage.class },
+            { DbLedgerStorage.class },
+        });
+    }
+
+    @Test
+    public void testReadHandleWithNoExplicitLAC() throws Exception {
+        ClientConfiguration confWithNoExplicitLAC = new ClientConfiguration();
+        confWithNoExplicitLAC.setZkServers(zkUtil.getZooKeeperConnectString());
+        confWithNoExplicitLAC.setExplictLacInterval(0);
+
+        BookKeeper bkcWithNoExplicitLAC = new BookKeeper(confWithNoExplicitLAC);
+
+        LedgerHandle wlh = bkcWithNoExplicitLAC.createLedger(
+            1, 1, 1,
+            digestType, "testPasswd".getBytes());
+        long ledgerId = wlh.getId();
+        int numOfEntries = 5;
+        for (int i = 0; i < numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        LedgerHandle rlh = bkcWithNoExplicitLAC.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());
+        assertTrue(
+                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+        Enumeration<LedgerEntry> entries = rlh.readEntries(0, numOfEntries - 2);
+        int entryId = 0;
+        while (entries.hasMoreElements()) {
+            LedgerEntry entry = entries.nextElement();
+            String entryString = new String(entry.getEntry());
+            assertTrue("Expected entry String: " + ("foobar" + entryId) + " actual entry String: " + entryString,
+                    entryString.equals("foobar" + entryId));
+            entryId++;
+        }
+
+        for (int i = numOfEntries; i < 2 * numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        TestUtils.waitUntilLacUpdated(rlh, numOfEntries - 2);
+
+        assertTrue(
+                "Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of rlh: " + wlh.getLastAddConfirmed(),
+                (wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
+        assertTrue(
+                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+        // since explicitlacflush policy is not enabled for writeledgerhandle, when we try
+        // to read explicitlac for rlh, it will be reading up to the piggyback value.
+        long explicitlac = rlh.readExplicitLastConfirmed();
+        assertTrue(
+                "Expected Explicit LAC of rlh: " + (numOfEntries - 2) + " actual ExplicitLAC of rlh: " + explicitlac,
+                (explicitlac == (2 * numOfEntries - 2)));
+
+        try {
+            rlh.readEntries(2 * numOfEntries - 1, 2 * numOfEntries - 1);
+            fail("rlh readEntries beyond " + (2 * numOfEntries - 2) + " should fail with ReadException");
+        } catch (BKException.BKReadException readException) {
+        }
+
+        rlh.close();
+        wlh.close();
+        bkcWithNoExplicitLAC.close();
+    }
+
+    @Test
+    public void testReadHandleWithExplicitLAC() throws Exception {
+        ClientConfiguration confWithExplicitLAC = new ClientConfiguration();
+        confWithExplicitLAC.setZkServers(zkUtil.getZooKeeperConnectString());
+        int explicitLacIntervalMillis = 1000;
+        confWithExplicitLAC.setExplictLacInterval(explicitLacIntervalMillis);
+
+        BookKeeper bkcWithExplicitLAC = new BookKeeper(confWithExplicitLAC);
+
+        LedgerHandle wlh = bkcWithExplicitLAC.createLedger(
+            1, 1, 1,
+            digestType, "testPasswd".getBytes());
+        long ledgerId = wlh.getId();
+        int numOfEntries = 5;
+        for (int i = 0; i < numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        LedgerHandle rlh = bkcWithExplicitLAC.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());
+
+        assertTrue(
+                "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+                (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
+
+        for (int i = numOfEntries; i < 2 * numOfEntries; i++) {
+            wlh.addEntry(("foobar" + i).getBytes());
+        }
+
+        // we need to wait for at least 2 explicitlacintervals,
+        // since in writehandle for the first call
+        // lh.getExplicitLastAddConfirmed() will be <
+        // lh.getPiggyBackedLastAddConfirmed(),
+        // so it won't make explicit writelac in the first run
+        TestUtils.waitUntilLacUpdated(rlh, 2 * numOfEntries - 2);
+
+        assertTrue(
+                "Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of wlh: " + wlh.getLastAddConfirmed(),
+                (wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
+        // readhandle's lastaddconfirmed won't be updated until readExplicitLastConfirmed call is made
+        assertTrue(
+                "Expected LAC of rlh: " + (2 * numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+                (rlh.getLastAddConfirmed() == (2 * numOfEntries - 2)));
+
+        long explicitlac = TestUtils.waitUntilExplicitLacUpdated(rlh, 2 * numOfEntries - 1);
+        assertTrue("Expected Explicit LAC of rlh: " + (2 * numOfEntries - 1)
+                + " actual ExplicitLAC of rlh: " + explicitlac,
+                (explicitlac == (2 * numOfEntries - 1)));
+        // readExplicitLastConfirmed updates the lac of rlh.
+        assertTrue(
+                "Expected LAC of rlh: " + (2 * numOfEntries - 1) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
+                (rlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
+
+        Enumeration<LedgerEntry> entries = rlh.readEntries(numOfEntries, 2 * numOfEntries - 1);
+        int entryId = numOfEntries;
+        while (entries.hasMoreElements()) {
+            LedgerEntry entry = entries.nextElement();
+            String entryString = new String(entry.getEntry());
+            assertTrue("Expected entry String: " + ("foobar" + entryId) + " actual entry String: " + entryString,
+                    entryString.equals("foobar" + entryId));
+            entryId++;
+        }
+
+        rlh.close();
+        wlh.close();
+        bkcWithExplicitLAC.close();
+    }
+
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPiggybackLAC.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPiggybackLAC.java
index 616b9fb27..d8be69c7f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPiggybackLAC.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPiggybackLAC.java
@@ -22,32 +22,52 @@
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Enumeration;
 
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.SortedLedgerStorage;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Test a piggyback LAC.
  */
+@RunWith(Parameterized.class)
 public class TestPiggybackLAC extends BookKeeperClusterTestCase {
 
     private static final Logger LOG = LoggerFactory.getLogger(TestPiggybackLAC.class);
 
     final DigestType digestType;
 
-    public TestPiggybackLAC() {
-        super(3);
+    public TestPiggybackLAC(Class<? extends LedgerStorage> storageClass) {
+        super(1);
         this.digestType = DigestType.CRC32;
+        baseConf.setLedgerStorageClass(storageClass.getName());
+    }
+
+    @Parameters
+    public static Collection<Object[]> configs() { // parameter rows for the Parameterized runner
+        return Arrays.asList(new Object[][] { // each row holds the constructor's storageClass argument
+            { InterleavedLedgerStorage.class },
+            { SortedLedgerStorage.class },
+            { DbLedgerStorage.class },
+        });
     }
 
     @Test
     public void testPiggybackLAC() throws Exception {
         int numEntries = 10;
-        LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, "".getBytes());
+        LedgerHandle lh = bkc.createLedger(1, 1, 1, digestType, "".getBytes());
         // tried to add entries
         for (int i = 0; i < numEntries; i++) {
             lh.addEntry(("data" + i).getBytes());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java
index d78b228ab..7c2cf5b78 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java
@@ -27,6 +27,8 @@
 import io.netty.buffer.ByteBuf;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Enumeration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -35,27 +37,45 @@
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.SortedLedgerStorage;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.zookeeper.KeeperException;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Test reading the last confirmed and entry.
  */
+@RunWith(Parameterized.class)
 public class TestReadLastConfirmedAndEntry extends BookKeeperClusterTestCase {
 
     private static final Logger logger = LoggerFactory.getLogger(TestReadLastConfirmedAndEntry.class);
 
     final BookKeeper.DigestType digestType;
 
-    public TestReadLastConfirmedAndEntry() {
+    public TestReadLastConfirmedAndEntry(Class<? extends LedgerStorage> storageClass) {
         super(3);
         this.digestType = BookKeeper.DigestType.CRC32;
         this.baseConf.setAllowEphemeralPorts(false);
+        this.baseConf.setLedgerStorageClass(storageClass.getName());
+    }
+
+    @Parameters
+    public static Collection<Object[]> configs() { // parameter rows for the Parameterized runner
+        return Arrays.asList(new Object[][] { // each row holds the constructor's storageClass argument
+            { InterleavedLedgerStorage.class },
+            { SortedLedgerStorage.class },
+            { DbLedgerStorage.class },
+        });
     }
 
     static class FakeBookie extends Bookie {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
index 414c063d2..f754f6a3b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
@@ -20,24 +20,44 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.SortedLedgerStorage;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * Test read last confirmed long by polling.
  */
+@RunWith(Parameterized.class)
 public class TestReadLastConfirmedLongPoll extends BookKeeperClusterTestCase {
     final DigestType digestType;
 
-    public TestReadLastConfirmedLongPoll() {
+    public TestReadLastConfirmedLongPoll(Class<? extends LedgerStorage> storageClass) {
         super(6);
         this.digestType = DigestType.CRC32;
+        baseConf.setLedgerStorageClass(storageClass.getName());
+    }
+
+    @Parameters
+    public static Collection<Object[]> configs() { // parameter rows for the Parameterized runner
+        return Arrays.asList(new Object[][] { // each row holds the constructor's storageClass argument
+            { InterleavedLedgerStorage.class },
+            { SortedLedgerStorage.class },
+            { DbLedgerStorage.class },
+        });
     }
 
     @Test
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 04e253de5..0147c06e5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -793,4 +793,5 @@ public TestStatsProvider getStatsProvider(BookieSocketAddress addr) {
     public TestStatsProvider getStatsProvider(int index) throws Exception {
         return getStatsProvider(bs.get(index).getLocalAddress());
     }
+
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
index 222de5b53..c3f1b8938 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
@@ -25,12 +25,20 @@
 import java.util.HashSet;
 import java.util.Set;
 
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.ReadHandle;
 
 /**
  * Test utilities.
  */
-public class TestUtils {
+@Slf4j
+public final class TestUtils {
+
+    private TestUtils() {} // static helpers only; the private constructor prevents instantiation
+
     public static boolean hasLogFiles(File ledgerDirectory, boolean partial, Integer... logsId) {
         boolean result = partial ? false : true;
         Set<Integer> logs = new HashSet<Integer>();
@@ -52,4 +60,21 @@ public static boolean hasLogFiles(File ledgerDirectory, boolean partial, Integer
         }
         return result;
     }
+
+    public static void waitUntilLacUpdated(ReadHandle rh, long newLac) throws Exception {
+        long lac = rh.getLastAddConfirmed(); // first sample; if already >= newLac, returns without polling
+        while (lac < newLac) { // no timeout: ends only when lac >= newLac or a call throws
+            TimeUnit.MILLISECONDS.sleep(20); // 20 ms pause before each re-read
+            lac = rh.readLastAddConfirmed().get(); // get() presumably waits on a future - confirm
+        }
+    }
+
+    public static long waitUntilExplicitLacUpdated(LedgerHandle rh, long newLac) throws Exception {
+        long lac; // assigned inside the loop condition below
+        while ((lac = rh.readExplicitLastConfirmed()) < newLac) { // reads before every check; no timeout
+            TimeUnit.MILLISECONDS.sleep(20); // 20 ms pause between reads
+        }
+        return lac; // first value read that is >= newLac
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services