Posted to commits@zookeeper.apache.org by iv...@apache.org on 2014/01/22 14:35:04 UTC

svn commit: r1560348 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ bookkeeper...

Author: ivank
Date: Wed Jan 22 13:35:03 2014
New Revision: 1560348

URL: http://svn.apache.org/r1560348
Log:
BOOKKEEPER-643: Improve concurrency of entry logger (sijie & Aniruddha via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1560348&r1=1560347&r2=1560348&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Jan 22 13:35:03 2014
@@ -148,6 +148,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-720: CheckpointSource.MIN#compareTo does exactly the opposite of what it should (ivank via sijie)
 
+        BOOKKEEPER-643: Improve concurrency of entry logger (sijie & Aniruddha via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java?rev=1560348&r1=1560347&r2=1560348&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java Wed Jan 22 13:35:03 2014
@@ -24,49 +24,48 @@ package org.apache.bookkeeper.bookie;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import org.apache.bookkeeper.util.ZeroBuffer;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Provides a buffering layer in front of a FileChannel.
  */
-public class BufferedChannel {
+public class BufferedChannel extends BufferedReadChannel {
+    // The capacity of the write buffer.
+    protected final int writeCapacity;
+    // The position of the file channel's write pointer.
+    protected AtomicLong writeBufferStartPosition = new AtomicLong(0);
+    // The buffer used to write operations.
+    protected final ByteBuffer writeBuffer;
+    // The absolute position of the next write operation.
+    protected volatile long position;
 
-    static final byte zeroPage[] = new byte[64 * 1024];
-
-    ByteBuffer writeBuffer;
-    ByteBuffer readBuffer;
-    private FileChannel bc;
-    long position;
-    int capacity;
-    long readBufferStartPosition;
-    long writeBufferStartPosition;
     // make constructor to be public for unit test
-    public BufferedChannel(FileChannel bc, int capacity) throws IOException {
-        this.bc = bc;
-        this.capacity = capacity;
-        position = bc.position();
-        writeBufferStartPosition = position;
+    public BufferedChannel(FileChannel fc, int capacity) throws IOException {
+        // Use the same capacity for read and write buffers.
+        this(fc, capacity, capacity);
     }
 
-    /**
-     * @return file channel
-     */
-    FileChannel getFileChannel() {
-        return this.bc;
+    public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity) throws IOException {
+        super(fc, readCapacity);
+        // Set the read buffer's limit to readCapacity.
+        this.readBuffer.limit(readCapacity);
+        this.writeCapacity = writeCapacity;
+        this.position = fc.position();
+        this.writeBufferStartPosition.set(position);
+        this.writeBuffer = ByteBuffer.allocateDirect(writeCapacity);
     }
 
-    /*    public void close() throws IOException {
-            bc.close();
-        }
-    */
-//    public boolean isOpen() {
-//        return bc.isOpen();
-//    }
-
-    synchronized public int write(ByteBuffer src) throws IOException {
+    /**
+     * Write all the data in src to the {@link FileChannel}. Note that this function can
+     * buffer or re-order writes based on the implementation. These writes will be flushed
+     * to the disk only when flush() is invoked.
+     *
+     * @param src The source ByteBuffer which contains the data to be written.
+     * @throws IOException if a write operation fails.
+     */
+    synchronized public void write(ByteBuffer src) throws IOException {
         int copied = 0;
-        if (writeBuffer == null) {
-            writeBuffer = ByteBuffer.allocateDirect(capacity);
-        }
         while(src.remaining() > 0) {
             int truncated = 0;
             if (writeBuffer.remaining() < src.remaining()) {
@@ -76,32 +75,31 @@ public class BufferedChannel {
             copied += src.remaining();
             writeBuffer.put(src);
             src.limit(src.limit()+truncated);
+            // if we have run out of buffer space, we should flush to the file
             if (writeBuffer.remaining() == 0) {
-                writeBuffer.flip();
-                bc.write(writeBuffer);
-                writeBuffer.clear();
-                writeBufferStartPosition = bc.position();
+                flushInternal();
             }
         }
         position += copied;
-        return copied;
     }
 
+    /**
+     * Get the position where the next write operation will begin writing from.
+     * @return the absolute position of the next write.
+     */
     public long position() {
         return position;
     }
 
     /**
-     * Retrieve the current size of the underlying FileChannel
-     *
-     * @return FileChannel size measured in bytes
-     *
-     * @throws IOException if some I/O error occurs reading the FileChannel
+     * Get the position of the file channel's write pointer.
+     * @return the file channel's current position, i.e. the start of the write buffer.
      */
-    public long size() throws IOException {
-        return bc.size();
+    public long getFileChannelPosition() {
+        return writeBufferStartPosition.get();
     }
 
+
     /**
      * Write any data in the buffer to the file. If sync is set to true, force a sync operation so that
      * data is persisted to the disk.
@@ -123,15 +121,12 @@ public class BufferedChannel {
      * @throws IOException if the write fails.
      */
     private void flushInternal() throws IOException {
-        if (writeBuffer == null) {
-            return;
-        }
         writeBuffer.flip();
         do {
-            bc.write(writeBuffer);
+            fileChannel.write(writeBuffer);
         } while (writeBuffer.hasRemaining());
         writeBuffer.clear();
-        writeBufferStartPosition = bc.position();
+        writeBufferStartPosition.set(fileChannel.position());
     }
 
     public long forceWrite(boolean forceMetadata) throws IOException {
@@ -139,30 +134,21 @@ public class BufferedChannel {
         // before issuing this force write hence is guaranteed to be made durable by
         // the force write, any flush that happens after this may or may
         // not be flushed
-        long positionForceWrite;
-        synchronized (this) {
-            positionForceWrite = writeBufferStartPosition;
-        }
-        bc.force(forceMetadata);
+        long positionForceWrite = writeBufferStartPosition.get();
+        fileChannel.force(forceMetadata);
         return positionForceWrite;
     }
 
-    /*public Channel getInternalChannel() {
-        return bc;
-    }*/
-    synchronized public int read(ByteBuffer buff, long pos) throws IOException {
-        if (readBuffer == null) {
-            readBuffer = ByteBuffer.allocateDirect(capacity);
-            readBufferStartPosition = Long.MIN_VALUE;
-        }
+    @Override
+    synchronized public int read(ByteBuffer dest, long pos) throws IOException {
         long prevPos = pos;
-        while(buff.remaining() > 0) {
+        while(dest.remaining() > 0) {
             // check if it is in the write buffer
-            if (writeBuffer != null && writeBufferStartPosition <= pos) {
-                long positionInBuffer = pos - writeBufferStartPosition;
+            if (writeBuffer != null && writeBufferStartPosition.get() <= pos) {
+                long positionInBuffer = pos - writeBufferStartPosition.get();
                 long bytesToCopy = writeBuffer.position()-positionInBuffer;
-                if (bytesToCopy > buff.remaining()) {
-                    bytesToCopy = buff.remaining();
+                if (bytesToCopy > dest.remaining()) {
+                    bytesToCopy = dest.remaining();
                 }
                 if (bytesToCopy == 0) {
                     throw new IOException("Read past EOF");
@@ -170,43 +156,49 @@ public class BufferedChannel {
                 ByteBuffer src = writeBuffer.duplicate();
                 src.position((int) positionInBuffer);
                 src.limit((int) (positionInBuffer+bytesToCopy));
-                buff.put(src);
+                dest.put(src);
                 pos+= bytesToCopy;
-            } else if (writeBuffer == null && writeBufferStartPosition <= pos) {
+            } else if (writeBuffer == null && writeBufferStartPosition.get() <= pos) {
                 // here we reach the end
                 break;
                 // first check if there is anything we can grab from the readBuffer
             } else if (readBufferStartPosition <= pos && pos < readBufferStartPosition+readBuffer.capacity()) {
                 long positionInBuffer = pos - readBufferStartPosition;
                 long bytesToCopy = readBuffer.capacity()-positionInBuffer;
-                if (bytesToCopy > buff.remaining()) {
-                    bytesToCopy = buff.remaining();
+                if (bytesToCopy > dest.remaining()) {
+                    bytesToCopy = dest.remaining();
                 }
                 ByteBuffer src = readBuffer.duplicate();
                 src.position((int) positionInBuffer);
                 src.limit((int) (positionInBuffer+bytesToCopy));
-                buff.put(src);
+                dest.put(src);
                 pos += bytesToCopy;
                 // let's read it
             } else {
                 readBufferStartPosition = pos;
                 readBuffer.clear();
                 // make sure that we don't overlap with the write buffer
-                if (readBufferStartPosition + readBuffer.capacity() >= writeBufferStartPosition) {
-                    readBufferStartPosition = writeBufferStartPosition - readBuffer.capacity();
+                if (readBufferStartPosition + readBuffer.capacity() >= writeBufferStartPosition.get()) {
+                    readBufferStartPosition = writeBufferStartPosition.get() - readBuffer.capacity();
                     if (readBufferStartPosition < 0) {
-                        readBuffer.put(zeroPage, 0, (int) -readBufferStartPosition);
+                        ZeroBuffer.put(readBuffer, (int)-readBufferStartPosition);
                     }
                 }
                 while(readBuffer.remaining() > 0) {
-                    if (bc.read(readBuffer, readBufferStartPosition+readBuffer.position()) <= 0) {
+                    if (fileChannel.read(readBuffer, readBufferStartPosition+readBuffer.position()) <= 0) {
                         throw new IOException("Short read");
                     }
                 }
-                readBuffer.put(zeroPage, 0, readBuffer.remaining());
+                ZeroBuffer.put(readBuffer);
                 readBuffer.clear();
             }
         }
         return (int)(pos - prevPos);
     }
+
+    @Override
+    synchronized public void clear() {
+        super.clear();
+        writeBuffer.clear();
+    }
 }
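
To make the rewritten BufferedChannel API concrete, here is a minimal usage sketch (not part of the commit; the scratch file, capacities, and payload are arbitrary, and the class is placed in the bookie package purely for visibility). It shows that writes are buffered until flush() and that a read overlapping the write buffer is answered from memory:

    package org.apache.bookkeeper.bookie;

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    public class BufferedChannelSketch {
        public static void main(String[] args) throws IOException {
            File tmp = File.createTempFile("entrylog-sketch", ".log");
            FileChannel fc = new RandomAccessFile(tmp, "rw").getChannel();
            // 64k write buffer, 8k read buffer; the capacities are arbitrary here.
            BufferedChannel bc = new BufferedChannel(fc, 64 * 1024, 8 * 1024);

            byte[] payload = "hello bookie".getBytes("UTF-8");
            long offset = bc.position();          // absolute position of the next write
            bc.write(ByteBuffer.wrap(payload));   // buffered in memory, not yet on disk

            // A read that overlaps the write buffer is served from the buffer itself.
            ByteBuffer dest = ByteBuffer.allocate(payload.length);
            bc.read(dest, offset);

            bc.flush(true);                       // flush the write buffer and fsync
            fc.close();
            tmp.delete();
        }
    }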

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java?rev=1560348&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java Wed Jan 22 13:35:03 2014
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+public abstract class BufferedChannelBase {
+    protected final FileChannel fileChannel;
+
+    protected BufferedChannelBase(FileChannel fc) {
+        this.fileChannel = fc;
+    }
+
+    protected FileChannel validateAndGetFileChannel() throws IOException {
+        // Even if we have BufferedChannelBase objects in the cache, higher layers should
+        // guarantee that once a log file has been closed and possibly deleted during garbage
+        // collection, attempts will not be made to read from it
+        if (!fileChannel.isOpen()) {
+            throw new IOException("Attempting to access a file channel that has already been closed");
+        }
+        return fileChannel;
+    }
+
+    /**
+     * Get the current size of the underlying FileChannel.
+     * @return the current size of the underlying file, in bytes.
+     */
+    public long size() throws IOException {
+        return validateAndGetFileChannel().size();
+    }
+
+    /**
+     * Get the {@link FileChannel} that this BufferedChannel wraps around.
+     * @return the wrapped {@link FileChannel}.
+     */
+    public FileChannel getFileChannel() {
+        return fileChannel;
+    }
+}
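
As a quick illustration of the closed-channel guard above (a sketch only; the scratch file is an assumption, the class sits in the bookie package for visibility, and it uses the BufferedReadChannel subclass added below): once the wrapped FileChannel has been closed, any call routed through validateAndGetFileChannel() fails fast with a descriptive IOException.

    package org.apache.bookkeeper.bookie;

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;

    public class ChannelGuardSketch {
        public static void main(String[] args) throws IOException {
            File tmp = File.createTempFile("guard-sketch", ".log");
            FileChannel fc = new RandomAccessFile(tmp, "rw").getChannel();
            BufferedReadChannel brc = new BufferedReadChannel(fc, 8 * 1024);

            System.out.println(brc.size());   // delegates to validateAndGetFileChannel().size()
            fc.close();
            try {
                brc.size();                   // the channel is closed, so this throws
            } catch (IOException expected) {
                System.out.println("closed: " + expected.getMessage());
            }
            tmp.delete();
        }
    }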

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java?rev=1560348&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java Wed Jan 22 13:35:03 2014
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * A Buffered channel without a write buffer. Only reads are buffered.
+ */
+public class BufferedReadChannel extends BufferedChannelBase {
+    private static Logger LOG = LoggerFactory.getLogger(BufferedReadChannel.class);
+    // The capacity of the read buffer.
+    protected final int readCapacity;
+    // The buffer for read operations.
+    protected ByteBuffer readBuffer;
+    // The starting position of the data currently in the read buffer.
+    protected long readBufferStartPosition = Long.MIN_VALUE;
+
+    long invocationCount = 0;
+    long cacheHitCount = 0;
+
+    public BufferedReadChannel(FileChannel fileChannel, int readCapacity) throws IOException {
+        super(fileChannel);
+        this.readCapacity = readCapacity;
+        this.readBuffer = ByteBuffer.allocateDirect(readCapacity);
+        this.readBuffer.limit(0);
+    }
+
+    /**
+     * Read up to dest.remaining() bytes into dest, starting at position pos in the
+     * FileChannel. Depending on the implementation, the data may be served from the
+     * read buffer or from the file channel itself.
+     * @param dest The destination buffer to read into.
+     * @param pos The absolute position in the file to start reading from.
+     * @return The total number of bytes read. -1 if the given position is greater than or equal to the file's current size.
+     * @throws IOException if I/O error occurs
+     */
+    synchronized public int read(ByteBuffer dest, long pos) throws IOException {
+        invocationCount++;
+        long currentPosition = pos;
+        long eof = validateAndGetFileChannel().size();
+        // return -1 if the given position is greater than or equal to the file's current size.
+        if (pos >= eof) {
+            return -1;
+        }
+        while (dest.remaining() > 0) {
+            // Check if the data is in the buffer, if so, copy it.
+            if (readBufferStartPosition <= currentPosition && currentPosition < readBufferStartPosition + readBuffer.limit()) {
+                long posInBuffer = currentPosition - readBufferStartPosition;
+                long bytesToCopy = Math.min(dest.remaining(), readBuffer.limit() - posInBuffer);
+                ByteBuffer rbDup = readBuffer.duplicate();
+                rbDup.position((int)posInBuffer);
+                rbDup.limit((int)(posInBuffer + bytesToCopy));
+                dest.put(rbDup);
+                currentPosition += bytesToCopy;
+                cacheHitCount++;
+            } else if (currentPosition >= eof) {
+                // here we reached eof.
+                break;
+            } else {
+                // We don't have it in the buffer, so put necessary data in the buffer
+                readBuffer.clear();
+                readBufferStartPosition = currentPosition;
+                int readBytes = 0;
+                if ((readBytes = validateAndGetFileChannel().read(readBuffer, currentPosition)) <= 0) {
+                    throw new IOException("Reading from filechannel returned a non-positive value. Short read.");
+                }
+                readBuffer.limit(readBytes);
+            }
+        }
+        return (int)(currentPosition - pos);
+    }
+
+    synchronized public void clear() {
+        readBuffer.clear();
+        readBuffer.limit(0);
+    }
+
+}
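
For reference, a minimal read-path sketch (not from the commit; the scratch file, sizes, and package placement are assumptions) exercising the contract in the javadoc above, including the -1 return for reads at or beyond EOF:

    package org.apache.bookkeeper.bookie;

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    public class BufferedReadChannelSketch {
        public static void main(String[] args) throws IOException {
            File tmp = File.createTempFile("read-sketch", ".log");
            FileChannel writer = new RandomAccessFile(tmp, "rw").getChannel();
            writer.write(ByteBuffer.wrap("0123456789".getBytes("UTF-8")));
            writer.force(false);
            writer.close();

            FileChannel reader = new RandomAccessFile(tmp, "r").getChannel();
            BufferedReadChannel brc = new BufferedReadChannel(reader, 4 * 1024);

            ByteBuffer dest = ByteBuffer.allocate(4);
            int copied = brc.read(dest, 2);                     // fills the read buffer, copies "2345"
            int atEof = brc.read(ByteBuffer.allocate(4), 100);  // pos >= file size, so returns -1

            System.out.println(copied + " " + atEof);           // expected output: 4 -1
            reader.close();
            tmp.delete();
        }
    }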

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1560348&r1=1560347&r2=1560348&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Wed Jan 22 13:35:03 2014
@@ -40,8 +40,16 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map.Entry;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -51,6 +59,8 @@ import org.apache.bookkeeper.util.IOUtil
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.MapMaker;
+
 /**
  * This class manages the writing of the bookkeeper entries. All the new
  * entries are written to a common log. The LedgerCache will have pointers
@@ -61,19 +71,32 @@ import org.slf4j.LoggerFactory;
 public class EntryLogger {
     private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
 
+    private static class BufferedLogChannel extends BufferedChannel {
+        final private long logId;
+        public BufferedLogChannel(FileChannel fc, int writeCapacity,
+                                  int readCapacity, long logId) throws IOException {
+            super(fc, writeCapacity, readCapacity);
+            this.logId = logId;
+        }
+        public long getLogId() {
+            return logId;
+        }
+    }
+
     volatile File currentDir;
     private final LedgerDirsManager ledgerDirsManager;
     private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
 
-    private long logId;
     private volatile long leastUnflushedLogId;
 
     /**
      * The maximum size of a entry logger file.
      */
     final long logSizeLimit;
-    private List<BufferedChannel> logChannelsToFlush;
-    private volatile BufferedChannel logChannel;
+    private List<BufferedLogChannel> logChannelsToFlush;
+    private volatile BufferedLogChannel logChannel;
+    private final EntryLoggerAllocator entryLoggerAllocator;
+    private final boolean entryLogPreAllocationEnabled;
     private final CopyOnWriteArrayList<EntryLogListener> listeners
         = new CopyOnWriteArrayList<EntryLogListener>();
 
@@ -83,10 +106,12 @@ public class EntryLogger {
      */
     final static int LOGFILE_HEADER_SIZE = 1024;
     final ByteBuffer LOGFILE_HEADER = ByteBuffer.allocate(LOGFILE_HEADER_SIZE);
+    final static long INVALID_LID = -1L;
 
     final static int MIN_SANE_ENTRY_SIZE = 8 + 8;
     final static long MB = 1024 * 1024;
 
+    final ServerConfiguration conf;
     /**
      * Scan entries in a entry log file.
      */
@@ -143,6 +168,7 @@ public class EntryLogger {
         }
         // log size limit
         this.logSizeLimit = conf.getEntryLogSizeLimit();
+        this.entryLogPreAllocationEnabled = conf.isEntryLogFilePreAllocationEnabled();
 
         // Initialize the entry log header buffer. This cannot be a static object
         // since in our unit tests, we run multiple Bookies and thus EntryLoggers
@@ -152,7 +178,7 @@ public class EntryLogger {
         LOGFILE_HEADER.put("BKLO".getBytes(UTF_8));
 
         // Find the largest logId
-        logId = -1;
+        long logId = INVALID_LID;
         for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
             if (!dir.exists()) {
                 throw new FileNotFoundException(
@@ -164,6 +190,8 @@ public class EntryLogger {
             }
         }
         this.leastUnflushedLogId = logId + 1;
+        this.entryLoggerAllocator = new EntryLoggerAllocator(logId);
+        this.conf = conf;
         initialize();
     }
 
@@ -174,9 +202,86 @@ public class EntryLogger {
     }
 
     /**
-     * Maps entry log files to open channels.
+     * If the log id of the current writable channel matches entryLogId and the requested
+     * range might overlap the write buffer of that channel, route the read to the
+     * current logChannel. Otherwise, read from the provided BufferedReadChannel.
+     * @param entryLogId The id of the entry log to read from.
+     * @param channel The read channel to fall back to.
+     * @param buff The destination buffer; pos + buff.remaining() marks the last position
+     *             that we expect to read.
+     * @param pos The starting position from where we want to read.
+     * @return the number of bytes read.
+     */
+    private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, ByteBuffer buff, long pos)
+            throws IOException {
+        BufferedLogChannel bc = logChannel;
+        if (null != bc) {
+            if (entryLogId == bc.getLogId()) {
+                synchronized (bc) {
+                    if (pos + buff.remaining() >= bc.getFileChannelPosition()) {
+                        return bc.read(buff, pos);
+                    }
+                }
+            }
+        }
+        return channel.read(buff, pos);
+    }
+
+    /**
+     * A thread-local variable that wraps a mapping of log ids to BufferedReadChannels.
+     * These channels should be used only for reading; logChannel is the one
+     * that is used for writes.
+     */
+    private final ThreadLocal<Map<Long, BufferedReadChannel>> logid2Channel
+            = new ThreadLocal<Map<Long, BufferedReadChannel>>() {
+        @Override
+        public Map<Long, BufferedReadChannel> initialValue() {
+            // Since this is thread local, there is only one modifier.
+            // We don't really need the concurrency, but we do need the
+            // weak values, so use a concurrency level of 1.
+            return new MapMaker().concurrencyLevel(1)
+                .weakValues()
+                .makeMap();
+        }
+    };
+
+    /**
+     * Each thread local buffered read channel can share the same file handle because reads are not relative
+     * and don't cause a change in the channel's position. We use this map to store the file channels. Each
+     * file channel is mapped to a log id which represents an open log file.
+     */
+    private final ConcurrentMap<Long, FileChannel> logid2FileChannel
+            = new ConcurrentHashMap<Long, FileChannel>();
+
+    /**
+     * Put the logId, bc pair in the map responsible for the current thread.
+     * @param logId
+     * @param bc
+     */
+    public BufferedReadChannel putInReadChannels(long logId, BufferedReadChannel bc) {
+        Map<Long, BufferedReadChannel> threadMap = logid2Channel.get();
+        return threadMap.put(logId, bc);
+    }
+
+    /**
+     * Remove the shared file channel for this log id and close it. Per-thread read
+     * channels for the log are weakly referenced and will be reclaimed once unused.
+     * @param logId
      */
-    private ConcurrentHashMap<Long, BufferedChannel> channels = new ConcurrentHashMap<Long, BufferedChannel>();
+    public void removeFromChannelsAndClose(long logId) {
+        FileChannel fileChannel = logid2FileChannel.remove(logId);
+        if (null != fileChannel) {
+            try {
+                fileChannel.close();
+            } catch (IOException e) {
+                LOG.warn("Exception while closing channel for log file:" + logId);
+            }
+        }
+    }
+
+    public BufferedReadChannel getFromChannels(long logId) {
+        return logid2Channel.get().get(logId);
+    }
 
     /**
      * Get the least unflushed log id. Garbage collector thread should not process
@@ -189,7 +294,7 @@ public class EntryLogger {
     }
 
     synchronized long getCurrentLogId() {
-        return logId;
+        return logChannel.getLogId();
     }
 
     protected void initialize() throws IOException {
@@ -259,38 +364,108 @@ public class EntryLogger {
     void createNewLog() throws IOException {
         if (null != logChannel) {
             if (null == logChannelsToFlush) {
-                logChannelsToFlush = new LinkedList<BufferedChannel>();
+                logChannelsToFlush = new LinkedList<BufferedLogChannel>();
             }
             // flush the internal buffer back to filesystem but not sync disk
             // so the readers could access the data from filesystem.
             logChannel.flush(false);
             logChannelsToFlush.add(logChannel);
+            LOG.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
+                    logChannel.getLogId(), logChannelsToFlush);
             for (EntryLogListener listener : listeners) {
                 listener.onRotateEntryLog();
             }
         }
-        String logFileName = null;
-        do {
-            logFileName = Long.toHexString(++logId) + ".log";
-            for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
-                File newLogFile = new File(dir, logFileName);
-                if (newLogFile.exists()) {
-                    LOG.warn("Found existed entry log " + newLogFile
-                             + " when trying to create it as a new log.");
-                    logFileName = null;
-                    break;
+        logChannel = entryLoggerAllocator.createNewLog();
+    }
+
+    /**
+     * An allocator pre-allocates entry log files.
+     */
+    class EntryLoggerAllocator {
+
+        long preallocatedLogId;
+        Future<BufferedLogChannel> preallocation = null;
+        ExecutorService allocatorExecutor;
+
+        EntryLoggerAllocator(long logId) {
+            preallocatedLogId = logId;
+            allocatorExecutor = Executors.newSingleThreadExecutor();
+        }
+
+        synchronized BufferedLogChannel createNewLog() throws IOException {
+            BufferedLogChannel bc;
+            if (!entryLogPreAllocationEnabled || null == preallocation) {
+                // initialization time to create a new log
+                bc = allocateNewLog();
+            } else {
+                // has a preallocated entry log
+                try {
+                    bc = preallocation.get();
+                } catch (ExecutionException ee) {
+                    if (ee.getCause() instanceof IOException) {
+                        throw (IOException) (ee.getCause());
+                    } else {
+                        throw new IOException("Error to execute entry log allocation.", ee);
+                    }
+                } catch (CancellationException ce) {
+                    throw new IOException("Task to allocate a new entry log is cancelled.", ce);
+                } catch (InterruptedException ie) {
+                    throw new IOException("Intrrupted when waiting a new entry log to be allocated.", ie);
                 }
+                preallocation = allocatorExecutor.submit(new Callable<BufferedLogChannel>() {
+                    @Override
+                    public BufferedLogChannel call() throws IOException {
+                        return allocateNewLog();
+                    }
+                });
             }
-        } while (logFileName == null);
+            LOG.info("Created new entry logger {}.", bc.getLogId());
+            return bc;
+        }
+
+        /**
+         * Allocate a new log file.
+         */
+        BufferedLogChannel allocateNewLog() throws IOException {
+            List<File> list = ledgerDirsManager.getWritableLedgerDirs();
+            Collections.shuffle(list);
+            // It would be better not to overwrite existing entry log files
+            File newLogFile = null;
+            do {
+                String logFileName = Long.toHexString(++preallocatedLogId) + ".log";
+                for (File dir : list) {
+                    newLogFile = new File(dir, logFileName);
+                    currentDir = dir;
+                    if (newLogFile.exists()) {
+                        LOG.warn("Found existed entry log " + newLogFile
+                               + " when trying to create it as a new log.");
+                        newLogFile = null;
+                        break;
+                    }
+                }
+            } while (newLogFile == null);
+
+            FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel();
+            BufferedLogChannel logChannel = new BufferedLogChannel(channel,
+                    conf.getWriteBufferBytes(), conf.getReadBufferBytes(), preallocatedLogId);
+            logChannel.write((ByteBuffer) LOGFILE_HEADER.clear());
 
-        // Update last log id first
-        currentDir = ledgerDirsManager.pickRandomWritableDir();
-        setLastLogId(currentDir, logId);
-
-        File newLogFile = new File(currentDir, logFileName);
-        logChannel = new BufferedChannel(new RandomAccessFile(newLogFile, "rw").getChannel(), 64*1024);
-        logChannel.write((ByteBuffer) LOGFILE_HEADER.clear());
-        channels.put(logId, logChannel);
+            for (File f : list) {
+                setLastLogId(f, preallocatedLogId);
+            }
+            LOG.info("Preallocated entry logger {}.", preallocatedLogId);
+            return logChannel;
+        }
+
+        /**
+         * Stop the allocator.
+         */
+        void stop() {
+            // wait until the preallocation finished.
+            allocatorExecutor.shutdown();
+            LOG.info("Stopped entry logger preallocator.");
+        }
     }
 
     /**
@@ -300,15 +475,7 @@ public class EntryLogger {
      *          Entry Log File Id
      */
     protected boolean removeEntryLog(long entryLogId) {
-        BufferedChannel bc = channels.remove(entryLogId);
-        if (null != bc) {
-            // close its underlying file channel, so it could be deleted really
-            try {
-                bc.getFileChannel().close();
-            } catch (IOException ie) {
-                LOG.warn("Exception while closing garbage collected entryLog file : ", ie);
-            }
-        }
+        removeFromChannelsAndClose(entryLogId);
         File entryLogFile;
         try {
             entryLogFile = findFile(entryLogId);
@@ -337,6 +504,7 @@ public class EntryLogger {
             try {
                 bw.close();
             } catch (IOException e) {
+                LOG.error("Could not close lastId file in {}", dir.getPath());
             }
         }
     }
@@ -365,7 +533,7 @@ public class EntryLogger {
         }
         // no log file found in this directory
         if (0 == logs.size()) {
-            return -1;
+            return INVALID_LID;
         }
         // order the collections
         Collections.sort(logs);
@@ -380,16 +548,16 @@ public class EntryLogger {
         try {
             fis = new FileInputStream(new File(f, "lastId"));
         } catch (FileNotFoundException e) {
-            return -1;
+            return INVALID_LID;
         }
         BufferedReader br = new BufferedReader(new InputStreamReader(fis, UTF_8));
         try {
             String lastIdString = br.readLine();
             return Long.parseLong(lastIdString, 16);
         } catch (IOException e) {
-            return -1;
+            return INVALID_LID;
         } catch(NumberFormatException e) {
-            return -1;
+            return INVALID_LID;
         } finally {
             try {
                 br.close();
@@ -407,21 +575,28 @@ public class EntryLogger {
     }
 
     void flushRotatedLogs() throws IOException {
-        List<BufferedChannel> tmpChannels = null;
-        long newUnflushedLogId;
+        List<BufferedLogChannel> channels = null;
+        long flushedLogId = INVALID_LID;
         synchronized (this) {
-            tmpChannels = logChannelsToFlush;
+            channels = logChannelsToFlush;
             logChannelsToFlush = null;
-            newUnflushedLogId = logId;
         }
-        if (null == tmpChannels) {
+        if (null == channels) {
             return;
         }
-        for (BufferedChannel channel : tmpChannels) {
+        for (BufferedLogChannel channel : channels) {
             channel.flush(true);
+            // since this channel is only used for writing, we can close the underlying
+            // file channel after flushing it. Otherwise, we might end up leaking fds,
+            // which would prevent the disk space from being reclaimed.
+            closeFileChannel(channel);
+            if (channel.getLogId() > flushedLogId) {
+                flushedLogId = channel.getLogId();
+            }
+            LOG.info("Synced entry logger {} to disk.", channel.getLogId());
         }
         // move the leastUnflushedLogId ptr
-        leastUnflushedLogId = newUnflushedLogId;
+        leastUnflushedLogId = flushedLogId + 1;
     }
 
     void flush() throws IOException {
@@ -432,6 +607,7 @@ public class EntryLogger {
     synchronized void flushCurrentLog() throws IOException {
         if (logChannel != null) {
             logChannel.flush(true);
+            LOG.debug("Flush and sync current entry logger {}.", logChannel.getLogId());
         }
     }
 
@@ -458,7 +634,7 @@ public class EntryLogger {
         long pos = logChannel.position();
         logChannel.write(entry);
 
-        return (logId << 32L) | pos;
+        return (logChannel.getLogId() << 32L) | pos;
     }
 
     static long logIdForOffset(long offset) {
@@ -474,7 +650,7 @@ public class EntryLogger {
         long pos = location & 0xffffffffL;
         ByteBuffer sizeBuff = ByteBuffer.allocate(4);
         pos -= 4; // we want to get the ledgerId and length to check
-        BufferedChannel fc;
+        BufferedReadChannel fc;
         try {
             fc = getChannelForLogId(entryLogId);
         } catch (FileNotFoundException e) {
@@ -482,7 +658,7 @@ public class EntryLogger {
             newe.setStackTrace(e.getStackTrace());
             throw newe;
         }
-        if (fc.read(sizeBuff, pos) != sizeBuff.capacity()) {
+        if (readFromLogChannel(entryLogId, fc, sizeBuff, pos) != sizeBuff.capacity()) {
             throw new Bookie.NoEntryException("Short read from entrylog " + entryLogId,
                                               ledgerId, entryId);
         }
@@ -500,7 +676,7 @@ public class EntryLogger {
         }
         byte data[] = new byte[entrySize];
         ByteBuffer buff = ByteBuffer.wrap(data);
-        int rc = fc.read(buff, pos);
+        int rc = readFromLogChannel(entryLogId, fc, buff, pos);
         if ( rc != data.length) {
             // Note that throwing NoEntryException here instead of IOException is not
             // without risk. If all bookies in a quorum throw this same exception
@@ -526,8 +702,8 @@ public class EntryLogger {
         return data;
     }
 
-    private BufferedChannel getChannelForLogId(long entryLogId) throws IOException {
-        BufferedChannel fc = channels.get(entryLogId);
+    private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException {
+        BufferedReadChannel fc = getFromChannels(entryLogId);
         if (fc != null) {
             return fc;
         }
@@ -535,18 +711,16 @@ public class EntryLogger {
         // get channel is used to open an existing entry log file
         // it would be better to open using read mode
         FileChannel newFc = new RandomAccessFile(file, "r").getChannel();
-        // If the file already exists before creating a BufferedChannel layer above it,
-        // set the FileChannel's position to the end so the write buffer knows where to start.
-        newFc.position(newFc.size());
-        fc = new BufferedChannel(newFc, 8192);
-
-        BufferedChannel oldfc = channels.putIfAbsent(entryLogId, fc);
-        if (oldfc != null) {
+        FileChannel oldFc = logid2FileChannel.putIfAbsent(entryLogId, newFc);
+        if (null != oldFc) {
             newFc.close();
-            return oldfc;
-        } else {
-            return fc;
+            newFc = oldFc;
         }
+        // A BufferedReadChannel has no write buffer, so reads through it can never
+        // overlap pending writes; reads that might hit the current log's write buffer
+        // are routed through readFromLogChannel instead.
+        fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes());
+        putInReadChannels(entryLogId, fc);
+        return fc;
     }
 
     /**
@@ -584,7 +758,7 @@ public class EntryLogger {
     protected void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException {
         ByteBuffer sizeBuff = ByteBuffer.allocate(4);
         ByteBuffer lidBuff = ByteBuffer.allocate(8);
-        BufferedChannel bc;
+        BufferedReadChannel bc;
         // Get the BufferedChannel for the current entry log file
         try {
             bc = getChannelForLogId(entryLogId);
@@ -601,7 +775,7 @@ public class EntryLogger {
             if (pos >= bc.size()) {
                 break;
             }
-            if (bc.read(sizeBuff, pos) != sizeBuff.capacity()) {
+            if (readFromLogChannel(entryLogId, bc, sizeBuff, pos) != sizeBuff.capacity()) {
                 throw new IOException("Short read for entry size from entrylog " + entryLogId);
             }
             long offset = pos;
@@ -614,7 +788,7 @@ public class EntryLogger {
             }
             sizeBuff.clear();
             // try to read ledger id first
-            if (bc.read(lidBuff, pos) != lidBuff.capacity()) {
+            if (readFromLogChannel(entryLogId, bc, lidBuff, pos) != lidBuff.capacity()) {
                 throw new IOException("Short read for ledger id from entrylog " + entryLogId);
             }
             lidBuff.flip();
@@ -628,7 +802,7 @@ public class EntryLogger {
             // read the entry
             byte data[] = new byte[entrySize];
             ByteBuffer buff = ByteBuffer.wrap(data);
-            int rc = bc.read(buff, pos);
+            int rc = readFromLogChannel(entryLogId, bc, buff, pos);
             if (rc != data.length) {
                 throw new IOException("Short read for ledger entry from entryLog " + entryLogId
                                     + "@" + pos + "(" + rc + "!=" + data.length + ")");
@@ -649,22 +823,44 @@ public class EntryLogger {
         LOG.info("Stopping EntryLogger");
         try {
             flush();
-            for (Entry<Long, BufferedChannel> channelEntry : channels
-                    .entrySet()) {
-                channelEntry.getValue().getFileChannel().close();
+            for (FileChannel fc : logid2FileChannel.values()) {
+                fc.close();
             }
+            // clear the mapping, so we don't need to go through the channels again in finally block in normal case.
+            logid2FileChannel.clear();
+            // close current writing log file
+            closeFileChannel(logChannel);
+            logChannel = null;
         } catch (IOException ie) {
             // we have no idea how to avoid io exception during shutting down, so just ignore it
             LOG.error("Error flush entry log during shutting down, which may cause entry log corrupted.", ie);
         } finally {
-            for (Entry<Long, BufferedChannel> channelEntry : channels
-                    .entrySet()) {
-                FileChannel fileChannel = channelEntry.getValue()
-                        .getFileChannel();
-                if (fileChannel.isOpen()) {
-                    IOUtils.close(LOG, fileChannel);
-                }
+            for (FileChannel fc : logid2FileChannel.values()) {
+                IOUtils.close(LOG, fc);
             }
+            forceCloseFileChannel(logChannel);
+        }
+        // shutdown the pre-allocation thread
+        entryLoggerAllocator.stop();
+    }
+
+    private static void closeFileChannel(BufferedChannelBase channel) throws IOException {
+        if (null == channel) {
+            return;
+        }
+        FileChannel fileChannel = channel.getFileChannel();
+        if (null != fileChannel) {
+            fileChannel.close();
+        }
+    }
+
+    private static void forceCloseFileChannel(BufferedChannelBase channel) {
+        if (null == channel) {
+            return;
+        }
+        FileChannel fileChannel = channel.getFileChannel();
+        if (null != fileChannel) {
+            IOUtils.close(LOG, fileChannel);
         }
     }
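
The EntryLogger changes above rest on two ideas: each reader thread now caches its own BufferedReadChannel per log id (weakly referenced, on top of one shared FileChannel per log file) instead of all threads contending on a single BufferedChannel, and new entry log files are pre-allocated by a single background thread. The following standalone sketch shows the per-thread cache pattern using the same Guava MapMaker call as the commit; the class and method names here are illustrative only, not part of the commit:

    import java.util.Map;
    import java.util.concurrent.Callable;

    import com.google.common.collect.MapMaker;

    public class PerThreadReaderCacheSketch<R> {
        // Each thread keeps its own id -> reader map; weak values let an entry be
        // reclaimed once nothing else references the reader (e.g. after log deletion).
        private final ThreadLocal<Map<Long, R>> perThread = new ThreadLocal<Map<Long, R>>() {
            @Override
            protected Map<Long, R> initialValue() {
                // Only the owning thread modifies this map, so a concurrency level of 1 is enough.
                return new MapMaker().concurrencyLevel(1).weakValues().makeMap();
            }
        };

        public R get(long logId, Callable<R> factory) throws Exception {
            R reader = perThread.get().get(logId);
            if (reader == null) {
                reader = factory.call();   // in EntryLogger this wraps the shared FileChannel
                perThread.get().put(logId, reader);
            }
            return reader;
        }
    }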
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1560348&r1=1560347&r2=1560348&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Wed Jan 22 13:35:03 2014
@@ -32,6 +32,7 @@ import com.google.common.annotations.Bet
 public class ServerConfiguration extends AbstractConfiguration {
     // Entry Log Parameters
     protected final static String ENTRY_LOG_SIZE_LIMIT = "logSizeLimit";
+    protected final static String ENTRY_LOG_FILE_PREALLOCATION_ENABLED = "entryLogFilePreallocationEnabled";
     protected final static String MINOR_COMPACTION_INTERVAL = "minorCompactionInterval";
     protected final static String MINOR_COMPACTION_THRESHOLD = "minorCompactionThreshold";
     protected final static String MAJOR_COMPACTION_INTERVAL = "majorCompactionInterval";
@@ -132,6 +133,27 @@ public class ServerConfiguration extends
     }
 
     /**
+     * Is entry log file preallocation enabled.
+     *
+     * @return whether entry log file preallocation is enabled or not.
+     */
+    public boolean isEntryLogFilePreAllocationEnabled() {
+        return this.getBoolean(ENTRY_LOG_FILE_PREALLOCATION_ENABLED, true);
+    }
+
+    /**
+     * Enable/disable entry log file preallocation.
+     *
+     * @param enabled
+     *          enable/disable entry log file preallocation.
+     * @return server configuration object.
+     */
+    public ServerConfiguration setEntryLogFilePreAllocationEnabled(boolean enabled) {
+        this.setProperty(ENTRY_LOG_FILE_PREALLOCATION_ENABLED, enabled);
+        return this;
+    }
+
+    /**
      * Get Garbage collection wait time
      *
      * @return gc wait time
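
The new flag defaults to true and is read by EntryLogger at construction time via isEntryLogFilePreAllocationEnabled(). A minimal sketch of toggling it (the class name is arbitrary; the setter and getter are the ones added above):

    import org.apache.bookkeeper.conf.ServerConfiguration;

    public class PreallocationConfigSketch {
        public static void main(String[] args) {
            ServerConfiguration conf = new ServerConfiguration();
            // Preallocation is on by default; the tests below turn it off so that
            // entry log files are only created on demand.
            conf.setEntryLogFilePreAllocationEnabled(false);
            System.out.println(conf.isEntryLogFilePreAllocationEnabled());   // prints false
        }
    }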

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java?rev=1560348&r1=1560347&r2=1560348&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java Wed Jan 22 13:35:03 2014
@@ -95,11 +95,13 @@ public class CompactionTest extends Book
     public void setUp() throws Exception {
         // Set up the configuration properties needed.
         baseConf.setEntryLogSizeLimit(numEntries * ENTRY_SIZE);
+        // Disable skip list for compaction
         baseConf.setGcWaitTime(gcWaitTime);
         baseConf.setMinorCompactionThreshold(minorCompactionThreshold);
         baseConf.setMajorCompactionThreshold(majorCompactionThreshold);
         baseConf.setMinorCompactionInterval(minorCompactionInterval);
         baseConf.setMajorCompactionInterval(majorCompactionInterval);
+        baseConf.setEntryLogFilePreAllocationEnabled(false);
 
         super.setUp();
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java?rev=1560348&r1=1560347&r2=1560348&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java Wed Jan 22 13:35:03 2014
@@ -57,6 +57,7 @@ public class LedgerDeleteTest extends Mu
         // Set up the configuration properties needed.
         baseConf.setEntryLogSizeLimit(2 * 1024 * 1024L);
         baseConf.setGcWaitTime(1000);
+        baseConf.setEntryLogFilePreAllocationEnabled(false);
         super.setUp();
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java?rev=1560348&r1=1560347&r2=1560348&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java Wed Jan 22 13:35:03 2014
@@ -39,6 +39,7 @@ public class ReadOnlyBookieTest extends 
 
     public ReadOnlyBookieTest() {
         super(2);
+        baseConf.setEntryLogFilePreAllocationEnabled(false);
     }
 
     /**