You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/03/13 22:51:49 UTC

[bookkeeper] branch master updated: Issue #570: make changes to SyncThread/checkpoint logic.

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 88addff  Issue #570: make changes to SyncThread/checkpoint logic.
88addff is described below

commit 88addff33a9a9474b069ac197d46d2d672ae988d
Author: cguttapalem <cg...@salesforce.com>
AuthorDate: Tue Mar 13 15:51:42 2018 -0700

    Issue #570: make changes to SyncThread/checkpoint logic.
    
    Descriptions of the changes in this PR:
    
    make changes to SyncThread/checkpoint logic, so that incase of
    entrylogperledger, checkpoint happens for every flushInterval
    but not when active entrylog is created/rolled over.
    
    This is < sub-task3  > of Issue #570
    
    Master Issue: #570
    
    Author: cguttapalem <cg...@salesforce.com>
    
    Reviewers: Ivan Kelly <iv...@apache.org>, Sijie Guo <si...@apache.org>
    
    This closes #1236 from reddycharan/addcheckpointlogic, closes #570
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |  42 +-
 .../org/apache/bookkeeper/bookie/BookieShell.java  |  10 +
 .../org/apache/bookkeeper/bookie/Checkpointer.java |  13 +-
 .../org/apache/bookkeeper/bookie/EntryLogger.java  |  22 +-
 .../bookie/InterleavedLedgerStorage.java           |   1 -
 .../java/org/apache/bookkeeper/bookie/Journal.java |   2 +-
 .../org/apache/bookkeeper/bookie/SyncThread.java   |  22 +-
 .../bookkeeper/conf/ServerConfiguration.java       |  22 +
 .../bookie/LedgerStorageCheckpointTest.java        | 699 +++++++++++++++++++++
 .../bookie/SortedLedgerStorageCheckpointTest.java  |  27 +-
 .../bookie/storage/ldb/ConversionRollbackTest.java |   5 +
 .../bookie/storage/ldb/ConversionTest.java         |   5 +
 .../storage/ldb/LocationsIndexRebuildTest.java     |   5 +
 conf/bk_server.conf                                |  17 +-
 site/_data/config/bk_server.yaml                   |   7 +-
 15 files changed, 867 insertions(+), 32 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 7b28cdd..223c977 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -63,6 +63,7 @@ import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationExce
 import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
 import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException;
 import org.apache.bookkeeper.bookie.BookieException.UnknownBookieIdException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -112,6 +113,7 @@ public class Bookie extends BookieCriticalThread {
     final List<Journal> journals;
 
     final HandleFactory handles;
+    final boolean entryLogPerLedgerEnabled;
 
     static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
     static final long METAENTRY_ID_FENCE_KEY  = -0x2000;
@@ -688,13 +690,43 @@ public class Bookie extends BookieCriticalThread {
                          conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE + "_" + i)));
         }
 
+        this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
         CheckpointSource checkpointSource = new CheckpointSourceList(journals);
 
         // Instantiate the ledger storage implementation
         String ledgerStorageClass = conf.getLedgerStorageClass();
         LOG.info("Using ledger storage: {}", ledgerStorageClass);
         ledgerStorage = LedgerStorageFactory.createLedgerStorage(ledgerStorageClass);
