You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/03/13 22:51:53 UTC

[GitHub] sijie closed pull request #1236: Issue #570: make changes to SyncThread/checkpoint logic.

sijie closed pull request #1236: Issue #570: make changes to SyncThread/checkpoint logic.
URL: https://github.com/apache/bookkeeper/pull/1236
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 7b28cdd31..223c97738 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -63,6 +63,7 @@
 import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
 import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException;
 import org.apache.bookkeeper.bookie.BookieException.UnknownBookieIdException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -112,6 +113,7 @@
     final List<Journal> journals;
 
     final HandleFactory handles;
+    final boolean entryLogPerLedgerEnabled;
 
     static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
     static final long METAENTRY_ID_FENCE_KEY  = -0x2000;
@@ -688,13 +690,43 @@ public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
                          conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE + "_" + i)));
         }
 
+        this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
         CheckpointSource checkpointSource = new CheckpointSourceList(journals);
 
         // Instantiate the ledger storage implementation
         String ledgerStorageClass = conf.getLedgerStorageClass();
         LOG.info("Using ledger storage: {}", ledgerStorageClass);
         ledgerStorage = LedgerStorageFactory.createLedgerStorage(ledgerStorageClass);
-        syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource);
+
+        /*
+         * with this change https://github.com/apache/bookkeeper/pull/677,
+         * LedgerStorage drives the checkpoint logic. But with multiple entry
+         * logs, checkpoint logic based on a entry log is not possible, hence it
+         * needs to be timebased recurring thing and it is driven by SyncThread.
+         * SyncThread.start does that and it is started in Bookie.start method.
+         */
+        if (entryLogPerLedgerEnabled) {
+            syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource) {
+                @Override
+                public void startCheckpoint(Checkpoint checkpoint) {
+                    /*
+                     * in the case of entryLogPerLedgerEnabled, LedgerStorage
+                     * dont drive checkpoint logic, but instead it is done
+                     * periodically by SyncThread. So startCheckpoint which
+                     * will be called by LedgerStorage will be no-op.
+                     */
+                }
+
+                @Override
+                public void start() {
+                    executor.scheduleAtFixedRate(() -> {
+                        doCheckpoint(checkpointSource.newCheckpoint());
+                    }, conf.getFlushInterval(), conf.getFlushInterval(), TimeUnit.MILLISECONDS);
+                }
+            };
+        } else {
+            syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource);
+        }
 
         ledgerStorage.initialize(
             conf,
@@ -823,6 +855,14 @@ public synchronized void start() {
         }
         LOG.info("Finished reading journal, starting bookie");
 
+
+        /*
+         * start sync thread first, so during replaying journals, we could do
+         * checkpoint which reduce the chance that we need to replay journals
+         * again if bookie restarted again before finished journal replays.
+         */
+        syncThread.start();
+
         // start bookie thread
         super.start();
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 027a04dda..d35684ecf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -2494,6 +2494,11 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact)
                 public void startCheckpoint(Checkpoint checkpoint) {
                     // No-op
                 }
+
+                @Override
+                public void start() {
+                    // no-op
+                }
             };
 
             interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager,
@@ -2584,6 +2589,11 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact)
                 public void startCheckpoint(Checkpoint checkpoint) {
                     // No-op
                 }
+
+                @Override
+                public void start() {
+                    // no-op
+                }
             };
 
             dbStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager, null,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Checkpointer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Checkpointer.java
