You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/11/16 22:10:30 UTC

[bookkeeper] branch master updated: [BOOKIE] Fix sorted ledger storage rotating entry log files too frequent

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new deebe6d  [BOOKIE] Fix sorted ledger storage rotating entry log files too frequent
deebe6d is described below

commit deebe6db0c2fbd89cd7558e3a7ce76ec076aac5b
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Fri Nov 16 14:10:26 2018 -0800

    [BOOKIE] Fix sorted ledger storage rotating entry log files too frequent
    
    
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    A strange behavior was observed when using sorted ledger storage with the single entry log manager in production:
    
    "the entry log files are rotated very frequently and small entry log files are produced".
    
    The problem was introduced due to #1410.
    
    In the current entry logger, when a new entry log file is created, EntryLogger will notify its listeners
    that a new entry log file is rotated via [`onRotateEntryLog`](https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java#L152).
    
    Before the change in #1410, `SortedLedgerStorage` inherited from `InterleavedLedgerStorage`.
    So when a new entry log file was rotated, `SortedLedgerStorage` was notified.
    
    However, after the change in #1410, `SortedLedgerStorage` no longer inherits from `InterleavedLedgerStorage`.
    Instead, the relationship is changed to composition. `SortedLedgerStorage` is composed using an interleaved ledger
    storage. So the entry log listener contract was broken: `SortedLedgerStorage` no longer receives any
    `onRotateEntryLog` notifications.
    
    *Changes*
    
    When `SortedLedgerStorage` initializes, it passes its own entry log listener down to the interleaved ledger storage.
    So the entry logger can notify the right listener about entry log rotations.
    
    *Tests*
    
    Existing tests should cover most of the cases. Still looking into how to add new test cases.
    
    
    
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Charan Reddy Guttapalem <re...@gmail.com>, Andrey Yegorov <None>
    
    This closes #1807 from sijie/fix_rotation_behavior
---
 .../bookkeeper/bookie/EntryLogManagerBase.java     | 14 +++++++++++++
 .../EntryLogManagerForEntryLogPerLedger.java       |  6 ++++--
 .../bookie/EntryLogManagerForSingleEntryLog.java   | 12 +++++++----
 .../bookie/InterleavedLedgerStorage.java           | 23 +++++++++++++++++++++-
 .../bookkeeper/bookie/SortedLedgerStorage.java     |  5 ++++-
 5 files changed, 52 insertions(+), 8 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
index 701fb7b..f9c6d97 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
@@ -21,6 +21,9 @@
 
 package org.apache.bookkeeper.bookie;
 
+import static org.apache.bookkeeper.bookie.EntryLogger.UNASSIGNED_LEDGERID;
+
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.concurrent.FastThreadLocal;
@@ -131,7 +134,18 @@ abstract class EntryLogManagerBase implements EntryLogManager {
      * Creates a new log file. This method should be guarded by a lock,
      * so callers of this method should be in right scope of the lock.
      */
+    @VisibleForTesting
     void createNewLog(long ledgerId) throws IOException {
+        createNewLog(ledgerId, "");
+    }
+
+    void createNewLog(long ledgerId, String reason) throws IOException {
+        if (ledgerId != UNASSIGNED_LEDGERID) {
+            log.info("Creating a new entry log file for ledger '{}' {}", ledgerId, reason);
+        } else {
+            log.info("Creating a new entry log file {}", reason);
+        }
+
         BufferedLogChannel logChannel = getCurrentLogForLedger(ledgerId);
         // first tried to create a new log channel. add current log channel to ToFlush list only when
         // there is a new log channel. it would prevent that a log channel is referenced by both
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
index 39ed60c..8093b53 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
@@ -484,7 +484,7 @@ class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
                 try {
                     if (reachEntryLogLimit(currentLog, 0L)) {
                         log.info("Rolling entry logger since it reached size limitation for ledger: {}", ledgerId);
-                        createNewLog(ledgerId);
+                        createNewLog(ledgerId, "after entry log file is rotated");
                     }
                 } finally {
                     lock.unlock();
@@ -640,7 +640,9 @@ class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
                 if (logChannel != null) {
                     logChannel.flushAndForceWriteIfRegularFlush(false);
                 }
-                createNewLog(ledgerId);
+                createNewLog(ledgerId,
+                    ": diskFull = " + diskFull + ", allDisksFull = " + allDisksFull
+                        + ", reachEntryLogLimit = " + reachEntryLogLimit + ", logChannel = " + logChannel);
             }
 
             return getCurrentLogForLedger(ledgerId);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
index 3e552d0..72c818a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
@@ -92,7 +92,7 @@ class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
             boolean rollLog) throws IOException {
         if (null == activeLogChannel) {
             // log channel can be null because the file is deferred to be created
-            createNewLog(UNASSIGNED_LEDGERID);
+            createNewLog(UNASSIGNED_LEDGERID, "because current active log channel has not initialized yet");
         }
 
         boolean reachEntryLogLimit = rollLog ? reachEntryLogLimit(activeLogChannel, entrySize)
@@ -103,7 +103,8 @@ class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
             if (activeLogChannel != null) {
                 activeLogChannel.flushAndForceWriteIfRegularFlush(false);
             }
-            createNewLog(UNASSIGNED_LEDGERID);
+            createNewLog(UNASSIGNED_LEDGERID,
+                ": createNewLog = " + createNewLog + ", reachEntryLogLimit = " + reachEntryLogLimit);
             // Reset the flag
             if (createNewLog) {
                 shouldCreateNewEntryLog.set(false);
@@ -238,7 +239,9 @@ class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
          */
         if (reachEntryLogLimit(activeLogChannel, 0L) || logIdAfterFlush != logIdBeforeFlush) {
             log.info("Rolling entry logger since it reached size limitation");
-            createNewLog(UNASSIGNED_LEDGERID);
+            createNewLog(UNASSIGNED_LEDGERID,
+                "due to reaching log limit after flushing memtable : logIdBeforeFlush = "
+                    + logIdBeforeFlush + ", logIdAfterFlush = " + logIdAfterFlush);
             return true;
         }
         return false;
@@ -251,7 +254,8 @@ class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
             // it means bytes might live at current active entry log, we need
             // roll current entry log and then issue checkpoint to underlying
             // interleaved ledger storage.
-            createNewLog(UNASSIGNED_LEDGERID);
+            createNewLog(UNASSIGNED_LEDGERID,
+                "due to preparing checkpoint : numBytesFlushed = " + numBytesFlushed);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 08e7f4e..8ab6517 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -98,11 +98,32 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
                            Checkpointer checkpointer,
                            StatsLogger statsLogger)
             throws IOException {
+        initializeWithEntryLogListener(
+            conf,
+            ledgerManager,
+            ledgerDirsManager,
+            indexDirsManager,
+            stateManager,
+            checkpointSource,
+            checkpointer,
+            this,
+            statsLogger);
+    }
+
+    void initializeWithEntryLogListener(ServerConfiguration conf,
+                                        LedgerManager ledgerManager,
+                                        LedgerDirsManager ledgerDirsManager,
+                                        LedgerDirsManager indexDirsManager,
+                                        StateManager stateManager,
+                                        CheckpointSource checkpointSource,
+                                        Checkpointer checkpointer,
+                                        EntryLogListener entryLogListener,
+                                        StatsLogger statsLogger) throws IOException {
         checkNotNull(checkpointSource, "invalid null checkpoint source");
         checkNotNull(checkpointer, "invalid null checkpointer");
         this.checkpointSource = checkpointSource;
         this.checkpointer = checkpointer;
-        entryLogger = new EntryLogger(conf, ledgerDirsManager, this, statsLogger.scope(ENTRYLOGGER_SCOPE));
+        entryLogger = new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE));
         ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
                 null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
         gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger.scope("gc"));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 5c4f75a..e4b137f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -72,7 +72,7 @@ public class SortedLedgerStorage
                            StatsLogger statsLogger)
             throws IOException {
 
-        interleavedLedgerStorage.initialize(
+        interleavedLedgerStorage.initializeWithEntryLogListener(
             conf,
             ledgerManager,
             ledgerDirsManager,
@@ -80,6 +80,9 @@ public class SortedLedgerStorage
             stateManager,
             checkpointSource,
             checkpointer,
+            // uses sorted ledger storage's own entry log listener
+            // since it manages entry log rotations and checkpoints.
+            this,
             statsLogger);
 
         if (conf.isEntryLogPerLedgerEnabled()) {