-        syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource);
+
+        /*
+         * with this change https://github.com/apache/bookkeeper/pull/677,
+         * LedgerStorage drives the checkpoint logic. But with multiple entry
+         * logs, checkpoint logic based on a entry log is not possible, hence it
+         * needs to be timebased recurring thing and it is driven by SyncThread.
+         * SyncThread.start does that and it is started in Bookie.start method.
+         */
+        if (entryLogPerLedgerEnabled) {
+            syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource) {
+                @Override
+                public void startCheckpoint(Checkpoint checkpoint) {
+                    /*
+                     * in the case of entryLogPerLedgerEnabled, LedgerStorage
+                     * dont drive checkpoint logic, but instead it is done
+                     * periodically by SyncThread. So startCheckpoint which
+                     * will be called by LedgerStorage will be no-op.
+                     */
+                }
+
+                @Override
+                public void start() {
+                    executor.scheduleAtFixedRate(() -> {
+                        doCheckpoint(checkpointSource.newCheckpoint());
+                    }, conf.getFlushInterval(), conf.getFlushInterval(), TimeUnit.MILLISECONDS);
+                }
+            };
+        } else {
+            syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource);
+        }
 
         ledgerStorage.initialize(
             conf,
@@ -823,6 +855,14 @@ public class Bookie extends BookieCriticalThread {
         }
         LOG.info("Finished reading journal, starting bookie");
 
+
+        /*
+         * start sync thread first, so during replaying journals, we could do
+         * checkpoint which reduce the chance that we need to replay journals
+         * again if bookie restarted again before finished journal replays.
+         */
+        syncThread.start();
+
         // start bookie thread
         super.start();
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 027a04d..d35684e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -2494,6 +2494,11 @@ public class BookieShell implements Tool {
                 public void startCheckpoint(Checkpoint checkpoint) {
                     // No-op
                 }
+
+                @Override
+                public void start() {
+                    // no-op
+                }
             };
 
             interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager,
@@ -2584,6 +2589,11 @@ public class BookieShell implements Tool {
                 public void startCheckpoint(Checkpoint checkpoint) {
                     // No-op
                 }
+
+                @Override
+                public void start() {
+                    // no-op
+                }
             };
 
             dbStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager, null,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Checkpointer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Checkpointer.java
index 967d3e9..1e3f489 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Checkpointer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Checkpointer.java
@@ -26,8 +26,16 @@ import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
  */
 public interface Checkpointer {
 
-    Checkpointer NULL = checkpoint -> {
-        // do nothing;
+    Checkpointer NULL = new Checkpointer(){
+        @Override
+        public void startCheckpoint(Checkpoint checkpoint) {
+            // no-op
+        }
+
+        @Override
+        public void start() {
+            // no-op
+        }
     };
 
     /**
@@ -37,4 +45,5 @@ public interface Checkpointer {
      */
     void startCheckpoint(Checkpoint checkpoint);
 
+    void start();
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 1792417..5754009 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -83,7 +83,7 @@ import org.slf4j.LoggerFactory;
 public class EntryLogger {
     private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
 
-    private static class BufferedLogChannel extends BufferedChannel {
+    static class BufferedLogChannel extends BufferedChannel {
         private final long logId;
         private final EntryLogMetadata entryLogMetadata;
         private final File logFile;
@@ -118,6 +118,7 @@ public class EntryLogger {
 
     volatile File currentDir;
     private final LedgerDirsManager ledgerDirsManager;
+    private final boolean entryLogPerLedgerEnabled;
     private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
 
     private volatile long leastUnflushedLogId;
@@ -131,8 +132,8 @@ public class EntryLogger {
      * The maximum size of a entry logger file.
      */
     final long logSizeLimit;
-    private List<BufferedLogChannel> logChannelsToFlush;
-    private volatile BufferedLogChannel logChannel;
+    List<BufferedLogChannel> logChannelsToFlush;
+    volatile BufferedLogChannel logChannel;
     private volatile BufferedLogChannel compactionLogChannel;
 
     private final EntryLoggerAllocator entryLoggerAllocator;
@@ -285,6 +286,7 @@ public class EntryLogger {
         this.leastUnflushedLogId = logId + 1;
         this.entryLoggerAllocator = new EntryLoggerAllocator(logId);
         this.conf = conf;
+        this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
 
         initialize();
     }
@@ -797,6 +799,20 @@ public class EntryLogger {
      */
     void checkpoint() throws IOException {
         flushRotatedLogs();
+        /*
+         * In the case of entryLogPerLedgerEnabled we need to flush both
+         * rotatedlogs and currentlogs. This is needed because syncThread
+         * periodically does checkpoint and at this time all the logs should
+         * be flushed.
+         *
+         * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of
+         * this Issue, I will move this logic to individual implamentations of
+         * EntryLogManager and it would be free of this booalen flag based logic.
+         *
+         */
+        if (entryLogPerLedgerEnabled) {
+            flushCurrentLog();
+        }
     }
 
     void flushRotatedLogs() throws IOException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 81de730..ce1a499 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -99,7 +99,6 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
             throws IOException {
         checkNotNull(checkpointSource, "invalid null checkpoint source");
         checkNotNull(checkpointer, "invalid null checkpointer");
-
         this.checkpointSource = checkpointSource;
         this.checkpointer = checkpointer;
         entryLogger = new EntryLogger(conf, ledgerDirsManager, this);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index e21f4d1..5a816a0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -82,7 +82,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
      * @param filter journal id filter
      * @return list of filtered ids
      */
-    private static List<Long> listJournalIds(File journalDir, JournalIdFilter filter) {
+    static List<Long> listJournalIds(File journalDir, JournalIdFilter filter) {
         File logFiles[] = journalDir.listFiles();
         if (logFiles == null || logFiles.length == 0) {
             return Collections.emptyList();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
index 3fbedbc..eeba1c4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
@@ -22,9 +22,7 @@
 package org.apache.bookkeeper.bookie;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import io.netty.util.concurrent.DefaultThreadFactory;
-
 import java.io.IOException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -32,14 +30,12 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.util.MathUtils;
 
-
 /**
  * SyncThread is a background thread which help checkpointing ledger storage
  * when a checkpoint is requested. After a ledger storage is checkpointed,
@@ -61,7 +57,6 @@ import org.apache.bookkeeper.util.MathUtils;
 class SyncThread implements Checkpointer {
 
     final ScheduledExecutorService executor;
-    final int flushInterval;
     final LedgerStorage ledgerStorage;
     final LedgerDirsListener dirsListener;
     final CheckpointSource checkpointSource;
@@ -78,14 +73,14 @@ class SyncThread implements Checkpointer {
         this.ledgerStorage = ledgerStorage;
         this.checkpointSource = checkpointSource;
         this.executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("SyncThread"));
-        flushInterval = conf.getFlushInterval();
-        if (log.isDebugEnabled()) {
-            log.debug("Flush Interval : {}", flushInterval);
-        }
     }
 
     @Override
     public void startCheckpoint(Checkpoint checkpoint) {
+        doCheckpoint(checkpoint);
+    }
+
+    protected void doCheckpoint(Checkpoint checkpoint) {
         executor.submit(() -> {
             try {
                 synchronized (suspensionLock) {
@@ -108,14 +103,13 @@ class SyncThread implements Checkpointer {
         });
     }
 
-    public Future<Void> requestFlush() {
+    public Future requestFlush() {
         return executor.submit(() -> {
             try {
                 flush();
             } catch (Throwable t) {
                 log.error("Exception flushing ledgers ", t);
             }
-            return null;
         });
     }
 
@@ -171,6 +165,11 @@ class SyncThread implements Checkpointer {
         }
     }
 
+    @Override
+    public void start() {
+        // no-op
+    }
+
     /**
      * Suspend sync thread. (for testing)
      */
@@ -201,6 +200,7 @@ class SyncThread implements Checkpointer {
     void shutdown() throws InterruptedException {
         log.info("Shutting down SyncThread");
         requestFlush();
+
         executor.shutdown();
         long start = MathUtils.now();
         while (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index da63685..328f473 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -179,6 +179,11 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     // Stats
     protected static final String ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats";
 
+    /*
+     * config specifying if the entrylog per ledger is enabled or not.
+     */
+    protected static final String ENTRY_LOG_PER_LEDGER_ENABLED = "entryLogPerLedgerEnabled";
+
     /**
      * Construct a default configuration object.
      */
@@ -2633,4 +2638,21 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     protected ServerConfiguration getThis() {
         return this;
     }
+
+    /*
+     * specifies if entryLog per ledger is enabled. If it is enabled, then there
+     * would be a active entrylog for each ledger
+     */
+    public boolean isEntryLogPerLedgerEnabled() {
+        return this.getBoolean(ENTRY_LOG_PER_LEDGER_ENABLED, false);
+    }
+
+    /*
+     * enables/disables entrylog per ledger feature.
+     *
+     */
+    public ServerConfiguration setEntryLogPerLedgerEnabled(boolean entryLogPerLedgerEnabled) {
+        this.setProperty(ENTRY_LOG_PER_LEDGER_ENABLED, Boolean.toString(entryLogPerLedgerEnabled));
+        return this;
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
new file mode 100644
index 0000000..1a795ad
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
@@ -0,0 +1,699 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import org.apache.bookkeeper.bookie.Journal.LastLogMark;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.common.testing.executors.MockExecutorController;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.apache.bookkeeper.util.IOUtils;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * LedgerStorageCheckpointTest.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(SyncThread.class)
+@PowerMockIgnore("javax.*")
+public class LedgerStorageCheckpointTest {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(LedgerStorageCheckpointTest.class);
+
+    @Rule
+    public final TestName runtime = new TestName();
+
+    // ZooKeeper related variables
+    protected final ZooKeeperUtil zkUtil = new ZooKeeperUtil();
+
+    // BookKeeper related variables
+    protected final List<File> tmpDirs = new LinkedList<File>();
+
+    // ScheduledExecutorService used by SyncThread
+    MockExecutorController executorController;
+
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("Setting up test {}", getClass());
+        PowerMockito.mockStatic(Executors.class);
+
+        try {
+            // start zookeeper service
+            startZKCluster();
+        } catch (Exception e) {
+            LOG.error("Error setting up", e);
+            throw e;
+        }
+
+        ScheduledExecutorService scheduledExecutorService = PowerMockito.mock(ScheduledExecutorService.class);
+        executorController = new MockExecutorController()
+                .controlSubmit(scheduledExecutorService)
+                .controlScheduleAtFixedRate(scheduledExecutorService, 10);
+        PowerMockito.when(scheduledExecutorService.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true);
+        PowerMockito.when(Executors.newSingleThreadScheduledExecutor(any())).thenReturn(scheduledExecutorService);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("TearDown");
+        Exception tearDownException = null;
+        // stop zookeeper service
+        try {
+            stopZKCluster();
+        } catch (Exception e) {
+            LOG.error("Got Exception while trying to stop ZKCluster", e);
+            tearDownException = e;
+        }
+        // cleanup temp dirs
+        try {
+            cleanupTempDirs();
+        } catch (Exception e) {
+            LOG.error("Got Exception while trying to cleanupTempDirs", e);
+            tearDownException = e;
+        }
+        if (tearDownException != null) {
+            throw tearDownException;
+        }
+    }
+
+    /**
+     * Start zookeeper cluster.
+     *
+     * @throws Exception
+     */
+    protected void startZKCluster() throws Exception {
+        zkUtil.startServer();
+    }
+
+    /**
+     * Stop zookeeper cluster.
+     *
+     * @throws Exception
+     */
+    protected void stopZKCluster() throws Exception {
+        zkUtil.killServer();
+    }
+
+    protected void cleanupTempDirs() throws Exception {
+        for (File f : tmpDirs) {
+            FileUtils.deleteDirectory(f);
+        }
+    }
+
+    protected File createTempDir(String prefix, String suffix) throws IOException {
+        File dir = IOUtils.createTempDir(prefix, suffix);
+        tmpDirs.add(dir);
+        return dir;
+    }
+
+    private LogMark readLastMarkFile(File lastMarkFile) throws IOException {
+        byte buff[] = new byte[16];
+        ByteBuffer bb = ByteBuffer.wrap(buff);
+        LogMark rolledLogMark = new LogMark();
+        FileInputStream fis = new FileInputStream(lastMarkFile);
+        int bytesRead = fis.read(buff);
+        fis.close();
+        if (bytesRead != 16) {
+            throw new IOException("Couldn't read enough bytes from lastMark." + " Wanted " + 16 + ", got " + bytesRead);
+        }
+        bb.clear();
+        rolledLogMark.readLogMark(bb);
+        return rolledLogMark;
+    }
+
+    /*
+     * In this testcase, InterleavedLedgerStorage is used and validate if the
+     * checkpoint is called for every flushinterval period.
+     */
+    @Test
+    public void testPeriodicCheckpointForInterleavedLedgerStorage() throws Exception {
+        testPeriodicCheckpointForLedgerStorage(InterleavedLedgerStorage.class.getName());
+    }
+
+    /*
+     * In this testcase, SortedLedgerStorage is used and validate if the
+     * checkpoint is called for every flushinterval period.
+     */
+    @Test
+    public void testPeriodicCheckpointForSortedLedgerStorage() throws Exception {
+        testPeriodicCheckpointForLedgerStorage(SortedLedgerStorage.class.getName());
+    }
+
+    public void testPeriodicCheckpointForLedgerStorage(String ledgerStorageClassName) throws Exception {
+        File tmpDir = createTempDir("DiskCheck", "test");
+
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setZkTimeout(5000)
+                .setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() })
+                .setAutoRecoveryDaemonEnabled(false)
+                .setFlushInterval(2000)
+                .setBookiePort(PortManager.nextFreePort())
+                // entrylog per ledger is enabled
+                .setEntryLogPerLedgerEnabled(true)
+                .setLedgerStorageClass(ledgerStorageClassName);
+        Assert.assertEquals("Number of JournalDirs", 1, conf.getJournalDirs().length);
+        // we know there is only one ledgerDir
+        File ledgerDir = Bookie.getCurrentDirectories(conf.getLedgerDirs())[0];
+        BookieServer server = new BookieServer(conf);
+        server.start();
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper bkClient = new BookKeeper(clientConf);
+
+        int numOfLedgers = 2;
+        int numOfEntries = 5;
+        byte[] dataBytes = "data".getBytes();
+
+        for (int i = 0; i < numOfLedgers; i++) {
+            int ledgerIndex = i;
+            LedgerHandle handle = bkClient.createLedgerAdv((long) i, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(),
+                    null);
+            for (int j = 0; j < numOfEntries; j++) {
+                handle.addEntry(j, dataBytes);
+            }
+            handle.close();
+        }
+
+        LastLogMark lastLogMarkAfterFirstSetOfAdds = server.getBookie().journals.get(0).getLastLogMark();
+        LogMark curMarkAfterFirstSetOfAdds = lastLogMarkAfterFirstSetOfAdds.getCurMark();
+
+        File lastMarkFile = new File(ledgerDir, "lastMark");
+        // lastMark file should be zero, because checkpoint hasn't happenend
+        LogMark logMarkFileBeforeCheckpoint = readLastMarkFile(lastMarkFile);
+        Assert.assertEquals("lastMarkFile before checkpoint should be zero", 0,
+                logMarkFileBeforeCheckpoint.compare(new LogMark()));
+
+        // wait for flushInterval for SyncThread to do next iteration of checkpoint
+        executorController.advance(Duration.ofMillis(conf.getFlushInterval()));
+        /*
+         * since we have waited for more than flushInterval SyncThread should
+         * have checkpointed. if entrylogperledger is not enabled, then we
+         * checkpoint only when currentLog in EntryLogger is rotated. but if
+         * entrylogperledger is enabled, then we checkpoint for every
+         * flushInterval period
+         */
+        Assert.assertTrue("lastMark file must be existing, because checkpoint should have happened",
+                lastMarkFile.exists());
+
+        LastLogMark lastLogMarkAfterCheckpoint = server.getBookie().journals.get(0).getLastLogMark();
+        LogMark curMarkAfterCheckpoint = lastLogMarkAfterCheckpoint.getCurMark();
+
+        LogMark rolledLogMark = readLastMarkFile(lastMarkFile);
+        Assert.assertNotEquals("rolledLogMark should not be zero, since checkpoint has happenend", 0,
+                rolledLogMark.compare(new LogMark()));
+        /*
+         * Curmark should be equal before and after checkpoint, because we didnt
+         * add new entries during this period
+         */
+        Assert.assertTrue("Curmark should be equal before and after checkpoint",
+                curMarkAfterCheckpoint.compare(curMarkAfterFirstSetOfAdds) == 0);
+        /*
+         * Curmark after checkpoint should be equal to rolled logmark, because
+         * we checkpointed
+         */
+        Assert.assertTrue("Curmark after first set of adds should be equal to rolled logmark",
+                curMarkAfterCheckpoint.compare(rolledLogMark) == 0);
+
+        // add more ledger/entries
+        for (int i = numOfLedgers; i < 2 * numOfLedgers; i++) {
+            int ledgerIndex = i;
+            LedgerHandle handle = bkClient.createLedgerAdv((long) i, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(),
+                    null);
+            for (int j = 0; j < numOfEntries; j++) {
+                handle.addEntry(j, dataBytes);
+            }
+            handle.close();
+        }
+
+        // wait for flushInterval for SyncThread to do next iteration of checkpoint
+        executorController.advance(Duration.ofMillis(conf.getFlushInterval()));
+
+        LastLogMark lastLogMarkAfterSecondSetOfAdds = server.getBookie().journals.get(0).getLastLogMark();
+        LogMark curMarkAfterSecondSetOfAdds = lastLogMarkAfterSecondSetOfAdds.getCurMark();
+
+        rolledLogMark = readLastMarkFile(lastMarkFile);
+        /*
+         * Curmark after checkpoint should be equal to rolled logmark, because
+         * we checkpointed
+         */
+        Assert.assertTrue("Curmark after second set of adds should be equal to rolled logmark",
+                curMarkAfterSecondSetOfAdds.compare(rolledLogMark) == 0);
+
+        server.shutdown();
+        bkClient.close();
+    }
+
+    /*
+     * In this testcase, InterleavedLedgerStorage is used, entrylogperledger is
+     * enabled and validate that when entrylog is rotated it doesn't do
+     * checkpoint.
+     */
+    @Test
+    public void testCheckpointOfILSEntryLogIsRotatedWithELPLEnabled() throws Exception {
+        testCheckpointofILSWhenEntryLogIsRotated(true);
+    }
+
+    /*
+     * In this testcase, InterleavedLedgerStorage is used, entrylogperledger is
+     * not enabled and validate that when entrylog is rotated it does
+     * checkpoint.
+     */
+    @Test
+    public void testCheckpointOfILSEntryLogIsRotatedWithELPLDisabled() throws Exception {
+        testCheckpointofILSWhenEntryLogIsRotated(false);
+    }
+
+    public void testCheckpointofILSWhenEntryLogIsRotated(boolean entryLogPerLedgerEnabled) throws Exception {
+        File tmpDir = createTempDir("DiskCheck", "test");
+
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setZkTimeout(5000)
+                .setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() })
+                .setAutoRecoveryDaemonEnabled(false)
+                //set very high period for flushInterval
+                .setFlushInterval(30000)
+                .setBookiePort(PortManager.nextFreePort())
+                // entrylog per ledger is enabled
+                .setEntryLogPerLedgerEnabled(entryLogPerLedgerEnabled)
+                .setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+
+        Assert.assertEquals("Number of JournalDirs", 1, conf.getJournalDirs().length);
+        // we know there is only one ledgerDir
+        File ledgerDir = Bookie.getCurrentDirectories(conf.getLedgerDirs())[0];
+        BookieServer server = new BookieServer(conf);
+        server.start();
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper bkClient = new BookKeeper(clientConf);
+        InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) server.getBookie().ledgerStorage;
+
+        int numOfEntries = 5;
+        byte[] dataBytes = "data".getBytes();
+
+        long ledgerId = 10;
+        LedgerHandle handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
+        for (int j = 0; j < numOfEntries; j++) {
+            handle.addEntry(j, dataBytes);
+        }
+        handle.close();
+        // simulate rolling entrylog
+        ledgerStorage.entryLogger.rollLog();
+        // sleep for a bit for checkpoint to do its task
+        executorController.advance(Duration.ofMillis(500));
+
+        File lastMarkFile = new File(ledgerDir, "lastMark");
+        LogMark rolledLogMark = readLastMarkFile(lastMarkFile);
+        if (entryLogPerLedgerEnabled) {
+            Assert.assertEquals(
+                    "rolledLogMark should be zero, since checkpoint"
+                            + "shouldn't have happened when entryLog is rotated",
+                    0, rolledLogMark.compare(new LogMark()));
+        } else {
+            Assert.assertNotEquals("rolledLogMark shouldn't be zero, since checkpoint"
+                    + "should have happened when entryLog is rotated", 0, rolledLogMark.compare(new LogMark()));
+        }
+        bkClient.close();
+        server.shutdown();
+    }
+
+    /*
+     * In this testcase, SortedLedgerStorage is used, entrylogperledger is
+     * enabled and validate that when entrylog is rotated it doesn't do
+     * checkpoint.
+     */
+    @Test
+    public void testCheckpointOfSLSEntryLogIsRotatedWithELPLEnabled() throws Exception {
+        testCheckpointOfSLSWhenEntryLogIsRotated(true);
+    }
+
+    /*
+     * In this testcase, SortedLedgerStorage is used, entrylogperledger is
+     * not enabled and validate that when entrylog is rotated it does
+     * checkpoint.
+     */
+    @Test
+    public void testCheckpointOfSLSEntryLogIsRotatedWithELPLDisabled() throws Exception {
+        testCheckpointOfSLSWhenEntryLogIsRotated(false);
+    }
+
+    public void testCheckpointOfSLSWhenEntryLogIsRotated(boolean entryLogPerLedgerEnabled) throws Exception {
+        File tmpDir = createTempDir("DiskCheck", "test");
+
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setZkTimeout(5000)
+                .setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() })
+                .setAutoRecoveryDaemonEnabled(false)
+                //set very high period for flushInterval
+                .setFlushInterval(30000)
+                .setBookiePort(PortManager.nextFreePort())
+                // entrylog per ledger is enabled
+                .setEntryLogPerLedgerEnabled(entryLogPerLedgerEnabled)
+                .setLedgerStorageClass(SortedLedgerStorage.class.getName())
+                // set very low skipListSizeLimit and entryLogSizeLimit to simulate log file rotation
+                .setSkipListSizeLimit(1 * 1000 * 1000)
+                .setEntryLogSizeLimit(2 * 1000 * 1000);
+
+        Assert.assertEquals("Number of JournalDirs", 1, conf.getJournalDirs().length);
+        // we know there is only one ledgerDir
+        File ledgerDir = Bookie.getCurrentDirectories(conf.getLedgerDirs())[0];
+        BookieServer server = new BookieServer(conf);
+        server.start();
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper bkClient = new BookKeeper(clientConf);
+        InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) server.getBookie().ledgerStorage;
+
+        Random rand = new Random();
+        byte[] dataBytes = new byte[10 * 1000];
+        rand.nextBytes(dataBytes);
+        int numOfEntries = ((int) conf.getEntryLogSizeLimit() + (100 * 1000)) / dataBytes.length;
+
+        LedgerHandle handle = bkClient.createLedgerAdv(10, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
+        for (int j = 0; j < numOfEntries; j++) {
+            handle.addEntry(j, dataBytes);
+        }
+        handle.close();
+
+        // sleep for a bit for checkpoint to do its task
+        executorController.advance(Duration.ofMillis(500));
+
+        File lastMarkFile = new File(ledgerDir, "lastMark");
+        LogMark rolledLogMark = readLastMarkFile(lastMarkFile);
+        if (entryLogPerLedgerEnabled) {
+            Assert.assertEquals(
+                    "rolledLogMark should be zero, since checkpoint"
+                            + "shouldn't have happened when entryLog is rotated",
+                    0, rolledLogMark.compare(new LogMark()));
+        } else {
+            Assert.assertNotEquals("rolledLogMark shouldn't be zero, since checkpoint"
+                    + "should have happened when entryLog is rotated", 0, rolledLogMark.compare(new LogMark()));
+        }
+        bkClient.close();
+        server.shutdown();
+    }
+
+    /*
+     * in this method it checks if entryLogPerLedger is enabled, then
+     * InterLeavedLedgerStorage.checkpoint flushes current activelog and flushes
+     * all rotatedlogs and closes them.
+     *
+     */
+    @Test
+    public void testIfEntryLogPerLedgerEnabledCheckpointFlushesAllLogs() throws Exception {
+        File tmpDir = createTempDir("DiskCheck", "test");
+
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setZkTimeout(5000)
+                .setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() })
+                .setAutoRecoveryDaemonEnabled(false)
+                //set flushInterval
+                .setFlushInterval(3000)
+                .setBookiePort(PortManager.nextFreePort())
+                // entrylog per ledger is enabled
+                .setEntryLogPerLedgerEnabled(true)
+                .setLedgerStorageClass(InterleavedLedgerStorage.class.getName())
+                // set setFlushIntervalInBytes to some very high number
+                .setFlushIntervalInBytes(10000000);
+
+        Assert.assertEquals("Number of JournalDirs", 1, conf.getJournalDirs().length);
+        // we know there is only one ledgerDir
+        File ledgerDir = Bookie.getCurrentDirectories(conf.getLedgerDirs())[0];
+        BookieServer server = new BookieServer(conf);
+        server.start();
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper bkClient = new BookKeeper(clientConf);
+        InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) server.getBookie().ledgerStorage;
+        EntryLogger entryLogger = ledgerStorage.entryLogger;
+
+        int numOfEntries = 5;
+        byte[] dataBytes = "data".getBytes();
+
+        long ledgerId = 10;
+        LedgerHandle handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
+        for (int j = 0; j < numOfEntries; j++) {
+            handle.addEntry(j, dataBytes);
+        }
+        handle.close();
+        // simulate rolling entrylog
+        ledgerStorage.entryLogger.rollLog();
+
+        ledgerId = 20;
+        handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
+        for (int j = 0; j < numOfEntries; j++) {
+            handle.addEntry(j, dataBytes);
+        }
+        handle.close();
+        // simulate rolling entrylog
+        ledgerStorage.entryLogger.rollLog();
+
+        ledgerId = 30;
+        handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
+        for (int j = 0; j < numOfEntries; j++) {
+            handle.addEntry(j, dataBytes);
+        }
+        handle.close();
+
+        Assert.assertNotEquals("bytesWrittenSinceLastFlush shouldn't be zero", 0,
+                entryLogger.logChannel.getUnpersistedBytes());
+        Assert.assertNotEquals("There should be logChannelsToFlush", 0, entryLogger.logChannelsToFlush.size());
+
+        /*
+         * wait for atleast flushInterval period, so that checkpoint can happen.
+         */
+        executorController.advance(Duration.ofMillis(conf.getFlushInterval()));
+
+        /*
+         * since checkpoint happenend, there shouldn't be any logChannelsToFlush
+         * and bytesWrittenSinceLastFlush should be zero.
+         */
+        Assert.assertTrue("There shouldn't be logChannelsToFlush",
+                ((entryLogger.logChannelsToFlush == null) || (entryLogger.logChannelsToFlush.size() == 0)));
+
+        Assert.assertEquals("bytesWrittenSinceLastFlush should be zero", 0,
+                entryLogger.logChannel.getUnpersistedBytes());
+    }
+
+    static class MockInterleavedLedgerStorage extends InterleavedLedgerStorage {
+        @Override
+        public void shutdown() {
+            // During BookieServer shutdown this method will be called
+            // and we want it to be noop.
+            // do nothing
+        }
+
+        @Override
+        public synchronized void flush() throws IOException {
+            // this method will be called by SyncThread.shutdown.
+            // During BookieServer shutdown we want this method to be noop
+            // do nothing
+        }
+    }
+
+    /*
+     * This is complete end-to-end scenario.
+     *
+     * 1) This testcase uses MockInterleavedLedgerStorage, which extends
+     * InterleavedLedgerStorage but doesn't do anything when Bookie is shutdown.
+     * This is needed to simulate Bookie crash.
+     * 2) entryLogPerLedger is enabled
+     * 3) ledgers are created and entries are added.
+     * 4) wait for flushInterval period for checkpoint to complete
+     * 5) simulate bookie crash
+     * 6) delete the journal files and lastmark file
+     * 7) Now restart the Bookie
+     * 8) validate that the entries which were written can be read successfully.
+     */
+    @Test
+    public void testCheckPointForEntryLoggerWithMultipleActiveEntryLogs() throws Exception {
+        File tmpDir = createTempDir("DiskCheck", "test");
+
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setZkTimeout(5000)
+                .setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() })
+                .setAutoRecoveryDaemonEnabled(false)
+                .setFlushInterval(3000)
+                .setBookiePort(PortManager.nextFreePort())
+                // entrylog per ledger is enabled
+                .setEntryLogPerLedgerEnabled(true)
+                .setLedgerStorageClass(MockInterleavedLedgerStorage.class.getName());
+
+        Assert.assertEquals("Number of JournalDirs", 1, conf.getJournalDirs().length);
+        // we know there is only one ledgerDir
+        File ledgerDir = Bookie.getCurrentDirectories(conf.getLedgerDirs())[0];
+        BookieServer server = new BookieServer(conf);
+        server.start();
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        final BookKeeper bkClient = new BookKeeper(clientConf);
+
+        int numOfLedgers = 12;
+        int numOfEntries = 100;
+        byte[] dataBytes = "data".getBytes();
+        AtomicBoolean receivedExceptionForAdd = new AtomicBoolean(false);
+        LongStream.range(0, numOfLedgers).parallel().mapToObj((ledgerId) -> {
+            LedgerHandle handle = null;
+            try {
+                handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
+            } catch (BKException | InterruptedException exc) {
+                receivedExceptionForAdd.compareAndSet(false, true);
+                LOG.error("Got Exception while trying to create LedgerHandle for ledgerId: " + ledgerId, exc);
+            }
+            return handle;
+        }).forEach((writeHandle) -> {
+            IntStream.range(0, numOfEntries).forEach((entryId) -> {
+                try {
+                    writeHandle.addEntry(entryId, dataBytes);
+                } catch (BKException | InterruptedException exc) {
+                    receivedExceptionForAdd.compareAndSet(false, true);
+                    LOG.error("Got Exception while trying to AddEntry of ledgerId: " + writeHandle.getId()
+                            + " entryId: " + entryId, exc);
+                }
+            });
+            try {
+                writeHandle.close();
+            } catch (BKException | InterruptedException e) {
+                receivedExceptionForAdd.compareAndSet(false, true);
+                LOG.error("Got Exception while trying to close writeHandle of ledgerId: " + writeHandle.getId(), e);
+            }
+        });
+
+        Assert.assertFalse(
+                "There shouldn't be any exceptions while creating writeHandle and adding entries to writeHandle",
+                receivedExceptionForAdd.get());
+
+        executorController.advance(Duration.ofMillis(conf.getFlushInterval()));
+        // since we have waited for more than flushInterval SyncThread should have checkpointed.
+        // if entrylogperledger is not enabled, then we checkpoint only when currentLog in EntryLogger
+        // is rotated. but if entrylogperledger is enabled, then we checkpoint for every flushInterval period
+        File lastMarkFile = new File(ledgerDir, "lastMark");
+        Assert.assertTrue("lastMark file must be existing, because checkpoint should have happened",
+                lastMarkFile.exists());
+        LogMark rolledLogMark = readLastMarkFile(lastMarkFile);
+        Assert.assertNotEquals("rolledLogMark should not be zero, since checkpoint has happenend", 0,
+                rolledLogMark.compare(new LogMark()));
+
+        bkClient.close();
+        // here we are calling shutdown, but MockInterleavedLedgerStorage shudown/flush
+        // methods are noop, so entrylogger is not flushed as part of this shutdown
+        // here we are trying to simulate Bookie crash, but there is no way to
+        // simulate bookie abrupt crash
+        server.shutdown();
+
+        // delete journal files and lastMark, to make sure that we are not reading from
+        // Journal file
+        File[] journalDirs = conf.getJournalDirs();
+        for (File journalDir : journalDirs) {
+            File journalDirectory = Bookie.getCurrentDirectory(journalDir);
+            List<Long> journalLogsId = Journal.listJournalIds(journalDirectory, null);
+            for (long journalId : journalLogsId) {
+                File journalFile = new File(journalDirectory, Long.toHexString(journalId) + ".txn");
+                journalFile.delete();
+            }
+        }
+
+        // we know there is only one ledgerDir
+        lastMarkFile = new File(ledgerDir, "lastMark");
+        lastMarkFile.delete();
+
+        // now we are restarting BookieServer
+        conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+        server = new BookieServer(conf);
+        server.start();
+        BookKeeper newBKClient = new BookKeeper(clientConf);
+        // since Bookie checkpointed successfully before shutdown/crash,
+        // we should be able to read from entryLogs though journal is deleted
+
+        AtomicBoolean receivedExceptionForRead = new AtomicBoolean(false);
+
+        LongStream.range(0, numOfLedgers).parallel().forEach((ledgerId) -> {
+            try {
+                LedgerHandle lh = newBKClient.openLedger(ledgerId, DigestType.CRC32, "passwd".getBytes());
+                Enumeration<LedgerEntry> entries = lh.readEntries(0, numOfEntries - 1);
+                while (entries.hasMoreElements()) {
+                    LedgerEntry entry = entries.nextElement();
+                    byte[] readData = entry.getEntry();
+                    Assert.assertEquals("Ledger Entry Data should match", new String("data".getBytes()),
+                            new String(readData));
+                }
+                lh.close();
+            } catch (BKException | InterruptedException e) {
+                receivedExceptionForRead.compareAndSet(false, true);
+                LOG.error("Got Exception while trying to read entries of ledger, ledgerId: " + ledgerId, e);
+            }
+        });
+        Assert.assertFalse("There shouldn't be any exceptions while creating readHandle and while reading"
+                + "entries using readHandle", receivedExceptionForRead.get());
+
+        newBKClient.close();
+        server.shutdown();
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index cf2a49f..c183fbf 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -110,15 +110,26 @@ public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
         // initial checkpoint
 
         this.storage = new SortedLedgerStorage();
-        this.checkpointer = checkpoint -> storage.getScheduler().submit(() -> {
-            log.info("Checkpoint the storage at {}", checkpoint);
-            try {
-                storage.checkpoint(checkpoint);
-                checkpoints.add(checkpoint);
-            } catch (IOException e) {
-                log.error("Failed to checkpoint at {}", checkpoint, e);
+        this.checkpointer = new Checkpointer() {
+            @Override
+            public void startCheckpoint(Checkpoint checkpoint) {
+                storage.getScheduler().submit(() -> {
+                    log.info("Checkpoint the storage at {}", checkpoint);
+                    try {
+                        storage.checkpoint(checkpoint);
+                        checkpoints.add(checkpoint);
+                    } catch (IOException e) {
+                        log.error("Failed to checkpoint at {}", checkpoint, e);
+                    }
+                });
             }
-        });
+
+            @Override
+            public void start() {
+                // no-op
+            }
+        };
+
         // if the SortedLedgerStorage need not to change bookie's state, pass StateManager==null is ok
         this.storage.initialize(
             conf,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
index e3cec7c..bfd7a4d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
@@ -70,6 +70,11 @@ public class ConversionRollbackTest {
         public void startCheckpoint(Checkpoint checkpoint) {
             // No-op
         }
+
+        @Override
+        public void start() {
+            // no-op
+        }
     };
 
     @Test
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
index dbc3c97..b2afe4c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
@@ -67,6 +67,11 @@ public class ConversionTest {
         public void startCheckpoint(Checkpoint checkpoint) {
             // No-op
         }
+
+        @Override
+        public void start() {
+            // no-op
+        }
     };
 
     @Test
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
index 9919dce..629a238 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
@@ -67,6 +67,11 @@ public class LocationsIndexRebuildTest {
         public void startCheckpoint(Checkpoint checkpoint) {
             // No-op
         }
+
+        @Override
+        public void start() {
+            // no-op
+        }
     };
 
     @Test
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 56f41fa..49f7b23 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -75,15 +75,20 @@ journalDirectory=/tmp/bk-txn
 # Interval to watch whether bookie is dead or not, in milliseconds
 # bookieDeathWatchInterval=1000
 
-# How long the interval to flush ledger index pages to disk, in milliseconds
-# Flushing index files will introduce much random disk I/O.
+# When entryLogPerLedgerEnabled is enabled, checkpoint doesn't happens
+# when a new active entrylog is created / previous one is rolled over.
+# Instead SyncThread checkpoints periodically with 'flushInterval' delay
+# (in milliseconds) in between executions. Checkpoint flushes both ledger 
+# entryLogs and ledger index pages to disk. 
+# Flushing entrylog and index files will introduce much random disk I/O.
 # If separating journal dir and ledger dirs each on different devices,
 # flushing would not affect performance. But if putting journal dir
 # and ledger dirs on same device, performance degrade significantly
 # on too frequent flushing. You can consider increment flush interval
 # to get better performance, but you need to pay more time on bookie
 # server restart after failure.
-# flushInterval=100
+# This config is used only when entryLogPerLedgerEnabled is enabled.
+# flushInterval=10000
 
 # Allow the expansion of bookie storage capacity. Newly added ledger
 # and index dirs must be empty.
@@ -435,6 +440,12 @@ ledgerManagerFactoryClass=org.apache.bookkeeper.meta.HierarchicalLedgerManagerFa
 # The number of bytes used as capacity for the write buffer. Default is 64KB.
 # writeBufferSizeBytes=65536
 
+# Specifies if entryLog per ledger is enabled/disabled. If it is enabled, then there would be a 
+# active entrylog for each ledger. It would be ideal to enable this feature if the underlying 
+# storage device has multiple DiskPartitions or SSD and if in a given moment, entries of fewer 
+# number of active ledgers are written to a bookie.
+# entryLogPerLedgerEnabled=false
+
 #############################################################################
 ## Entry log compaction settings
 #############################################################################
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index 3e564b5..f843dfb 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -43,8 +43,8 @@ groups:
     description: Interval to watch whether bookie is dead or not, in milliseconds.
     default: 1000
   - param: flushInterval
-    description: How long the interval to flush ledger index pages to disk, in milliseconds. Flushing index files will introduce much random disk I/O. If separating journal dir and ledger dirs each on different devices, flushing would not affect performance. But if putting journal dir and ledger dirs on same device, performance degrade significantly on too frequent flushing. You can consider increment flush interval to get better performance, but you need to pay more time on bookie serve [...]
-    default: 100
+    description: When entryLogPerLedgerEnabled is enabled, checkpoint doesn't happens when a new active entrylog is created / previous one is rolled over. Instead SyncThread checkpoints periodically with 'flushInterval' delay (in milliseconds) in between executions. Checkpoint flushes both ledger entryLogs and ledger index pages to disk.  Flushing entrylog and index files will introduce much random disk I/O. If separating journal dir and ledger dirs each on different devices, flushing wo [...]
+    default: 10000
   - param: allowStorageExpansion
     description: Allow the expansion of bookie storage capacity. Newly added ledger and index directories must be empty.
     default: 'false'
@@ -300,6 +300,9 @@ groups:
   - param: writeBufferSizeBytes
     description: The number of bytes used as capacity for the write buffer.
     default: 65536
+  - param: entryLogPerLedgerEnabled
+    description: Specifies if entryLog per ledger is enabled/disabled. If it is enabled, then there would be a active entrylog for each ledger. It would be ideal to enable this feature if the underlying storage device has multiple DiskPartitions or SSD and if in a given moment, entries of fewer number of active ledgers are written to the bookie.
+    default: false
 
 - name: Entry log compaction settings
   params:

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.