index 967d3e925..1e3f489e8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Checkpointer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Checkpointer.java
@@ -26,8 +26,16 @@
  */
 public interface Checkpointer {
 
-    Checkpointer NULL = checkpoint -> {
-        // do nothing;
+    Checkpointer NULL = new Checkpointer(){
+        @Override
+        public void startCheckpoint(Checkpoint checkpoint) {
+            // no-op
+        }
+
+        @Override
+        public void start() {
+            // no-op
+        }
     };
 
     /**
@@ -37,4 +45,5 @@
      */
     void startCheckpoint(Checkpoint checkpoint);
 
+    void start();
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 1792417ad..57540096b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -83,7 +83,7 @@
 public class EntryLogger {
     private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
 
-    private static class BufferedLogChannel extends BufferedChannel {
+    static class BufferedLogChannel extends BufferedChannel {
         private final long logId;
         private final EntryLogMetadata entryLogMetadata;
         private final File logFile;
@@ -118,6 +118,7 @@ public ConcurrentLongLongHashMap getLedgersMap() {
 
     volatile File currentDir;
     private final LedgerDirsManager ledgerDirsManager;
+    private final boolean entryLogPerLedgerEnabled;
     private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
 
     private volatile long leastUnflushedLogId;
@@ -131,8 +132,8 @@ public ConcurrentLongLongHashMap getLedgersMap() {
      * The maximum size of a entry logger file.
      */
     final long logSizeLimit;
-    private List<BufferedLogChannel> logChannelsToFlush;
-    private volatile BufferedLogChannel logChannel;
+    List<BufferedLogChannel> logChannelsToFlush;
+    volatile BufferedLogChannel logChannel;
     private volatile BufferedLogChannel compactionLogChannel;
 
     private final EntryLoggerAllocator entryLoggerAllocator;
@@ -285,6 +286,7 @@ public EntryLogger(ServerConfiguration conf,
         this.leastUnflushedLogId = logId + 1;
         this.entryLoggerAllocator = new EntryLoggerAllocator(logId);
         this.conf = conf;
+        this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
 
         initialize();
     }
@@ -797,6 +799,20 @@ private long readLastLogId(File f) {
      */
     void checkpoint() throws IOException {
         flushRotatedLogs();
+        /*
+         * In the case of entryLogPerLedgerEnabled we need to flush both
+         * rotatedlogs and currentlogs. This is needed because syncThread
+         * periodically does checkpoint and at this time all the logs should
+         * be flushed.
+         *
+         * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of
+         * this Issue, I will move this logic to individual implamentations of
+         * EntryLogManager and it would be free of this booalen flag based logic.
+         *
+         */
+        if (entryLogPerLedgerEnabled) {
+            flushCurrentLog();
+        }
     }
 
     void flushRotatedLogs() throws IOException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 81de730e4..ce1a4998e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -99,7 +99,6 @@ public void initialize(ServerConfiguration conf,
             throws IOException {
         checkNotNull(checkpointSource, "invalid null checkpoint source");
         checkNotNull(checkpointer, "invalid null checkpointer");
-
         this.checkpointSource = checkpointSource;
         this.checkpointer = checkpointer;
         entryLogger = new EntryLogger(conf, ledgerDirsManager, this);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index e21f4d133..5a816a016 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -82,7 +82,7 @@
      * @param filter journal id filter
      * @return list of filtered ids
      */
-    private static List<Long> listJournalIds(File journalDir, JournalIdFilter filter) {
+    static List<Long> listJournalIds(File journalDir, JournalIdFilter filter) {
         File logFiles[] = journalDir.listFiles();
         if (logFiles == null || logFiles.length == 0) {
             return Collections.emptyList();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
index 3fbedbc60..eeba1c430 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
@@ -22,9 +22,7 @@
 package org.apache.bookkeeper.bookie;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import io.netty.util.concurrent.DefaultThreadFactory;
-
 import java.io.IOException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -32,14 +30,12 @@
 import java.util.concurrent.TimeUnit;
 
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.util.MathUtils;
 
-
 /**
  * SyncThread is a background thread which help checkpointing ledger storage
  * when a checkpoint is requested. After a ledger storage is checkpointed,
@@ -61,7 +57,6 @@
 class SyncThread implements Checkpointer {
 
     final ScheduledExecutorService executor;
-    final int flushInterval;
     final LedgerStorage ledgerStorage;
     final LedgerDirsListener dirsListener;
     final CheckpointSource checkpointSource;
@@ -78,14 +73,14 @@ public SyncThread(ServerConfiguration conf,
         this.ledgerStorage = ledgerStorage;
         this.checkpointSource = checkpointSource;
         this.executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("SyncThread"));
-        flushInterval = conf.getFlushInterval();
-        if (log.isDebugEnabled()) {
-            log.debug("Flush Interval : {}", flushInterval);
-        }
     }
 
     @Override
     public void startCheckpoint(Checkpoint checkpoint) {
+        doCheckpoint(checkpoint);
+    }
+
+    protected void doCheckpoint(Checkpoint checkpoint) {
         executor.submit(() -> {
             try {
                 synchronized (suspensionLock) {
@@ -108,14 +103,13 @@ public void startCheckpoint(Checkpoint checkpoint) {
         });
     }
 
-    public Future<Void> requestFlush() {
+    public Future requestFlush() {
         return executor.submit(() -> {
             try {
                 flush();
             } catch (Throwable t) {
                 log.error("Exception flushing ledgers ", t);
             }
-            return null;
         });
     }
 
@@ -171,6 +165,11 @@ public void checkpoint(Checkpoint checkpoint) {
         }
     }
 
+    @Override
+    public void start() {
+        // no-op
+    }
+
     /**
      * Suspend sync thread. (for testing)
      */
@@ -201,6 +200,7 @@ public void disableCheckpoint() {
     void shutdown() throws InterruptedException {
         log.info("Shutting down SyncThread");
         requestFlush();
+
         executor.shutdown();
         long start = MathUtils.now();
         while (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index da6368556..328f47315 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -179,6 +179,11 @@
     // Stats
     protected static final String ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats";
 
+    /*
+     * config specifying if the entrylog per ledger is enabled or not.
+     */
+    protected static final String ENTRY_LOG_PER_LEDGER_ENABLED = "entryLogPerLedgerEnabled";
+
     /**
      * Construct a default configuration object.
      */
@@ -2633,4 +2638,21 @@ public void setRegistrationManagerClass(
     protected ServerConfiguration getThis() {
         return this;
     }
+
+    /*
+     * specifies if entryLog per ledger is enabled. If it is enabled, then there
+     * would be a active entrylog for each ledger
+     */
+    public boolean isEntryLogPerLedgerEnabled() {
+        return this.getBoolean(ENTRY_LOG_PER_LEDGER_ENABLED, false);
+    }
+
+    /*
+     * enables/disables entrylog per ledger feature.
+     *
+     */
+    public ServerConfiguration setEntryLogPerLedgerEnabled(boolean entryLogPerLedgerEnabled) {
+        this.setProperty(ENTRY_LOG_PER_LEDGER_ENABLED, Boolean.toString(entryLogPerLedgerEnabled));
+        return this;
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
new file mode 100644
index 000000000..1a795ad0d
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
@@ -0,0 +1,699 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import org.apache.bookkeeper.bookie.Journal.LastLogMark;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.common.testing.executors.MockExecutorController;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.apache.bookkeeper.util.IOUtils;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * LedgerStorageCheckpointTest.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(SyncThread.class)
+@PowerMockIgnore("javax.*")
+public class LedgerStorageCheckpointTest {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(LedgerStorageCheckpointTest.class);
+
+    @Rule
+    public final TestName runtime = new TestName();
+
+    // ZooKeeper related variables
+    protected final ZooKeeperUtil zkUtil = new ZooKeeperUtil();
+
+    // BookKeeper related variables
+    protected final List<File> tmpDirs = new LinkedList<File>();
+
+    // ScheduledExecutorService used by SyncThread
+    MockExecutorController executorController;
+
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("Setting up test {}", getClass());
+        PowerMockito.mockStatic(Executors.class);
+
+        try {
+            // start zookeeper service
+            startZKCluster();
+        } catch (Exception e) {
+            LOG.error("Error setting up", e);
+            throw e;
+        }
+
+        ScheduledExecutorService scheduledExecutorService = PowerMockito.mock(ScheduledExecutorService.class);
+        executorController = new MockExecutorController()
+                .controlSubmit(scheduledExecutorService)
+                .controlScheduleAtFixedRate(scheduledExecutorService, 10);
+        PowerMockito.when(scheduledExecutorService.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true);
+        PowerMockito.when(Executors.newSingleThreadScheduledExecutor(any())).thenReturn(scheduledExecutorService);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("TearDown");
+        Exception tearDownException = null;
+        // stop zookeeper service
+        try {
+            stopZKCluster();
+        } catch (Exception e) {
+            LOG.error("Got Exception while trying to stop ZKCluster", e);
+            tearDownException = e;
+        }
+        // cleanup temp dirs
+        try {
+            cleanupTempDirs();
+        } catch (Exception e) {
+            LOG.error("Got Exception while trying to cleanupTempDirs", e);
+            tearDownException = e;
+        }
+        if (tearDownException != null) {
+            throw tearDownException;
+        }
+    }
+
+    /**
+     * Start zookeeper cluster.
+     *
+     * @throws Exception
+     */
+    protected void startZKCluster() throws Exception {
+        zkUtil.startServer();
+    }
+
+    /**
+     * Stop zookeeper cluster.
+     *
+     * @throws Exception
+     */
+    protected void stopZKCluster() throws Exception {
+        zkUtil.killServer();
+    }
+
+    protected void cleanupTempDirs() throws Exception {
+        for (File f : tmpDirs) {
+            FileUtils.deleteDirectory(f);
+        }
+    }
+
+    protected File createTempDir(String prefix, String suffix) throws IOException {
+        File dir = IOUtils.createTempDir(prefix, suffix);
+        tmpDirs.add(dir);
+        return dir;
+    }
+
+    private LogMark readLastMarkFile(File lastMarkFile) throws IOException {
+        byte buff[] = new byte[16];
+        ByteBuffer bb = ByteBuffer.wrap(buff);
+        LogMark rolledLogMark = new LogMark();
+        FileInputStream fis = new FileInputStream(lastMarkFile);
+        int bytesRead = fis.read(buff);
+        fis.close();
+        if (bytesRead != 16) {
+            throw new IOException("Couldn't read enough bytes from lastMark." + " Wanted " + 16 + ", got " + bytesRead);
+        }
+        bb.clear();
+        rolledLogMark.readLogMark(bb);
+        return rolledLogMark;
+    }
+
+    /*
+     * In this testcase, InterleavedLedgerStorage is used and validate if the
+     * checkpoint is called for every flushinterval period.
+     */
+    @Test
+    public void testPeriodicCheckpointForInterleavedLedgerStorage() throws Exception {
+        testPeriodicCheckpointForLedgerStorage(InterleavedLedgerStorage.class.getName());
+    }
+
+    /*
+     * In this testcase, SortedLedgerStorage is used and validate if the
+     * checkpoint is called for every flushinterval period.
+     */
+    @Test
+    public void testPeriodicCheckpointForSortedLedgerStorage() throws Exception {
+        testPeriodicCheckpointForLedgerStorage(SortedLedgerStorage.class.getName());
+    }
+
+    public void testPeriodicCheckpointForLedgerStorage(String ledgerStorageClassName) throws Exception {
+        File tmpDir = createTempDir("DiskCheck", "test");
+
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setZkTimeout(5000)
+                .setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() })
+                .setAutoRecoveryDaemonEnabled(false)
+                .setFlushInterval(2000)
+                .setBookiePort(PortManager.nextFreePort())
+                // entrylog per ledger is enabled
+                .setEntryLogPerLedgerEnabled(true)
+                .setLedgerStorageClass(ledgerStorageClassName);
+        Assert.assertEquals("Number of JournalDirs", 1, conf.getJournalDirs().length);
+        // we know there is only one ledgerDir
+        File ledgerDir = Bookie.getCurrentDirectories(conf.getLedgerDirs())[0];
+        BookieServer server = new BookieServer(conf);
+        server.start();
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper bkClient = new BookKeeper(clientConf);
+
+        int numOfLedgers = 2;
+        int numOfEntries = 5;
+        byte[] dataBytes = "data".getBytes();
+
+        for (int i = 0; i < numOfLedgers; i++) {
+            int ledgerIndex = i;
+            LedgerHandle handle = bkClient.createLedgerAdv((long) i, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(),
+                    null);
+            for (int j = 0; j < numOfEntries; j++) {
+                handle.addEntry(j, dataBytes);
+            }
+            handle.close();
+        }
+
+        LastLogMark lastLogMarkAfterFirstSetOfAdds = server.getBookie().journals.get(0).getLastLogMark();
+        LogMark curMarkAfterFirstSetOfAdds = lastLogMarkAfterFirstSetOfAdds.getCurMark();
+
+        File lastMarkFile = new File(ledgerDir, "lastMark");
+        // lastMark file should be zero, because checkpoint hasn't happenend
+        LogMark logMarkFileBeforeCheckpoint = readLastMarkFile(lastMarkFile);
+        Assert.assertEquals("lastMarkFile before checkpoint should be zero", 0,
+                logMarkFileBeforeCheckpoint.compare(new LogMark()));
+
+        // wait for flushInterval for SyncThread to do next iteration of checkpoint
+        executorController.advance(Duration.ofMillis(conf.getFlushInterval()));
+        /*
+         * since we have waited for more than flushInterval SyncThread should
+         * have checkpointed. if entrylogperledger is not enabled, then we
+         * checkpoint only when currentLog in EntryLogger is rotated. but if
+         * entrylogperledger is enabled, then we checkpoint for every
+         * flushInterval period
+         */
+        Assert.assertTrue("lastMark file must be existing, because checkpoint should have happened",
+                lastMarkFile.exists());
+
+        LastLogMark lastLogMarkAfterCheckpoint = server.getBookie().journals.get(0).getLastLogMark();
+        LogMark curMarkAfterCheckpoint = lastLogMarkAfterCheckpoint.getCurMark();
+
+        LogMark rolledLogMark = readLastMarkFile(lastMarkFile);
+        Assert.assertNotEquals("rolledLogMark should not be zero, since checkpoint has happenend", 0,
+                rolledLogMark.compare(new LogMark()));
+        /*
+         * Curmark should be equal before and after checkpoint, because we didnt
+         * add new entries during this period
+         */
+        Assert.assertTrue("Curmark should be equal before and after checkpoint",
+                curMarkAfterCheckpoint.compare(curMarkAfterFirstSetOfAdds) == 0);
+        /*
+         * Curmark after checkpoint should be equal to rolled logmark, because
+         * we checkpointed
+         */
+        Assert.assertTrue("Curmark after first set of adds should be equal to rolled logmark",
+                curMarkAfterCheckpoint.compare(rolledLogMark) == 0);
+
+        // add more ledger/entries
+        for (int i = numOfLedgers; i < 2 * numOfLedgers; i++) {
+            int ledgerIndex = i;
+            LedgerHandle handle = bkClient.createLedgerAdv((long) i, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(),
+                    null);
+            for (int j = 0; j < numOfEntries; j++) {
+                handle.addEntry(j, dataBytes);
+            }
+            handle.close();
+        }
+
+        // wait for flushInterval for SyncThread to do next iteration of checkpoint
+        executorController.advance(Duration.ofMillis(conf.getFlushInterval()));
+
+        LastLogMark lastLogMarkAfterSecondSetOfAdds = server.getBookie().journals.get(0).getLastLogMark();
+        LogMark curMarkAfterSecondSetOfAdds = lastLogMarkAfterSecondSetOfAdds.getCurMark();
+
+        rolledLogMark = readLastMarkFile(lastMarkFile);
+        /*
+         * Curmark after checkpoint should be equal to rolled logmark, because
+         * we checkpointed
+         */
+        Assert.assertTrue("Curmark after second set of adds should be equal to rolled logmark",
+                curMarkAfterSecondSetOfAdds.compare(rolledLogMark) == 0);
+
+        server.shutdown();
+        bkClient.close();
+    }
+
+    /*
+     * In this testcase, InterleavedLedgerStorage is used, entrylogperledger is
+     * enabled and validate that when entrylog is rotated it doesn't do
+     * checkpoint.
+     */
+    @Test
+    public void testCheckpointOfILSEntryLogIsRotatedWithELPLEnabled() throws Exception {
+        testCheckpointofILSWhenEntryLogIsRotated(true);
+    }
+
+    /*
+     * In this testcase, InterleavedLedgerStorage is used, entrylogperledger is
+     * not enabled and validate that when entrylog is rotated it does
+     * checkpoint.
+     */
+    @Test
+    public void testCheckpointOfILSEntryLogIsRotatedWithELPLDisabled() throws Exception {
+        testCheckpointofILSWhenEntryLogIsRotated(false);
+    }
+
+    public void testCheckpointofILSWhenEntryLogIsRotated(boolean entryLogPerLedgerEnabled) throws Exception {
+        File tmpDir = createTempDir("DiskCheck", "test");
+
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setZkTimeout(5000)
+                .setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() })
+                .setAutoRecoveryDaemonEnabled(false)
+                //set very high period for flushInterval
+                .setFlushInterval(30000)
+                .setBookiePort(PortManager.nextFreePort())
+                // entrylog per ledger is enabled
+                .setEntryLogPerLedgerEnabled(entryLogPerLedgerEnabled)
+                .setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+
+        Assert.assertEquals("Number of JournalDirs", 1, conf.getJournalDirs().length);
+        // we know there is only one ledgerDir
+        File ledgerDir = Bookie.getCurrentDirectories(conf.getLedgerDirs())[0];
+        BookieServer server = new BookieServer(conf);
+        server.start();
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper bkClient = new BookKeeper(clientConf);
+        InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) server.getBookie().ledgerStorage;
+
+        int numOfEntries = 5;
+        byte[] dataBytes = "data".getBytes();
+
+        long ledgerId = 10;
+        LedgerHandle handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
+        for (int j = 0; j < numOfEntries; j++) {
+            handle.addEntry(j, dataBytes);
+        }
+        handle.close();
+        // simulate rolling entrylog
+        ledgerStorage.entryLogger.rollLog();
+        // sleep for a bit for checkpoint to do its task
+        executorController.advance(Duration.ofMillis(500));
+
+        File lastMarkFile = new File(ledgerDir, "lastMark");
+        LogMark rolledLogMark = readLastMarkFile(lastMarkFile);
+        if (entryLogPerLedgerEnabled) {
+            Assert.assertEquals(
+                    "rolledLogMark should be zero, since checkpoint"
+                            + "shouldn't have happened when entryLog is rotated",
+                    0, rolledLogMark.compare(new LogMark()));
+        } else {
+            Assert.assertNotEquals("rolledLogMark shouldn't be zero, since checkpoint"
+                    + "should have happened when entryLog is rotated", 0, rolledLogMark.compare(new LogMark()));
+        }
+        bkClient.close();
+        server.shutdown();
+    }
+
+    /*
+     * In this testcase, SortedLedgerStorage is used, entrylogperledger is
+     * enabled and validate that when entrylog is rotated it doesn't do
+     * checkpoint.
+     */
+    @Test
+    public void testCheckpointOfSLSEntryLogIsRotatedWithELPLEnabled() throws Exception {
+        testCheckpointOfSLSWhenEntryLogIsRotated(true);
+    }
+
+    /*
+     * In this testcase, SortedLedgerStorage is used, entrylogperledger is
+     * not enabled and validate that when entrylog is rotated it does
+     * checkpoint.
+     */
+    @Test
+    public void testCheckpointOfSLSEntryLogIsRotatedWithELPLDisabled() throws Exception {
+        testCheckpointOfSLSWhenEntryLogIsRotated(false);
+    }
+
+    public void testCheckpointOfSLSWhenEntryLogIsRotated(boolean entryLogPerLedgerEnabled) throws Exception {
+        File tmpDir = createTempDir("DiskCheck", "test");
+
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setZkTimeout(5000)
+                .setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() })
+                .setAutoRecoveryDaemonEnabled(false)
+                //set very high period for flushInterval
+                .setFlushInterval(30000)
+                .setBookiePort(PortManager.nextFreePort())
+                // entrylog per ledger is enabled
+                .setEntryLogPerLedgerEnabled(entryLogPerLedgerEnabled)
+                .setLedgerStorageClass(SortedLedgerStorage.class.getName())
+                // set very low skipListSizeLimit and entryLogSizeLimit to simulate log file rotation
+                .setSkipListSizeLimit(1 * 1000 * 1000)
+                .setEntryLogSizeLimit(2 * 1000 * 1000);
+
+        Assert.assertEquals("Number of JournalDirs", 1, conf.getJournalDirs().length);
+        // we know there is only one ledgerDir
+        File ledgerDir = Bookie.getCurrentDirectories(conf.getLedgerDirs())[0];
+        BookieServer server = new BookieServer(conf);
+        server.start();
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper bkClient = new BookKeeper(clientConf);
+        InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) server.getBookie().ledgerStorage;
+
+        Random rand = new Random();
+        byte[] dataBytes = new byte[10 * 1000];
+        rand.nextBytes(dataBytes);
+        int numOfEntries = ((int) conf.getEntryLogSizeLimit() + (100 * 1000)) / dataBytes.length;
+
+        LedgerHandle handle = bkClient.createLedgerAdv(10, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
+        for (int j = 0; j < numOfEntries; j++) {
+            handle.addEntry(j, dataBytes);
+        }
+        handle.close();
+
+        // sleep for a bit for checkpoint to do its task
+        executorController.advance(Duration.ofMillis(500));
+
+        File lastMarkFile = new File(ledgerDir, "lastMark");
+        LogMark rolledLogMark = readLastMarkFile(lastMarkFile);
+        if (entryLogPerLedgerEnabled) {
+            Assert.assertEquals(
+                    "rolledLogMark should be zero, since checkpoint"
+                            + "shouldn't have happened when entryLog is rotated",
+                    0, rolledLogMark.compare(new LogMark()));
+        } else {
+            Assert.assertNotEquals("rolledLogMark shouldn't be zero, since checkpoint"
+                    + "should have happened when entryLog is rotated", 0, rolledLogMark.compare(new LogMark()));
+        }
+        bkClient.close();
+        server.shutdown();
+    }
+
+    /*
+     * in this method it checks if entryLogPerLedger is enabled, then
+     * InterLeavedLedgerStorage.checkpoint flushes current activelog and flushes
+     * all rotatedlogs and closes them.
+     *
+     */
+    @Test
+    public void testIfEntryLogPerLedgerEnabledCheckpointFlushesAllLogs() throws Exception {
+        File tmpDir = createTempDir("DiskCheck", "test");
+
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setZkTimeout(5000)
+                .setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() })
+                .setAutoRecoveryDaemonEnabled(false)
+                //set flushInterval
+                .setFlushInterval(3000)
+                .setBookiePort(PortManager.nextFreePort())
+                // entrylog per ledger is enabled
+                .setEntryLogPerLedgerEnabled(true)
+                .setLedgerStorageClass(InterleavedLedgerStorage.class.getName())
+                // set setFlushIntervalInBytes to some very high number
+                .setFlushIntervalInBytes(10000000);
+
+        Assert.assertEquals("Number of JournalDirs", 1, conf.getJournalDirs().length);
+        // we know there is only one ledgerDir
+        File ledgerDir = Bookie.getCurrentDirectories(conf.getLedgerDirs())[0];
+        BookieServer server = new BookieServer(conf);
+        server.start();
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper bkClient = new BookKeeper(clientConf);
+        InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) server.getBookie().ledgerStorage;
+        EntryLogger entryLogger = ledgerStorage.entryLogger;
+
+        int numOfEntries = 5;
+        byte[] dataBytes = "data".getBytes();
+
+        long ledgerId = 10;
+        LedgerHandle handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
+        for (int j = 0; j < numOfEntries; j++) {
+            handle.addEntry(j, dataBytes);
+        }
+        handle.close();
+        // simulate rolling entrylog
+        ledgerStorage.entryLogger.rollLog();
+
+        ledgerId = 20;
+        handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
+        for (int j = 0; j < numOfEntries; j++) {
+            handle.addEntry(j, dataBytes);
+        }
+        handle.close();
+        // simulate rolling entrylog
+        ledgerStorage.entryLogger.rollLog();
+
+        ledgerId = 30;
+        handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
+        for (int j = 0; j < numOfEntries; j++) {
+            handle.addEntry(j, dataBytes);
+        }
+        handle.close();
+
+        Assert.assertNotEquals("bytesWrittenSinceLastFlush shouldn't be zero", 0,
+                entryLogger.logChannel.getUnpersistedBytes());
+        Assert.assertNotEquals("There should be logChannelsToFlush", 0, entryLogger.logChannelsToFlush.size());
+
+        /*
+         * wait for atleast flushInterval period, so that checkpoint can happen.
+         */
+        executorController.advance(Duration.ofMillis(conf.getFlushInterval()));
+
+        /*
+         * since checkpoint happenend, there shouldn't be any logChannelsToFlush
+         * and bytesWrittenSinceLastFlush should be zero.
+         */
+        Assert.assertTrue("There shouldn't be logChannelsToFlush",
+                ((entryLogger.logChannelsToFlush == null) || (entryLogger.logChannelsToFlush.size() == 0)));
+
+        Assert.assertEquals("bytesWrittenSinceLastFlush should be zero", 0,
+                entryLogger.logChannel.getUnpersistedBytes());
+    }
+
+    static class MockInterleavedLedgerStorage extends InterleavedLedgerStorage {
+        @Override
+        public void shutdown() {
+            // During BookieServer shutdown this method will be called
+            // and we want it to be noop.
+            // do nothing
+        }
+
+        @Override
+        public synchronized void flush() throws IOException {
+            // this method will be called by SyncThread.shutdown.
+            // During BookieServer shutdown we want this method to be noop
+            // do nothing
+        }
+    }
+
+    /*
+     * This is complete end-to-end scenario.
+     *
+     * 1) This testcase uses MockInterleavedLedgerStorage, which extends
+     * InterleavedLedgerStorage but doesn't do anything when Bookie is shutdown.
+     * This is needed to simulate Bookie crash.
+     * 2) entryLogPerLedger is enabled
+     * 3) ledgers are created and entries are added.
+     * 4) wait for flushInterval period for checkpoint to complete
+     * 5) simulate bookie crash
+     * 6) delete the journal files and lastmark file
+     * 7) Now restart the Bookie
+     * 8) validate that the entries which were written can be read successfully.
+     */
+    @Test
+    public void testCheckPointForEntryLoggerWithMultipleActiveEntryLogs() throws Exception {
+        File tmpDir = createTempDir("DiskCheck", "test");
+
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setZkTimeout(5000)
+                .setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() })
+                .setAutoRecoveryDaemonEnabled(false)
+                .setFlushInterval(3000)
+                .setBookiePort(PortManager.nextFreePort())
+                // entrylog per ledger is enabled
+                .setEntryLogPerLedgerEnabled(true)
+                .setLedgerStorageClass(MockInterleavedLedgerStorage.class.getName());
+
+        Assert.assertEquals("Number of JournalDirs", 1, conf.getJournalDirs().length);
+        // we know there is only one ledgerDir
+        File ledgerDir = Bookie.getCurrentDirectories(conf.getLedgerDirs())[0];
+        BookieServer server = new BookieServer(conf);
+        server.start();
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        final BookKeeper bkClient = new BookKeeper(clientConf);
+
+        int numOfLedgers = 12;
+        int numOfEntries = 100;
+        byte[] dataBytes = "data".getBytes();
+        AtomicBoolean receivedExceptionForAdd = new AtomicBoolean(false);
+        LongStream.range(0, numOfLedgers).parallel().mapToObj((ledgerId) -> {
+            LedgerHandle handle = null;
+            try {
+                handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
+            } catch (BKException | InterruptedException exc) {
+                receivedExceptionForAdd.compareAndSet(false, true);
+                LOG.error("Got Exception while trying to create LedgerHandle for ledgerId: " + ledgerId, exc);
+            }
+            return handle;
+        }).forEach((writeHandle) -> {
+            IntStream.range(0, numOfEntries).forEach((entryId) -> {
+                try {
+                    writeHandle.addEntry(entryId, dataBytes);
+                } catch (BKException | InterruptedException exc) {
+                    receivedExceptionForAdd.compareAndSet(false, true);
+                    LOG.error("Got Exception while trying to AddEntry of ledgerId: " + writeHandle.getId()
+                            + " entryId: " + entryId, exc);
+                }
+            });
+            try {
+                writeHandle.close();
+            } catch (BKException | InterruptedException e) {
+                receivedExceptionForAdd.compareAndSet(false, true);
+                LOG.error("Got Exception while trying to close writeHandle of ledgerId: " + writeHandle.getId(), e);
+            }
+        });
+
+        Assert.assertFalse(
+                "There shouldn't be any exceptions while creating writeHandle and adding entries to writeHandle",
+                receivedExceptionForAdd.get());
+
+        executorController.advance(Duration.ofMillis(conf.getFlushInterval()));
+        // since we have waited for more than flushInterval SyncThread should have checkpointed.
+        // if entrylogperledger is not enabled, then we checkpoint only when currentLog in EntryLogger
+        // is rotated. but if entrylogperledger is enabled, then we checkpoint for every flushInterval period
+        File lastMarkFile = new File(ledgerDir, "lastMark");
+        Assert.assertTrue("lastMark file must be existing, because checkpoint should have happened",
+                lastMarkFile.exists());
+        LogMark rolledLogMark = readLastMarkFile(lastMarkFile);
+        Assert.assertNotEquals("rolledLogMark should not be zero, since checkpoint has happenend", 0,
+                rolledLogMark.compare(new LogMark()));
+
+        bkClient.close();
+        // here we are calling shutdown, but MockInterleavedLedgerStorage shudown/flush
+        // methods are noop, so entrylogger is not flushed as part of this shutdown
+        // here we are trying to simulate Bookie crash, but there is no way to
+        // simulate bookie abrupt crash
+        server.shutdown();
+
+        // delete journal files and lastMark, to make sure that we are not reading from
+        // Journal file
+        File[] journalDirs = conf.getJournalDirs();
+        for (File journalDir : journalDirs) {
+            File journalDirectory = Bookie.getCurrentDirectory(journalDir);
+            List<Long> journalLogsId = Journal.listJournalIds(journalDirectory, null);
+            for (long journalId : journalLogsId) {
+                File journalFile = new File(journalDirectory, Long.toHexString(journalId) + ".txn");
+                journalFile.delete();
+            }
+        }
+
+        // we know there is only one ledgerDir
+        lastMarkFile = new File(ledgerDir, "lastMark");
+        lastMarkFile.delete();
+
+        // now we are restarting BookieServer
+        conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+        server = new BookieServer(conf);
+        server.start();
+        BookKeeper newBKClient = new BookKeeper(clientConf);
+        // since Bookie checkpointed successfully before shutdown/crash,
+        // we should be able to read from entryLogs though journal is deleted
+
+        AtomicBoolean receivedExceptionForRead = new AtomicBoolean(false);
+
+        LongStream.range(0, numOfLedgers).parallel().forEach((ledgerId) -> {
+            try {
+                LedgerHandle lh = newBKClient.openLedger(ledgerId, DigestType.CRC32, "passwd".getBytes());
+                Enumeration<LedgerEntry> entries = lh.readEntries(0, numOfEntries - 1);
+                while (entries.hasMoreElements()) {
+                    LedgerEntry entry = entries.nextElement();
+                    byte[] readData = entry.getEntry();
+                    Assert.assertEquals("Ledger Entry Data should match", new String("data".getBytes()),
+                            new String(readData));
+                }
+                lh.close();
+            } catch (BKException | InterruptedException e) {
+                receivedExceptionForRead.compareAndSet(false, true);
+                LOG.error("Got Exception while trying to read entries of ledger, ledgerId: " + ledgerId, e);
+            }
+        });
+        Assert.assertFalse("There shouldn't be any exceptions while creating readHandle and while reading"
+                + "entries using readHandle", receivedExceptionForRead.get());
+
+        newBKClient.close();
+        server.shutdown();
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index cf2a49f8c..c183fbf40 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -110,15 +110,26 @@ public void setUp() throws Exception {
         // initial checkpoint
 
         this.storage = new SortedLedgerStorage();
-        this.checkpointer = checkpoint -> storage.getScheduler().submit(() -> {
-            log.info("Checkpoint the storage at {}", checkpoint);
-            try {
-                storage.checkpoint(checkpoint);
-                checkpoints.add(checkpoint);
-            } catch (IOException e) {
-                log.error("Failed to checkpoint at {}", checkpoint, e);
+        this.checkpointer = new Checkpointer() {
+            @Override
+            public void startCheckpoint(Checkpoint checkpoint) {
+                storage.getScheduler().submit(() -> {
+                    log.info("Checkpoint the storage at {}", checkpoint);
+                    try {
+                        storage.checkpoint(checkpoint);
+                        checkpoints.add(checkpoint);
+                    } catch (IOException e) {
+                        log.error("Failed to checkpoint at {}", checkpoint, e);
+                    }
+                });
             }
-        });
+
+            @Override
+            public void start() {
+                // no-op
+            }
+        };
+
         // if the SortedLedgerStorage need not to change bookie's state, pass StateManager==null is ok
         this.storage.initialize(
             conf,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
index e3cec7c3c..bfd7a4d71 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
@@ -70,6 +70,11 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO
         public void startCheckpoint(Checkpoint checkpoint) {
             // No-op
         }
+
+        @Override
+        public void start() {
+            // no-op
+        }
     };
 
     @Test
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
index dbc3c9702..b2afe4c6d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
@@ -67,6 +67,11 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO
         public void startCheckpoint(Checkpoint checkpoint) {
             // No-op
         }
+
+        @Override
+        public void start() {
+            // no-op
+        }
     };
 
     @Test
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
index 9919dce8c..629a238f2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
@@ -67,6 +67,11 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO
         public void startCheckpoint(Checkpoint checkpoint) {
             // No-op
         }
+
+        @Override
+        public void start() {
+            // no-op
+        }
     };
 
     @Test
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 56f41fa57..49f7b23f2 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -75,15 +75,20 @@ journalDirectory=/tmp/bk-txn
 # Interval to watch whether bookie is dead or not, in milliseconds
 # bookieDeathWatchInterval=1000
 
-# How long the interval to flush ledger index pages to disk, in milliseconds
-# Flushing index files will introduce much random disk I/O.
+# When entryLogPerLedgerEnabled is enabled, checkpoint doesn't happens
+# when a new active entrylog is created / previous one is rolled over.
+# Instead SyncThread checkpoints periodically with 'flushInterval' delay
+# (in milliseconds) in between executions. Checkpoint flushes both ledger 
+# entryLogs and ledger index pages to disk. 
+# Flushing entrylog and index files will introduce much random disk I/O.
 # If separating journal dir and ledger dirs each on different devices,
 # flushing would not affect performance. But if putting journal dir
 # and ledger dirs on same device, performance degrade significantly
 # on too frequent flushing. You can consider increment flush interval
 # to get better performance, but you need to pay more time on bookie
 # server restart after failure.
-# flushInterval=100
+# This config is used only when entryLogPerLedgerEnabled is enabled.
+# flushInterval=10000
 
 # Allow the expansion of bookie storage capacity. Newly added ledger
 # and index dirs must be empty.
@@ -435,6 +440,12 @@ ledgerManagerFactoryClass=org.apache.bookkeeper.meta.HierarchicalLedgerManagerFa
 # The number of bytes used as capacity for the write buffer. Default is 64KB.
 # writeBufferSizeBytes=65536
 
+# Specifies if entryLog per ledger is enabled/disabled. If it is enabled, then there would be a 
+# active entrylog for each ledger. It would be ideal to enable this feature if the underlying 
+# storage device has multiple DiskPartitions or SSD and if in a given moment, entries of fewer 
+# number of active ledgers are written to a bookie.
+# entryLogPerLedgerEnabled=false
+
 #############################################################################
 ## Entry log compaction settings
 #############################################################################
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index 3e564b593..f843dfbfc 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -43,8 +43,8 @@ groups:
     description: Interval to watch whether bookie is dead or not, in milliseconds.
     default: 1000
   - param: flushInterval
-    description: How long the interval to flush ledger index pages to disk, in milliseconds. Flushing index files will introduce much random disk I/O. If separating journal dir and ledger dirs each on different devices, flushing would not affect performance. But if putting journal dir and ledger dirs on same device, performance degrade significantly on too frequent flushing. You can consider increment flush interval to get better performance, but you need to pay more time on bookie server restart after failure.
-    default: 100
+    description: When entryLogPerLedgerEnabled is enabled, checkpoint doesn't happens when a new active entrylog is created / previous one is rolled over. Instead SyncThread checkpoints periodically with 'flushInterval' delay (in milliseconds) in between executions. Checkpoint flushes both ledger entryLogs and ledger index pages to disk.  Flushing entrylog and index files will introduce much random disk I/O. If separating journal dir and ledger dirs each on different devices, flushing would not affect performance. But if putting journal dir and ledger dirs on same device, performance degrade significantly on too frequent flushing. You can consider increment flush interval to get better performance, but you need to pay more time on bookie server restart after failure. This config is used only when entryLogPerLedgerEnabled is enabled.
+    default: 10000
   - param: allowStorageExpansion
     description: Allow the expansion of bookie storage capacity. Newly added ledger and index directories must be empty.
     default: 'false'
@@ -300,6 +300,9 @@ groups:
   - param: writeBufferSizeBytes
     description: The number of bytes used as capacity for the write buffer.
     default: 65536
+  - param: entryLogPerLedgerEnabled
+    description: Specifies if entryLog per ledger is enabled/disabled. If it is enabled, then there would be a active entrylog for each ledger. It would be ideal to enable this feature if the underlying storage device has multiple DiskPartitions or SSD and if in a given moment, entries of fewer number of active ledgers are written to the bookie.
+    default: false
 
 - name: Entry log compaction settings
   params:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services