You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/05/03 22:49:40 UTC

[GitHub] sijie closed pull request #1365: Refactor EntryLogger class

sijie closed pull request #1365: Refactor EntryLogger class
URL: https://github.com/apache/bookkeeper/pull/1365
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, since GitHub would not otherwise display it:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManager.java
new file mode 100644
index 000000000..340e9a18b
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManager.java
@@ -0,0 +1,133 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+
+/**
+ * Manages the entry log files that {@link EntryLogger} writes to: appending
+ * entries, rolling to new logs, and flushing, checkpointing and closing them.
+ */
+interface EntryLogManager {
+
+    /**
+     * Adds an entry to the corresponding entry log and returns its location.
+     *
+     * @param ledger id of the ledger the entry belongs to
+     * @param entry entry payload to append
+     * @param rollLog if {@code true}, reaching the configured entry log size limit
+     *        rolls to a new log; implementations may otherwise enforce only a
+     *        hard size limit
+     * @return the location of the entry; {@link EntryLogManagerBase} encodes the
+     *         entry log id in the upper 32 bits and the offset in the lower bits
+     * @throws IOException if the entry could not be written
+     */
+    long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException;
+
+    /**
+     * Returns the active log channel with the given entry log id.
+     *
+     * @param entryLogId id of the entry log to look up
+     * @return the active channel for {@code entryLogId}, or {@code null} if there
+     *         is no active log with that id
+     */
+    BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+    /**
+     * Selects the writable ledger directory for the creation of the next entry log.
+     *
+     * @param writableLedgerDirs candidate writable ledger directories
+     * @return the directory in which the next entry log should be created
+     */
+    File getDirForNextEntryLog(List<File> writableLedgerDirs);
+
+    /**
+     * Performs the operations required for a checkpoint.
+     *
+     * @throws IOException if flushing the entry logs fails
+     */
+    void checkpoint() throws IOException;
+
+    /**
+     * Flushes both the current and the rotated entry logs.
+     *
+     * @throws IOException if flushing fails
+     */
+    void flush() throws IOException;
+
+    /**
+     * Closes the current entry logs.
+     *
+     * @throws IOException if closing fails
+     */
+    void close() throws IOException;
+
+    /**
+     * Force-closes the current entry logs. Unlike {@link #close()}, this method
+     * declares no checked exception.
+     */
+    void forceClose();
+
+    /**
+     * Prepares the entry logger / entry log manager before a
+     * {@code SortedLedgerStorage} checkpoint.
+     *
+     * @param numBytesFlushed number of bytes flushed ahead of this checkpoint
+     * @throws IOException if the preparation (e.g. rolling the log) fails
+     */
+    void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException;
+
+    /**
+     * Saves the state of the entry logger before an entry memtable flush.
+     *
+     * <p>Must be called before flushing the entry memtable; the matching
+     * {@link #commitEntryMemTableFlush()} acts on the state saved here.
+     */
+    void prepareEntryMemTableFlush();
+
+    /**
+     * Takes the appropriate action after an entry memtable flush, depending on
+     * the current state of the entry logger and the state saved by
+     * {@link #prepareEntryMemTableFlush()}.
+     *
+     * <p>It is assumed that every call is preceded by a corresponding
+     * {@link #prepareEntryMemTableFlush()} and that both are made from the same
+     * thread.
+     *
+     * @return whether the entry memtable should checkpoint after this commit
+     * @throws IOException if the post-flush action (e.g. rolling the log) fails
+     */
+    boolean commitEntryMemTableFlush() throws IOException;
+
+    /**
+     * Creates a new, separate log for compaction.
+     *
+     * @return the channel of the newly created compaction log
+     * @throws IOException if the log cannot be created
+     */
+    BufferedLogChannel createNewLogForCompaction() throws IOException;
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
new file mode 100644
index 000000000..849336f66
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
@@ -0,0 +1,228 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static org.apache.bookkeeper.util.BookKeeperConstants.MAX_LOG_SIZE_LIMIT;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.concurrent.FastThreadLocal;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * Common base for {@link EntryLogManager} implementations: appends entries to the log
+ * channel chosen by the subclass, rolls over to newly allocated entry logs, and flushes
+ * the current and rotated logs.
+ */
+@Slf4j
+abstract class EntryLogManagerBase implements EntryLogManager {
+    /**
+     * Logs rotated out by {@link #createNewLog(long)} that still have to be flushed.
+     * Initialized by subclasses.
+     */
+    volatile List<BufferedLogChannel> rotatedLogChannels;
+    final EntryLoggerAllocator entryLoggerAllocator;
+    private final LedgerDirsManager ledgerDirsManager;
+    private final List<EntryLogger.EntryLogListener> listeners;
+    /**
+     * The maximum size of an entry log file.
+     */
+    final long logSizeLimit;
+
+    /**
+     * Per-thread reusable buffer for the 4-byte size prefix written before each entry.
+     */
+    private final FastThreadLocal<ByteBuf> sizeBufferForAdd = new FastThreadLocal<ByteBuf>() {
+        @Override
+        protected ByteBuf initialValue() throws Exception {
+            return Unpooled.buffer(4);
+        }
+    };
+
+    EntryLogManagerBase(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
+            EntryLoggerAllocator entryLoggerAllocator, List<EntryLogger.EntryLogListener> listeners) {
+        this.ledgerDirsManager = ledgerDirsManager;
+        this.entryLoggerAllocator = entryLoggerAllocator;
+        this.listeners = listeners;
+        // Never exceed MAX_LOG_SIZE_LIMIT, whatever the configured limit is.
+        this.logSizeLimit = Math.min(conf.getEntryLogSizeLimit(), MAX_LOG_SIZE_LIMIT);
+    }
+
+    /**
+     * Appends {@code entry}, prefixed with its 4-byte size, to the log channel returned by
+     * {@link #getCurrentLogForLedgerForAddEntry}.
+     *
+     * <p>This method should be guarded by a lock, so callers of this method should be in
+     * the right scope of the lock.
+     *
+     * @return the entry log id in the upper 32 bits and the position of the entry payload
+     *         (just after its size prefix) in the lower 32 bits
+     */
+    @Override
+    public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+        int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
+        BufferedLogChannel logChannel = getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog);
+        ByteBuf sizeBuffer = sizeBufferForAdd.get();
+        sizeBuffer.clear();
+        sizeBuffer.writeInt(entry.readableBytes());
+        logChannel.write(sizeBuffer);
+
+        long pos = logChannel.position();
+        logChannel.write(entry);
+        logChannel.registerWrittenEntry(ledger, entrySize);
+
+        return (logChannel.getLogId() << 32L) | pos;
+    }
+
+    /**
+     * Returns whether appending {@code size} bytes to {@code logChannel} would exceed
+     * {@link #logSizeLimit}; {@code false} if {@code logChannel} is {@code null}.
+     */
+    boolean reachEntryLogLimit(BufferedLogChannel logChannel, long size) {
+        if (logChannel == null) {
+            return false;
+        }
+        return logChannel.position() + size > logSizeLimit;
+    }
+
+    /**
+     * Returns whether appending {@code size} bytes to {@code logChannel} would exceed the
+     * hard limit of {@link Integer#MAX_VALUE} bytes; {@code false} if {@code logChannel}
+     * is {@code null}.
+     */
+    boolean reachEntryLogHardLimit(BufferedLogChannel logChannel, long size) {
+        if (logChannel == null) {
+            return false;
+        }
+        return logChannel.position() + size > Integer.MAX_VALUE;
+    }
+
+    /**
+     * Misspelled alias of {@link #reachEntryLogHardLimit}, kept for existing callers.
+     */
+    boolean readEntryLogHardLimit(BufferedLogChannel logChannel, long size) {
+        return reachEntryLogHardLimit(logChannel, size);
+    }
+
+    /**
+     * Returns the current log channel for {@code ledgerId}, or {@code null} if there is none yet.
+     */
+    abstract BufferedLogChannel getCurrentLogForLedger(long ledgerId);
+
+    /**
+     * Returns the log channel to which an entry of {@code entrySize} bytes for
+     * {@code ledgerId} should be appended, creating or rolling the log if needed.
+     */
+    abstract BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int entrySize, boolean rollLog)
+            throws IOException;
+
+    /**
+     * Makes {@code logChannel} the current log for {@code ledgerId} and adds the previous
+     * current log, if any, to the rotated logs.
+     */
+    abstract void setCurrentLogForLedgerAndAddToRotate(long ledgerId, BufferedLogChannel logChannel);
+
+    /**
+     * Flushes the current logs.
+     */
+    abstract void flushCurrentLogs() throws IOException;
+
+    /**
+     * Flushes the rotated logs.
+     */
+    abstract void flushRotatedLogs() throws IOException;
+
+    List<BufferedLogChannel> getRotatedLogChannels() {
+        return rotatedLogChannels;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        flushCurrentLogs();
+        flushRotatedLogs();
+    }
+
+    /**
+     * Flushes {@code logChannel} and forces it to disk; does nothing if it is {@code null}.
+     *
+     * @param forceMetadata whether the file metadata must be force-written as well
+     */
+    void flushLogChannel(BufferedLogChannel logChannel, boolean forceMetadata) throws IOException {
+        if (logChannel != null) {
+            logChannel.flushAndForceWrite(forceMetadata);
+            log.debug("Flush and sync current entry logger {}", logChannel.getLogId());
+        }
+    }
+
+    /**
+     * Creates a new entry log in the directory chosen by {@link #selectDirForNextEntryLog()}
+     * and makes it the current log for {@code ledgerId}.
+     *
+     * <p>If there already is a current log, its buffer is flushed to the filesystem (without
+     * syncing the disk) and its ledgers map is appended before it is rotated out, and the
+     * registered {@link EntryLogListener}s are notified of the rotation.
+     *
+     * <p>This method should be guarded by a lock, so callers of this method should be in
+     * the right scope of the lock.
+     */
+    void createNewLog(long ledgerId) throws IOException {
+        BufferedLogChannel logChannel = getCurrentLogForLedger(ledgerId);
+        // The current channel is only replaced, and added to the rotated logs, once the
+        // new channel has been created successfully.
+        if (null != logChannel) {
+
+            // flush the internal buffer back to filesystem but not sync disk
+            logChannel.flush();
+
+            // Append ledgers map at the end of entry log
+            logChannel.appendLedgersMap();
+
+            BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog(selectDirForNextEntryLog());
+            setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
+            log.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
+                    logChannel.getLogId(), rotatedLogChannels);
+            for (EntryLogListener listener : listeners) {
+                listener.onRotateEntryLog();
+            }
+        } else {
+            setCurrentLogForLedgerAndAddToRotate(ledgerId,
+                    entryLoggerAllocator.createNewLog(selectDirForNextEntryLog()));
+        }
+    }
+
+    /**
+     * Picks the directory for the next entry log among the ledger directories that
+     * {@link LedgerDirsManager#getWritableLedgerDirsForNewLog()} reports as writable.
+     *
+     * @throws NoWritableLedgerDirException propagated from the ledger dirs manager
+     */
+    File selectDirForNextEntryLog() throws NoWritableLedgerDirException {
+        return getDirForNextEntryLog(ledgerDirsManager.getWritableLedgerDirsForNewLog());
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
new file mode 100644
index 000000000..84e4ad36f
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
@@ -0,0 +1,337 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static org.apache.bookkeeper.bookie.EntryLogger.INVALID_LID;
+import static org.apache.bookkeeper.bookie.EntryLogger.UNASSIGNED_LEDGERID;
+
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * {@link EntryLogManager} that writes the entries of all ledgers into a single active
+ * entry log, rolling over to a new log when a size limit is reached or when the disk
+ * holding the active log is reported (almost) full.
+ */
+@Slf4j
+class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
+
+    private volatile BufferedLogChannel activeLogChannel;
+    /** Id of the active log, as captured by {@link #prepareEntryMemTableFlush()}. */
+    private long logIdBeforeFlush = INVALID_LID;
+    /** Set on (almost) full disk notifications so that the next add rolls the active log. */
+    private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
+    private final EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
+
+    EntryLogManagerForSingleEntryLog(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
+            EntryLoggerAllocator entryLoggerAllocator, List<EntryLogger.EntryLogListener> listeners,
+            EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) {
+        super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
+        this.rotatedLogChannels = new LinkedList<BufferedLogChannel>();
+        this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
+        // Register listener for disk full notifications.
+        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+    }
+
+    private LedgerDirsListener getLedgerDirsListener() {
+        return new LedgerDirsListener() {
+            @Override
+            public void diskFull(File disk) {
+                // If the current entry log disk is full, then create new entry log.
+                requestNewLogIfActiveLogIn(disk);
+            }
+
+            @Override
+            public void diskAlmostFull(File disk) {
+                // If the current entry log disk is almost full, then create new entry log.
+                requestNewLogIfActiveLogIn(disk);
+            }
+        };
+    }
+
+    /**
+     * Flags that the next add must roll to a new entry log if the active log's file is
+     * located directly in {@code disk}.
+     */
+    private void requestNewLogIfActiveLogIn(File disk) {
+        BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+        if (currentActiveLogChannel != null
+                && currentActiveLogChannel.getLogFile().getParentFile().equals(disk)) {
+            shouldCreateNewEntryLog.set(true);
+        }
+    }
+
+    @Override
+    public synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+        return super.addEntry(ledger, entry, rollLog);
+    }
+
+    /**
+     * Returns the active log, creating it first if there is none yet. Rolls to a new log
+     * when a full-disk notification is pending or when appending {@code entrySize} bytes
+     * would exceed the configured size limit (if {@code rollLog}) or the hard limit
+     * (otherwise).
+     */
+    @Override
+    synchronized BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int entrySize,
+            boolean rollLog) throws IOException {
+        if (null == activeLogChannel) {
+            // log channel can be null because the file is deferred to be created
+            createNewLog(UNASSIGNED_LEDGERID);
+        }
+
+        boolean reachEntryLogLimit = rollLog ? reachEntryLogLimit(activeLogChannel, entrySize)
+                : readEntryLogHardLimit(activeLogChannel, entrySize);
+        // Create new log if logSizeLimit reached or current disk is full
+        boolean createNewLog = shouldCreateNewEntryLog.get();
+        if (createNewLog || reachEntryLogLimit) {
+            if (activeLogChannel != null) {
+                activeLogChannel.flushAndForceWriteIfRegularFlush(false);
+            }
+            createNewLog(UNASSIGNED_LEDGERID);
+            // Reset the flag
+            if (createNewLog) {
+                shouldCreateNewEntryLog.set(false);
+            }
+        }
+        return activeLogChannel;
+    }
+
+    @Override
+    synchronized void createNewLog(long ledgerId) throws IOException {
+        super.createNewLog(ledgerId);
+    }
+
+    /**
+     * Makes {@code logChannel} the active log and adds the previously active log, if any,
+     * to the rotated logs. {@code ledgerId} is ignored: all ledgers share the active log.
+     */
+    @Override
+    public synchronized void setCurrentLogForLedgerAndAddToRotate(long ledgerId, BufferedLogChannel logChannel) {
+        BufferedLogChannel hasToRotateLogChannel = activeLogChannel;
+        activeLogChannel = logChannel;
+        if (hasToRotateLogChannel != null) {
+            rotatedLogChannels.add(hasToRotateLogChannel);
+        }
+    }
+
+    /**
+     * Returns the active log, or {@code null} if it has not been created yet.
+     * {@code ledgerId} is ignored: all ledgers share the active log.
+     */
+    @Override
+    public BufferedLogChannel getCurrentLogForLedger(long ledgerId) {
+        return activeLogChannel;
+    }
+
+    @Override
+    public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
+        BufferedLogChannel activeLogChannelTemp = activeLogChannel;
+        if ((activeLogChannelTemp != null) && (activeLogChannelTemp.getLogId() == entryLogId)) {
+            return activeLogChannelTemp;
+        }
+        return null;
+    }
+
+    /**
+     * Picks a random directory among {@code writableLedgerDirs}.
+     *
+     * <p>Note that the given list is shuffled in place, and that an empty list results in
+     * an {@link IndexOutOfBoundsException}.
+     */
+    @Override
+    public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
+        Collections.shuffle(writableLedgerDirs);
+        return writableLedgerDirs.get(0);
+    }
+
+    /**
+     * Flushes only the rotated logs; the active log is not flushed here.
+     */
+    @Override
+    public void checkpoint() throws IOException {
+        flushRotatedLogs();
+    }
+
+    /**
+     * Returns the id of the active log, or {@link EntryLogger#UNINITIALIZED_LOG_ID} if
+     * there is no active log yet.
+     */
+    public long getCurrentLogId() {
+        BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+        if (currentActiveLogChannel != null) {
+            return currentActiveLogChannel.getLogId();
+        } else {
+            return EntryLogger.UNINITIALIZED_LOG_ID;
+        }
+    }
+
+    @Override
+    public void flushCurrentLogs() throws IOException {
+        BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+        if (currentActiveLogChannel != null) {
+            // flushCurrentLogs is called during checkpoint, so the metadata of the file
+            // should be force written as well.
+            flushLogChannel(currentActiveLogChannel, true);
+        }
+    }
+
+    /**
+     * Flushes and syncs the rotated logs, then closes their file channels.
+     *
+     * <p>The list of rotated logs is swapped for an empty one under the lock. If a flush
+     * fails, the channels not flushed yet are put back in front of the rotated logs and
+     * the exception is rethrown.
+     */
+    @Override
+    void flushRotatedLogs() throws IOException {
+        List<BufferedLogChannel> channels = null;
+        synchronized (this) {
+            channels = rotatedLogChannels;
+            rotatedLogChannels = new LinkedList<BufferedLogChannel>();
+        }
+        if (null == channels) {
+            return;
+        }
+        Iterator<BufferedLogChannel> chIter = channels.iterator();
+        while (chIter.hasNext()) {
+            BufferedLogChannel channel = chIter.next();
+            try {
+                channel.flushAndForceWrite(true);
+            } catch (IOException ioe) {
+                // rescue from flush exception, add unflushed channels back
+                synchronized (this) {
+                    if (null == rotatedLogChannels) {
+                        rotatedLogChannels = channels;
+                    } else {
+                        rotatedLogChannels.addAll(0, channels);
+                    }
+                }
+                throw ioe;
+            }
+            // remove the channel from the list after it is successfully flushed
+            chIter.remove();
+            // since this channel is only used for writing, after flushing the channel,
+            // we have to close the underlying file channel. Otherwise, we might end up
+            // leaking fds, and the disk space could not be reclaimed.
+            EntryLogger.closeFileChannel(channel);
+            recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
+            log.info("Synced entry logger {} to disk.", channel.getLogId());
+        }
+    }
+
+    /**
+     * Closes the file channel of the active log, if there is one.
+     */
+    @Override
+    public void close() throws IOException {
+        // Read the volatile field once so the null check and the close use the same channel.
+        BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+        if (currentActiveLogChannel != null) {
+            EntryLogger.closeFileChannel(currentActiveLogChannel);
+        }
+    }
+
+    /**
+     * Force-closes the file channel of the active log, if there is one.
+     */
+    @Override
+    public void forceClose() {
+        BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+        if (currentActiveLogChannel != null) {
+            EntryLogger.forceCloseFileChannel(currentActiveLogChannel);
+        }
+    }
+
+    /**
+     * Remembers the id of the active log, so that {@link #commitEntryMemTableFlush()} can
+     * tell whether the log changed during the entry memtable flush.
+     */
+    @Override
+    public void prepareEntryMemTableFlush() {
+        logIdBeforeFlush = getCurrentLogId();
+    }
+
+    /**
+     * Rolls the entry log if it reached the configured size limit or if the active log
+     * changed since {@link #prepareEntryMemTableFlush()}.
+     *
+     * @return {@code true} if the log was rolled, i.e. a checkpoint should follow
+     */
+    @Override
+    public boolean commitEntryMemTableFlush() throws IOException {
+        long logIdAfterFlush = getCurrentLogId();
+        /*
+         * in any case that an entry log reaches the limit, we roll the log
+         * and start checkpointing. if a memory table is flushed spanning
+         * over two entry log files, we also roll log. this is for
+         * performance consideration: since we don't wanna checkpoint a new
+         * log file that ledger storage is writing to.
+         */
+        boolean reachedSizeLimit = reachEntryLogLimit(activeLogChannel, 0L);
+        if (reachedSizeLimit || logIdAfterFlush != logIdBeforeFlush) {
+            // Log the actual reason: the size limit is not the only trigger.
+            if (reachedSizeLimit) {
+                log.info("Rolling entry logger since it reached size limitation");
+            } else {
+                log.info("Rolling entry logger since entry log changed from {} to {} during memtable flush",
+                        logIdBeforeFlush, logIdAfterFlush);
+            }
+            createNewLog(UNASSIGNED_LEDGERID);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Rolls the entry log before a SortedLedgerStorage checkpoint if any bytes were
+     * flushed, i.e. {@code numBytesFlushed > 0}.
+     */
+    @Override
+    public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException {
+        if (numBytesFlushed > 0) {
+            // if bytes are added between previous flush and this checkpoint,
+            // it means bytes might live at current active entry log, we need
+            // roll current entry log and then issue checkpoint to underlying
+            // interleaved ledger storage.
+            createNewLog(UNASSIGNED_LEDGERID);
+        }
+    }
+
+    /**
+     * Creates a new compaction log in the directory chosen by {@link #selectDirForNextEntryLog()}.
+     */
+    @Override
+    public BufferedLogChannel createNewLogForCompaction() throws IOException {
+        return entryLoggerAllocator.createNewLogForCompaction(selectDirForNextEntryLog());
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index d289b0e8f..75445c23f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -22,8 +22,6 @@
 package org.apache.bookkeeper.bookie;
 
 import static com.google.common.base.Charsets.UTF_8;
-import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
-import static org.apache.bookkeeper.util.BookKeeperConstants.MAX_LOG_SIZE_LIMIT;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
@@ -35,18 +33,14 @@
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.util.concurrent.FastThreadLocal;
-
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
@@ -54,24 +48,15 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
@@ -90,7 +75,7 @@
     private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
     static final long UNASSIGNED_LEDGERID = -1L;
     // log file suffix
-    private static final String LOG_FILE_SUFFIX = ".log";
+    static final String LOG_FILE_SUFFIX = ".log";
 
     @VisibleForTesting
     static final int UNINITIALIZED_LOG_ID = -0xDEAD;
@@ -145,7 +130,7 @@ public String toString() {
          * Append the ledger map at the end of the entry log.
          * Updates the entry log file header with the offset and size of the map.
          */
-        private void appendLedgersMap() throws IOException {
+        void appendLedgersMap() throws IOException {
 
             long ledgerMapOffset = this.position();
 
@@ -228,21 +213,16 @@ public void accept(long ledgerId, long size) {
      */
     private final Object compactionLogLock = new Object();
 
-    /**
-     * The maximum size of a entry logger file.
-     */
-    final long logSizeLimit;
     private volatile BufferedLogChannel compactionLogChannel;
 
     final EntryLoggerAllocator entryLoggerAllocator;
     private final EntryLogManager entryLogManager;
 
-    private final boolean entryLogPreAllocationEnabled;
     private final CopyOnWriteArrayList<EntryLogListener> listeners = new CopyOnWriteArrayList<EntryLogListener>();
 
     private static final int HEADER_V0 = 0; // Old log file format (no ledgers map index)
     private static final int HEADER_V1 = 1; // Introduced ledger map index
-    private static final int HEADER_CURRENT_VERSION = HEADER_V1;
+    static final int HEADER_CURRENT_VERSION = HEADER_V1;
 
     private static class Header {
         final int version;
@@ -360,9 +340,6 @@ public EntryLogger(ServerConfiguration conf,
         if (listener != null) {
             addListener(listener);
         }
-        // log size limit
-        this.logSizeLimit = Math.min(conf.getEntryLogSizeLimit(), MAX_LOG_SIZE_LIMIT);
-        this.entryLogPreAllocationEnabled = conf.isEntryLogFilePreAllocationEnabled();
 
         // Initialize the entry log header buffer. This cannot be a static object
         // since in our unit tests, we run multiple Bookies and thus EntryLoggers
@@ -386,9 +363,11 @@ public EntryLogger(ServerConfiguration conf,
             }
         }
         this.recentlyCreatedEntryLogsStatus = new RecentEntryLogsStatus(logId + 1);
-        this.entryLoggerAllocator = new EntryLoggerAllocator(logId);
+        this.entryLoggerAllocator = new EntryLoggerAllocator(conf, ledgerDirsManager, recentlyCreatedEntryLogsStatus,
+                logId);
         if (entryLogPerLedgerEnabled) {
-            this.entryLogManager = new EntryLogManagerForSingleEntryLog(ledgerDirsManager) {
+            this.entryLogManager = new EntryLogManagerForSingleEntryLog(conf, ledgerDirsManager, entryLoggerAllocator,
+                    listeners, recentlyCreatedEntryLogsStatus) {
                 @Override
                 public void checkpoint() throws IOException {
                     /*
@@ -445,7 +424,8 @@ public boolean commitEntryMemTableFlush() throws IOException {
                 }
             };
         } else {
-            this.entryLogManager = new EntryLogManagerForSingleEntryLog(ledgerDirsManager);
+            this.entryLogManager = new EntryLogManagerForSingleEntryLog(conf, ledgerDirsManager, entryLoggerAllocator,
+                    listeners, recentlyCreatedEntryLogsStatus);
         }
     }
 
@@ -583,137 +563,6 @@ EntryLoggerAllocator getEntryLoggerAllocator() {
         return entryLoggerAllocator;
     }
 
-    /**
-     * An allocator pre-allocates entry log files.
-     */
-    class EntryLoggerAllocator {
-
-        private long preallocatedLogId;
-        private Future<BufferedLogChannel> preallocation = null;
-        private ExecutorService allocatorExecutor;
-        private final Object createEntryLogLock = new Object();
-        private final Object createCompactionLogLock = new Object();
-
-        EntryLoggerAllocator(long logId) {
-            preallocatedLogId = logId;
-            allocatorExecutor = Executors.newSingleThreadExecutor();
-        }
-
-        synchronized long getPreallocatedLogId() {
-            return preallocatedLogId;
-        }
-
-        BufferedLogChannel createNewLog() throws IOException {
-            synchronized (createEntryLogLock) {
-                BufferedLogChannel bc;
-                if (!entryLogPreAllocationEnabled){
-                    // create a new log directly
-                    bc = allocateNewLog();
-                    return bc;
-                } else {
-                    // allocate directly to response request
-                    if (null == preallocation){
-                        bc = allocateNewLog();
-                    } else {
-                        // has a preallocated entry log
-                        try {
-                            bc = preallocation.get();
-                        } catch (ExecutionException ee) {
-                            if (ee.getCause() instanceof IOException) {
-                                throw (IOException) (ee.getCause());
-                            } else {
-                                throw new IOException("Error to execute entry log allocation.", ee);
-                            }
-                        } catch (CancellationException ce) {
-                            throw new IOException("Task to allocate a new entry log is cancelled.", ce);
-                        } catch (InterruptedException ie) {
-                            Thread.currentThread().interrupt();
-                            throw new IOException("Intrrupted when waiting a new entry log to be allocated.", ie);
-                        }
-                    }
-                    // preallocate a new log in background upon every call
-                    preallocation = allocatorExecutor.submit(() -> allocateNewLog());
-                    return bc;
-                }
-            }
-        }
-
-        BufferedLogChannel createNewLogForCompaction() throws IOException {
-            synchronized (createCompactionLogLock) {
-                return allocateNewLog(COMPACTING_SUFFIX);
-            }
-        }
-
-        private synchronized BufferedLogChannel allocateNewLog() throws IOException {
-            return allocateNewLog(".log");
-        }
-
-        /**
-         * Allocate a new log file.
-         */
-        private synchronized BufferedLogChannel allocateNewLog(String suffix) throws IOException {
-            List<File> list = ledgerDirsManager.getWritableLedgerDirsForNewLog();
-            File dirForNextEntryLog = entryLogManager.getDirForNextEntryLog(list);
-
-            List<File> ledgersDirs = ledgerDirsManager.getAllLedgerDirs();
-            String logFileName;
-            // It would better not to overwrite existing entry log files
-            File testLogFile = null;
-            do {
-                if (preallocatedLogId >= Integer.MAX_VALUE) {
-                    preallocatedLogId = 0;
-                } else {
-                    ++preallocatedLogId;
-                }
-                logFileName = Long.toHexString(preallocatedLogId) + suffix;
-                for (File dir : ledgersDirs) {
-                    testLogFile = new File(dir, logFileName);
-                    if (testLogFile.exists()) {
-                        LOG.warn("Found existed entry log " + testLogFile
-                               + " when trying to create it as a new log.");
-                        testLogFile = null;
-                        break;
-                    }
-                }
-            } while (testLogFile == null);
-
-            File newLogFile = new File(dirForNextEntryLog, logFileName);
-            FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel();
-
-            BufferedLogChannel logChannel = new BufferedLogChannel(channel, conf.getWriteBufferBytes(),
-                    conf.getReadBufferBytes(), preallocatedLogId, newLogFile, conf.getFlushIntervalInBytes());
-            logfileHeader.readerIndex(0);
-            logChannel.write(logfileHeader);
-
-            for (File f : list) {
-                setLastLogId(f, preallocatedLogId);
-            }
-
-            if (suffix.equals(LOG_FILE_SUFFIX)) {
-                recentlyCreatedEntryLogsStatus.createdEntryLog(preallocatedLogId);
-            }
-
-            LOG.info("Created new entry log file {} for logId {}.", newLogFile, preallocatedLogId);
-            return logChannel;
-        }
-
-        /**
-         * Stop the allocator.
-         */
-        void stop() {
-            // wait until the preallocation finished.
-            allocatorExecutor.shutdown();
-            LOG.info("Stopped entry logger preallocator.");
-        }
-
-        /**
-         * get the preallocation for tests.
-         */
-        Future<BufferedLogChannel> getPreallocationFuture(){
-            return preallocation;
-        }
-    }
-
     /**
      * Remove entry log.
      *
@@ -736,27 +585,6 @@ protected boolean removeEntryLog(long entryLogId) {
         return true;
     }
 
-    /**
-     * writes the given id to the "lastId" file in the given directory.
-     */
-    private void setLastLogId(File dir, long logId) throws IOException {
-        FileOutputStream fos;
-        fos = new FileOutputStream(new File(dir, "lastId"));
-        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fos, UTF_8));
-        try {
-            bw.write(Long.toHexString(logId) + "\n");
-            bw.flush();
-        } catch (IOException e) {
-            LOG.warn("Failed write lastId file");
-        } finally {
-            try {
-                bw.close();
-            } catch (IOException e) {
-                LOG.error("Could not close lastId file in {}", dir.getPath());
-            }
-        }
-    }
-
     private long getLastLogId(File dir) {
         long id = readLastLogId(dir);
         // read success
@@ -812,396 +640,6 @@ private long readLastLogId(File f) {
         }
     }
 
-    interface EntryLogManager {
-
-        /*
-         * add entry to the corresponding entrylog and return the position of
-         * the entry in the entrylog
-         */
-        long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException;
-
-        /*
-         * gets the active logChannel with the given entryLogId. null if it is
-         * not existing.
-         */
-        BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
-
-        /*
-         * Returns eligible writable ledger dir for the creation next entrylog
-         */
-        File getDirForNextEntryLog(List<File> writableLedgerDirs);
-
-        /*
-         * Do the operations required for checkpoint.
-         */
-        void checkpoint() throws IOException;
-
-        /*
-         * flush both current and rotated logs.
-         */
-        void flush() throws IOException;
-
-        /*
-         * close current logs.
-         */
-        void close() throws IOException;
-
-        /*
-         * force close current logs.
-         */
-        void forceClose();
-
-        /*
-         *
-         */
-        void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException;
-
-        /*
-         * this method should be called before doing entrymemtable flush, it
-         * would save the state of the entrylogger before entrymemtable flush
-         * and commitEntryMemTableFlush would take appropriate action after
-         * entrymemtable flush.
-         */
-        void prepareEntryMemTableFlush();
-
-        /*
-         * this method should be called after doing entrymemtable flush,it would
-         * take appropriate action after entrymemtable flush depending on the
-         * current state of the entrylogger and the state of the entrylogger
-         * during prepareEntryMemTableFlush.
-         *
-         * It is assumed that there would be corresponding
-         * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both
-         * would be called from the same thread.
-         *
-         * returns boolean value indicating whether EntryMemTable should do checkpoint
-         * after this commit method.
-         */
-        boolean commitEntryMemTableFlush() throws IOException;
-    }
-
-    abstract class EntryLogManagerBase implements EntryLogManager {
-        volatile List<BufferedLogChannel> rotatedLogChannels;
-
-        private final FastThreadLocal<ByteBuf> sizeBufferForAdd = new FastThreadLocal<ByteBuf>() {
-            @Override
-            protected ByteBuf initialValue() throws Exception {
-                return Unpooled.buffer(4);
-            }
-        };
-
-        /*
-         * This method should be guarded by a lock, so callers of this method
-         * should be in the right scope of the lock.
-         */
-        @Override
-        public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
-            int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
-            BufferedLogChannel logChannel = getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog);
-            ByteBuf sizeBuffer = sizeBufferForAdd.get();
-            sizeBuffer.clear();
-            sizeBuffer.writeInt(entry.readableBytes());
-            logChannel.write(sizeBuffer);
-
-            long pos = logChannel.position();
-            logChannel.write(entry);
-            logChannel.registerWrittenEntry(ledger, entrySize);
-
-            return (logChannel.getLogId() << 32L) | pos;
-        }
-
-        boolean reachEntryLogLimit(BufferedLogChannel logChannel, long size) {
-            if (logChannel == null) {
-                return false;
-            }
-            return logChannel.position() + size > logSizeLimit;
-        }
-
-        boolean readEntryLogHardLimit(BufferedLogChannel logChannel, long size) {
-            if (logChannel == null) {
-                return false;
-            }
-            return logChannel.position() + size > Integer.MAX_VALUE;
-        }
-
-        abstract BufferedLogChannel getCurrentLogForLedger(long ledgerId);
-
-        abstract BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int entrySize, boolean rollLog)
-                throws IOException;
-
-        abstract void setCurrentLogForLedgerAndAddToRotate(long ledgerId, BufferedLogChannel logChannel);
-
-        /*
-         * flush current logs.
-         */
-        abstract void flushCurrentLogs() throws IOException;
-
-        /*
-         * flush rotated logs.
-         */
-        abstract void flushRotatedLogs() throws IOException;
-
-        List<BufferedLogChannel> getRotatedLogChannels() {
-            return rotatedLogChannels;
-        }
-
-        @Override
-        public void flush() throws IOException {
-            flushCurrentLogs();
-            flushRotatedLogs();
-        }
-
-        void flushLogChannel(BufferedLogChannel logChannel, boolean forceMetadata) throws IOException {
-            if (logChannel != null) {
-                logChannel.flushAndForceWrite(forceMetadata);
-                LOG.debug("Flush and sync current entry logger {}", logChannel.getLogId());
-            }
-        }
-
-        /*
-         * Creates a new log file. This method should be guarded by a lock,
-         * so callers of this method should be in right scope of the lock.
-         */
-        void createNewLog(long ledgerId) throws IOException {
-            BufferedLogChannel logChannel = getCurrentLogForLedger(ledgerId);
-            // first tried to create a new log channel. add current log channel to ToFlush list only when
-            // there is a new log channel. it would prevent that a log channel is referenced by both
-            // *logChannel* and *ToFlush* list.
-            if (null != logChannel) {
-
-                // flush the internal buffer back to filesystem but not sync disk
-                logChannel.flush();
-
-                // Append ledgers map at the end of entry log
-                logChannel.appendLedgersMap();
-
-                BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog();
-                setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
-                LOG.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
-                        logChannel.getLogId(), rotatedLogChannels);
-                for (EntryLogListener listener : listeners) {
-                    listener.onRotateEntryLog();
-                }
-            } else {
-                setCurrentLogForLedgerAndAddToRotate(ledgerId, entryLoggerAllocator.createNewLog());
-            }
-        }
-    }
-
-    class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
-
-        private volatile BufferedLogChannel activeLogChannel;
-        private long logIdBeforeFlush = INVALID_LID;
-        private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
-
-        EntryLogManagerForSingleEntryLog(LedgerDirsManager ledgerDirsManager) {
-            this.rotatedLogChannels = new LinkedList<BufferedLogChannel>();
-            // Register listener for disk full notifications.
-            ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
-        }
-
-        private LedgerDirsListener getLedgerDirsListener() {
-            return new LedgerDirsListener() {
-                @Override
-                public void diskFull(File disk) {
-                    // If the current entry log disk is full, then create new
-                    // entry log.
-                    BufferedLogChannel currentActiveLogChannel = activeLogChannel;
-                    if (currentActiveLogChannel != null
-                            && currentActiveLogChannel.getLogFile().getParentFile().equals(disk)) {
-                        shouldCreateNewEntryLog.set(true);
-                    }
-                }
-
-                @Override
-                public void diskAlmostFull(File disk) {
-                    // If the current entry log disk is almost full, then create new entry
-                    // log.
-                    BufferedLogChannel currentActiveLogChannel = activeLogChannel;
-                    if (currentActiveLogChannel != null
-                            && currentActiveLogChannel.getLogFile().getParentFile().equals(disk)) {
-                        shouldCreateNewEntryLog.set(true);
-                    }
-                }
-            };
-        }
-
-        @Override
-        public synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
-            return super.addEntry(ledger, entry, rollLog);
-        }
-
-        @Override
-        synchronized BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int entrySize,
-                boolean rollLog) throws IOException {
-            if (null == activeLogChannel) {
-                // log channel can be null because the file is deferred to be created
-                createNewLog(UNASSIGNED_LEDGERID);
-            }
-
-            boolean reachEntryLogLimit = rollLog ? reachEntryLogLimit(activeLogChannel, entrySize)
-                    : readEntryLogHardLimit(activeLogChannel, entrySize);
-            // Create new log if logSizeLimit reached or current disk is full
-            boolean createNewLog = shouldCreateNewEntryLog.get();
-            if (createNewLog || reachEntryLogLimit) {
-                if (activeLogChannel != null) {
-                    activeLogChannel.flushAndForceWriteIfRegularFlush(false);
-                }
-                createNewLog(UNASSIGNED_LEDGERID);
-                // Reset the flag
-                if (createNewLog) {
-                    shouldCreateNewEntryLog.set(false);
-                }
-            }
-            return activeLogChannel;
-        }
-
-        @Override
-        synchronized void createNewLog(long ledgerId) throws IOException {
-            super.createNewLog(ledgerId);
-        }
-
-        @Override
-        public synchronized void setCurrentLogForLedgerAndAddToRotate(long ledgerId, BufferedLogChannel logChannel) {
-            BufferedLogChannel hasToRotateLogChannel = activeLogChannel;
-            activeLogChannel = logChannel;
-            if (hasToRotateLogChannel != null) {
-                rotatedLogChannels.add(hasToRotateLogChannel);
-            }
-        }
-
-        @Override
-        public BufferedLogChannel getCurrentLogForLedger(long ledgerId) {
-            return activeLogChannel;
-        }
-
-        @Override
-        public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
-            BufferedLogChannel activeLogChannelTemp = activeLogChannel;
-            if ((activeLogChannelTemp != null) && (activeLogChannelTemp.getLogId() == entryLogId)) {
-                return activeLogChannelTemp;
-            }
-            return null;
-        }
-
-        @Override
-        public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
-            Collections.shuffle(writableLedgerDirs);
-            return writableLedgerDirs.get(0);
-        }
-
-        @Override
-        public void checkpoint() throws IOException {
-            flushRotatedLogs();
-        }
-
-        public long getCurrentLogId() {
-            BufferedLogChannel currentActiveLogChannel = activeLogChannel;
-            if (currentActiveLogChannel != null) {
-                return currentActiveLogChannel.getLogId();
-            } else {
-                return EntryLogger.UNINITIALIZED_LOG_ID;
-            }
-        }
-
-        @Override
-        public void flushCurrentLogs() throws IOException {
-            BufferedLogChannel currentActiveLogChannel = activeLogChannel;
-            if (currentActiveLogChannel != null) {
-                /**
-                 * flushCurrentLogs method is called during checkpoint, so
-                 * metadata of the file also should be force written.
-                 */
-                flushLogChannel(currentActiveLogChannel, true);
-            }
-        }
-
-        @Override
-        void flushRotatedLogs() throws IOException {
-            List<BufferedLogChannel> channels = null;
-            synchronized (this) {
-                channels = rotatedLogChannels;
-                rotatedLogChannels = new LinkedList<BufferedLogChannel>();
-            }
-            if (null == channels) {
-                return;
-            }
-            Iterator<BufferedLogChannel> chIter = channels.iterator();
-            while (chIter.hasNext()) {
-                BufferedLogChannel channel = chIter.next();
-                try {
-                    channel.flushAndForceWrite(true);
-                } catch (IOException ioe) {
-                    // rescue from flush exception, add unflushed channels back
-                    synchronized (this) {
-                        if (null == rotatedLogChannels) {
-                            rotatedLogChannels = channels;
-                        } else {
-                            rotatedLogChannels.addAll(0, channels);
-                        }
-                    }
-                    throw ioe;
-                }
-                // remove the channel from the list after it is successfully flushed
-                chIter.remove();
-                // since this channel is only used for writing, after flushing the channel,
-                // we had to close the underlying file channel. Otherwise, we might end up
-                // leaking fds which cause the disk spaces could not be reclaimed.
-                closeFileChannel(channel);
-                recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
-                LOG.info("Synced entry logger {} to disk.", channel.getLogId());
-            }
-        }
-
-        @Override
-        public void close() throws IOException {
-            if (activeLogChannel != null) {
-                closeFileChannel(activeLogChannel);
-            }
-        }
-
-        @Override
-        public void forceClose() {
-            if (activeLogChannel != null) {
-                forceCloseFileChannel(activeLogChannel);
-            }
-        }
-
-        @Override
-        public void prepareEntryMemTableFlush() {
-            logIdBeforeFlush = getCurrentLogId();
-        }
-
-        @Override
-        public boolean commitEntryMemTableFlush() throws IOException {
-            long logIdAfterFlush = getCurrentLogId();
-            /*
-             * in any case that an entry log reaches the limit, we roll the log
-             * and start checkpointing. if a memory table is flushed spanning
-             * over two entry log files, we also roll log. this is for
-             * performance consideration: since we don't wanna checkpoint a new
-             * log file that ledger storage is writing to.
-             */
-            if (reachEntryLogLimit(activeLogChannel, 0L) || logIdAfterFlush != logIdBeforeFlush) {
-                LOG.info("Rolling entry logger since it reached size limitation");
-                createNewLog(UNASSIGNED_LEDGERID);
-                return true;
-            }
-            return false;
-        }
-
-        @Override
-        public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException{
-            if (numBytesFlushed > 0) {
-                // if bytes are added between previous flush and this checkpoint,
-                // it means bytes might live at current active entry log, we need
-                // roll current entry log and then issue checkpoint to underlying
-                // interleaved ledger storage.
-                createNewLog(UNASSIGNED_LEDGERID);
-            }
-        }
-    }
-
     /**
      * Flushes all rotated log channels. After log channels are flushed,
      * move leastUnflushedLogId ptr to current logId.
@@ -1273,7 +711,7 @@ void flushCompactionLog() throws IOException {
     void createNewCompactionLog() throws IOException {
         synchronized (compactionLogLock) {
             if (compactionLogChannel == null) {
-                compactionLogChannel = entryLoggerAllocator.createNewLogForCompaction();
+                compactionLogChannel = entryLogManager.createNewLogForCompaction();
             }
         }
     }
@@ -1702,7 +1140,7 @@ public void shutdown() {
         entryLoggerAllocator.stop();
     }
 
-    private static void closeFileChannel(BufferedChannelBase channel) throws IOException {
+    static void closeFileChannel(BufferedChannelBase channel) throws IOException {
         if (null == channel) {
             return;
         }
@@ -1713,7 +1151,7 @@ private static void closeFileChannel(BufferedChannelBase channel) throws IOExcep
         }
     }
 
-    private static void forceCloseFileChannel(BufferedChannelBase channel) {
+    static void forceCloseFileChannel(BufferedChannelBase channel) {
         if (null == channel) {
             return;
         }
@@ -1784,4 +1222,4 @@ synchronized long getLeastUnflushedLogId() {
             return leastUnflushedLogId;
         }
     }
-}
\ No newline at end of file
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
new file mode 100644
index 000000000..d33d7c48d
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
@@ -0,0 +1,286 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * Allocates entry log files, optionally pre-allocating the next one on a background thread.
+ *
+ * <p>Log ids come from {@code preallocatedLogId}, which is advanced for every allocation and wraps
+ * back to 0 once it reaches {@link Integer#MAX_VALUE}. Ids whose file name already exists in any
+ * ledger directory are skipped, and each allocated id is written to the {@code lastId} file of
+ * every ledger directory.
+ */
+@Slf4j
+class EntryLoggerAllocator {
+
+    // Id of the most recently allocated log; guarded by the monitor of this allocator.
+    private long preallocatedLogId;
+    // Pending background allocation, or null if none is scheduled; guarded by createEntryLogLock.
+    private Future<BufferedLogChannel> preallocation = null;
+    private final ExecutorService allocatorExecutor;
+    private final ServerConfiguration conf;
+    private final LedgerDirsManager ledgerDirsManager;
+    private final Object createEntryLogLock = new Object();
+    private final Object createCompactionLogLock = new Object();
+    private final EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
+    private final boolean entryLogPreAllocationEnabled;
+    final ByteBuf logfileHeader = Unpooled.buffer(EntryLogger.LOGFILE_HEADER_SIZE);
+
+    /**
+     * Creates an allocator whose id counter starts at {@code logId}.
+     *
+     * @param conf server configuration (buffer sizes, flush interval, pre-allocation switch)
+     * @param ledgerDirsManager provides the ledger directories checked for existing log files
+     * @param recentlyCreatedEntryLogsStatus notified of each new log whose suffix is LOG_FILE_SUFFIX
+     * @param logId initial value of the id counter; each allocation advances it before use
+     */
+    EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
+            EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId) {
+        this.conf = conf;
+        this.ledgerDirsManager = ledgerDirsManager;
+        this.preallocatedLogId = logId;
+        this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
+        this.entryLogPreAllocationEnabled = conf.isEntryLogFilePreAllocationEnabled();
+        this.allocatorExecutor = Executors.newSingleThreadExecutor();
+
+        // Initialize the entry log header buffer. This cannot be a static object
+        // since in our unit tests, we run multiple Bookies and thus EntryLoggers
+        // within the same JVM. All of these Bookie instances access this header
+        // so there can be race conditions when entry logs are rolled over and
+        // this header buffer is cleared before writing it into the new logChannel.
+        logfileHeader.writeBytes("BKLO".getBytes(UTF_8));
+        logfileHeader.writeInt(EntryLogger.HEADER_CURRENT_VERSION);
+        logfileHeader.writerIndex(EntryLogger.LOGFILE_HEADER_SIZE);
+    }
+
+    /**
+     * Returns the id of the most recently allocated log, or the initial id if none was allocated.
+     */
+    synchronized long getPreallocatedLogId() {
+        return preallocatedLogId;
+    }
+
+    /**
+     * Returns the channel of a new entry log.
+     *
+     * <p>Without pre-allocation the log is created synchronously in {@code dirForNextEntryLog}.
+     * With pre-allocation, the log allocated in the background by the previous call is returned
+     * (note: it lives in the directory passed to that previous call), or one is created
+     * synchronously if none is pending; either way another log is then scheduled for background
+     * allocation in {@code dirForNextEntryLog}.
+     *
+     * @param dirForNextEntryLog ledger directory in which to create the log
+     * @return the channel of the new log
+     * @throws IOException if the log cannot be created or the pending allocation failed
+     */
+    BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException {
+        synchronized (createEntryLogLock) {
+            if (!entryLogPreAllocationEnabled) {
+                // create a new log directly
+                return allocateNewLog(dirForNextEntryLog);
+            }
+            // serve the request from the pending pre-allocation, or allocate directly if none
+            BufferedLogChannel bc = (null == preallocation)
+                    ? allocateNewLog(dirForNextEntryLog)
+                    : takePreallocatedLog();
+            // preallocate a new log in background upon every call
+            preallocation = allocatorExecutor.submit(() -> allocateNewLog(dirForNextEntryLog));
+            return bc;
+        }
+    }
+
+    /**
+     * Waits for the pending pre-allocation and returns its channel. The caller must hold
+     * {@code createEntryLogLock} and {@code preallocation} must be non-null.
+     *
+     * <p>A failed or cancelled future is cleared before the error is rethrown, so the next call
+     * allocates directly instead of rethrowing the same stale failure on every call.
+     */
+    private BufferedLogChannel takePreallocatedLog() throws IOException {
+        try {
+            return preallocation.get();
+        } catch (ExecutionException ee) {
+            preallocation = null;
+            if (ee.getCause() instanceof IOException) {
+                throw (IOException) ee.getCause();
+            }
+            throw new IOException("Error to execute entry log allocation.", ee);
+        } catch (CancellationException ce) {
+            preallocation = null;
+            throw new IOException("Task to allocate a new entry log is cancelled.", ce);
+        } catch (InterruptedException ie) {
+            // the future itself is still valid; keep it so a later call can collect its result
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted when waiting a new entry log to be allocated.", ie);
+        }
+    }
+
+    /**
+     * Creates a new compaction log (suffix {@code COMPACTING_SUFFIX}) in the given directory.
+     * Compaction logs are always allocated synchronously, never pre-allocated.
+     */
+    BufferedLogChannel createNewLogForCompaction(File dirForNextEntryLog) throws IOException {
+        synchronized (createCompactionLogLock) {
+            return allocateNewLog(dirForNextEntryLog, COMPACTING_SUFFIX);
+        }
+    }
+
+    /**
+     * Allocates a regular entry log in the given directory.
+     */
+    private synchronized BufferedLogChannel allocateNewLog(File dirForNextEntryLog) throws IOException {
+        return allocateNewLog(dirForNextEntryLog, EntryLogger.LOG_FILE_SUFFIX);
+    }
+
+    /**
+     * Allocates a new log file named {@code <hex id><suffix>} in {@code dirForNextEntryLog}.
+     *
+     * <p>Advances {@code preallocatedLogId} until no ledger directory already holds a file with
+     * that name, writes the log file header, records the id in every directory's {@code lastId}
+     * file and, for {@code LOG_FILE_SUFFIX} logs, reports it to {@code recentlyCreatedEntryLogsStatus}.
+     *
+     * @throws IOException if the file cannot be created or its header cannot be written
+     */
+    private synchronized BufferedLogChannel allocateNewLog(File dirForNextEntryLog, String suffix) throws IOException {
+        List<File> ledgersDirs = ledgerDirsManager.getAllLedgerDirs();
+        String logFileName;
+        // Skip ids whose file already exists, so existing entry logs are never overwritten.
+        do {
+            if (preallocatedLogId >= Integer.MAX_VALUE) {
+                preallocatedLogId = 0;
+            } else {
+                ++preallocatedLogId;
+            }
+            logFileName = Long.toHexString(preallocatedLogId) + suffix;
+        } while (existsInAnyDir(ledgersDirs, logFileName));
+
+        File newLogFile = new File(dirForNextEntryLog, logFileName);
+        FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel();
+        BufferedLogChannel logChannel;
+        try {
+            logChannel = new BufferedLogChannel(channel, conf.getWriteBufferBytes(),
+                    conf.getReadBufferBytes(), preallocatedLogId, newLogFile, conf.getFlushIntervalInBytes());
+            logfileHeader.readerIndex(0);
+            logChannel.write(logfileHeader);
+        } catch (IOException | RuntimeException e) {
+            // don't leak the file descriptor when the channel cannot be set up
+            try {
+                channel.close();
+            } catch (IOException closeException) {
+                e.addSuppressed(closeException);
+            }
+            throw e;
+        }
+
+        for (File f : ledgersDirs) {
+            setLastLogId(f, preallocatedLogId);
+        }
+
+        if (suffix.equals(EntryLogger.LOG_FILE_SUFFIX)) {
+            recentlyCreatedEntryLogsStatus.createdEntryLog(preallocatedLogId);
+        }
+
+        log.info("Created new entry log file {} for logId {}.", newLogFile, preallocatedLogId);
+        return logChannel;
+    }
+
+    /**
+     * Returns true, after logging a warning, if {@code fileName} already exists in any of
+     * {@code dirs}; returns false otherwise, including when {@code dirs} is empty.
+     */
+    private static boolean existsInAnyDir(List<File> dirs, String fileName) {
+        for (File dir : dirs) {
+            File testLogFile = new File(dir, fileName);
+            if (testLogFile.exists()) {
+                log.warn("Found existed entry log " + testLogFile
+                       + " when trying to create it as a new log.");
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Writes {@code logId} (hex, newline-terminated) to the {@code lastId} file in {@code dir}.
+     * Best effort: write and close failures are logged rather than thrown.
+     *
+     * @throws IOException if the {@code lastId} file cannot be opened for writing
+     */
+    private void setLastLogId(File dir, long logId) throws IOException {
+        FileOutputStream fos = new FileOutputStream(new File(dir, "lastId"));
+        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fos, UTF_8));
+        try {
+            bw.write(Long.toHexString(logId) + "\n");
+            bw.flush();
+        } catch (IOException e) {
+            log.warn("Failed write lastId file in {}", dir.getPath(), e);
+        } finally {
+            try {
+                bw.close();
+            } catch (IOException e) {
+                log.error("Could not close lastId file in {}", dir.getPath(), e);
+            }
+        }
+    }
+
+    /**
+     * Stop the allocator.
+     *
+     * <p>This only calls {@link ExecutorService#shutdown()}: no further pre-allocations are
+     * accepted, but it does not wait for an in-flight pre-allocation to complete.
+     */
+    void stop() {
+        allocatorExecutor.shutdown();
+        log.info("Stopped entry logger preallocator.");
+    }
+
+    /**
+     * Returns the pending pre-allocation future; intended for tests and not guarded by
+     * {@code createEntryLogLock}.
+     */
+    Future<BufferedLogChannel> getPreallocationFuture(){
+        return preallocation;
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
index 4257cccd3..4c4514a7e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
@@ -26,8 +26,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.IntStream;
 
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.util.DiskChecker;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index d402a85f8..37eebcb45 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -49,9 +49,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManager;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -811,7 +808,7 @@ public void testFlushIntervalInBytes() throws Exception {
         conf.setEntryLogPerLedgerEnabled(false);
         EntryLogger newEntryLogger = new EntryLogger(conf, ledgerDirsManager);
         EntryLogManager newEntryLogManager = newEntryLogger.getEntryLogManager();
-        Assert.assertEquals("EntryLogManager class type", EntryLogger.EntryLogManagerForSingleEntryLog.class,
+        Assert.assertEquals("EntryLogManager class type", EntryLogManagerForSingleEntryLog.class,
                 newEntryLogManager.getClass());
 
         ByteBuf buf = newEntryLogger.readEntry(ledgerId, 0L, entry0Position);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
index 6cbdcdef8..604099cc3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
@@ -43,7 +43,6 @@
 import java.util.stream.LongStream;
 
 import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
 import org.apache.bookkeeper.bookie.Journal.LastLogMark;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index 9642c18d1..f3ef1086c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -36,7 +36,6 @@
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.junit.After;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services