You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2014/01/22 14:35:04 UTC
svn commit: r1560348 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/
bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ bookkeeper...
Author: ivank
Date: Wed Jan 22 13:35:03 2014
New Revision: 1560348
URL: http://svn.apache.org/r1560348
Log:
BOOKKEEPER-643: Improve concurrency of entry logger (sijie & Aniruddha via ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1560348&r1=1560347&r2=1560348&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Jan 22 13:35:03 2014
@@ -148,6 +148,8 @@ Trunk (unreleased changes)
BOOKKEEPER-720: CheckpointSource.MIN#compareTo does exactly the opposite of what it should (ivank via sijie)
+ BOOKKEEPER-643: Improve concurrency of entry logger (sijie & Aniruddha via ivank)
+
hedwig-server:
BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java?rev=1560348&r1=1560347&r2=1560348&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java Wed Jan 22 13:35:03 2014
@@ -24,49 +24,48 @@ package org.apache.bookkeeper.bookie;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import org.apache.bookkeeper.util.ZeroBuffer;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Provides a buffering layer in front of a FileChannel.
*/
-public class BufferedChannel {
+public class BufferedChannel extends BufferedReadChannel {
+ // The capacity of the write buffer.
+ protected final int writeCapacity;
+ // The position of the file channel's write pointer.
+ protected AtomicLong writeBufferStartPosition = new AtomicLong(0);
+ // The buffer used for write operations.
+ protected final ByteBuffer writeBuffer;
+ // The absolute position of the next write operation.
+ protected volatile long position;
- static final byte zeroPage[] = new byte[64 * 1024];
-
- ByteBuffer writeBuffer;
- ByteBuffer readBuffer;
- private FileChannel bc;
- long position;
- int capacity;
- long readBufferStartPosition;
- long writeBufferStartPosition;
// make constructor to be public for unit test
- public BufferedChannel(FileChannel bc, int capacity) throws IOException {
- this.bc = bc;
- this.capacity = capacity;
- position = bc.position();
- writeBufferStartPosition = position;
+ public BufferedChannel(FileChannel fc, int capacity) throws IOException {
+ // Use the same capacity for read and write buffers.
+ this(fc, capacity, capacity);
}
- /**
- * @return file channel
- */
- FileChannel getFileChannel() {
- return this.bc;
+ public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity) throws IOException {
+ super(fc, readCapacity);
+ // Set the read buffer's limit to readCapacity.
+ this.readBuffer.limit(readCapacity);
+ this.writeCapacity = writeCapacity;
+ this.position = fc.position();
+ this.writeBufferStartPosition.set(position);
+ this.writeBuffer = ByteBuffer.allocateDirect(writeCapacity);
}
- /* public void close() throws IOException {
- bc.close();
- }
- */
-// public boolean isOpen() {
-// return bc.isOpen();
-// }
-
- synchronized public int write(ByteBuffer src) throws IOException {
+ /**
+ * Write all the data in src to the {@link FileChannel}. Note that this function can
+ * buffer or re-order writes based on the implementation. Buffered data is handed to
+ * the file channel when the write buffer fills up or when flush() is invoked.
+ *
+ * @param src The source ByteBuffer which contains the data to be written.
+ * @throws IOException if a write operation fails.
+ */
+ synchronized public void write(ByteBuffer src) throws IOException {
int copied = 0;
- if (writeBuffer == null) {
- writeBuffer = ByteBuffer.allocateDirect(capacity);
- }
while(src.remaining() > 0) {
int truncated = 0;
if (writeBuffer.remaining() < src.remaining()) {
@@ -76,32 +75,31 @@ public class BufferedChannel {
copied += src.remaining();
writeBuffer.put(src);
src.limit(src.limit()+truncated);
+ // if we have run out of buffer space, we should flush to the file
if (writeBuffer.remaining() == 0) {
- writeBuffer.flip();
- bc.write(writeBuffer);
- writeBuffer.clear();
- writeBufferStartPosition = bc.position();
+ flushInternal();
}
}
position += copied;
- return copied;
}
+ /**
+ * Get the position where the next write operation will begin writing from.
+ * @return the absolute position of the next write operation.
+ */
public long position() {
return position;
}
/**
- * Retrieve the current size of the underlying FileChannel
- *
- * @return FileChannel size measured in bytes
- *
- * @throws IOException if some I/O error occurs reading the FileChannel
+ * Get the position of the file channel's write pointer.
+ * @return the file position at which the current write buffer begins (updated after each internal flush).
*/
- public long size() throws IOException {
- return bc.size();
+ public long getFileChannelPosition() {
+ return writeBufferStartPosition.get();
}
+
/**
* Write any data in the buffer to the file. If sync is set to true, force a sync operation so that
* data is persisted to the disk.
@@ -123,15 +121,12 @@ public class BufferedChannel {
* @throws IOException if the write fails.
*/
private void flushInternal() throws IOException {
- if (writeBuffer == null) {
- return;
- }
writeBuffer.flip();
do {
- bc.write(writeBuffer);
+ fileChannel.write(writeBuffer);
} while (writeBuffer.hasRemaining());
writeBuffer.clear();
- writeBufferStartPosition = bc.position();
+ writeBufferStartPosition.set(fileChannel.position());
}
public long forceWrite(boolean forceMetadata) throws IOException {
@@ -139,30 +134,21 @@ public class BufferedChannel {
// before issuing this force write hence is guaranteed to be made durable by
// the force write, any flush that happens after this may or may
// not be flushed
- long positionForceWrite;
- synchronized (this) {
- positionForceWrite = writeBufferStartPosition;
- }
- bc.force(forceMetadata);
+ long positionForceWrite = writeBufferStartPosition.get();
+ fileChannel.force(forceMetadata);
return positionForceWrite;
}
- /*public Channel getInternalChannel() {
- return bc;
- }*/
- synchronized public int read(ByteBuffer buff, long pos) throws IOException {
- if (readBuffer == null) {
- readBuffer = ByteBuffer.allocateDirect(capacity);
- readBufferStartPosition = Long.MIN_VALUE;
- }
+ @Override
+ synchronized public int read(ByteBuffer dest, long pos) throws IOException {
long prevPos = pos;
- while(buff.remaining() > 0) {
+ while(dest.remaining() > 0) {
// check if it is in the write buffer
- if (writeBuffer != null && writeBufferStartPosition <= pos) {
- long positionInBuffer = pos - writeBufferStartPosition;
+ if (writeBuffer != null && writeBufferStartPosition.get() <= pos) {
+ long positionInBuffer = pos - writeBufferStartPosition.get();
long bytesToCopy = writeBuffer.position()-positionInBuffer;
- if (bytesToCopy > buff.remaining()) {
- bytesToCopy = buff.remaining();
+ if (bytesToCopy > dest.remaining()) {
+ bytesToCopy = dest.remaining();
}
if (bytesToCopy == 0) {
throw new IOException("Read past EOF");
@@ -170,43 +156,49 @@ public class BufferedChannel {
ByteBuffer src = writeBuffer.duplicate();
src.position((int) positionInBuffer);
src.limit((int) (positionInBuffer+bytesToCopy));
- buff.put(src);
+ dest.put(src);
pos+= bytesToCopy;
- } else if (writeBuffer == null && writeBufferStartPosition <= pos) {
+ } else if (writeBuffer == null && writeBufferStartPosition.get() <= pos) {
// here we reach the end
break;
// first check if there is anything we can grab from the readBuffer
} else if (readBufferStartPosition <= pos && pos < readBufferStartPosition+readBuffer.capacity()) {
long positionInBuffer = pos - readBufferStartPosition;
long bytesToCopy = readBuffer.capacity()-positionInBuffer;
- if (bytesToCopy > buff.remaining()) {
- bytesToCopy = buff.remaining();
+ if (bytesToCopy > dest.remaining()) {
+ bytesToCopy = dest.remaining();
}
ByteBuffer src = readBuffer.duplicate();
src.position((int) positionInBuffer);
src.limit((int) (positionInBuffer+bytesToCopy));
- buff.put(src);
+ dest.put(src);
pos += bytesToCopy;
// let's read it
} else {
readBufferStartPosition = pos;
readBuffer.clear();
// make sure that we don't overlap with the write buffer
- if (readBufferStartPosition + readBuffer.capacity() >= writeBufferStartPosition) {
- readBufferStartPosition = writeBufferStartPosition - readBuffer.capacity();
+ if (readBufferStartPosition + readBuffer.capacity() >= writeBufferStartPosition.get()) {
+ readBufferStartPosition = writeBufferStartPosition.get() - readBuffer.capacity();
if (readBufferStartPosition < 0) {
- readBuffer.put(zeroPage, 0, (int) -readBufferStartPosition);
+ ZeroBuffer.put(readBuffer, (int)-readBufferStartPosition);
}
}
while(readBuffer.remaining() > 0) {
- if (bc.read(readBuffer, readBufferStartPosition+readBuffer.position()) <= 0) {
+ if (fileChannel.read(readBuffer, readBufferStartPosition+readBuffer.position()) <= 0) {
throw new IOException("Short read");
}
}
- readBuffer.put(zeroPage, 0, readBuffer.remaining());
+ ZeroBuffer.put(readBuffer);
readBuffer.clear();
}
}
return (int)(pos - prevPos);
}
+
+ @Override
+ synchronized public void clear() {
+ super.clear();
+ writeBuffer.clear();
+ }
}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java?rev=1560348&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java Wed Jan 22 13:35:03 2014
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+public abstract class BufferedChannelBase {
+ protected final FileChannel fileChannel;
+
+ protected BufferedChannelBase(FileChannel fc) {
+ this.fileChannel = fc;
+ }
+
+ protected FileChannel validateAndGetFileChannel() throws IOException {
+ // Even if we have BufferedChannelBase objects in the cache, higher layers should
+ // guarantee that once a log file has been closed and possibly deleted during garbage
+ // collection, attempts will not be made to read from it
+ if (!fileChannel.isOpen()) {
+ throw new IOException("Attempting to access a file channel that has already been closed");
+ }
+ return fileChannel;
+ }
+
+ /**
+ * Get the current size of the underlying FileChannel.
+ * @return the size of the underlying file in bytes; an IOException is thrown if the channel is closed.
+ */
+ public long size() throws IOException {
+ return validateAndGetFileChannel().size();
+ }
+
+ /**
+ * Get the {@link FileChannel} that this BufferedChannel wraps around.
+ * @return
+ */
+ public FileChannel getFileChannel() {
+ return fileChannel;
+ }
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java?rev=1560348&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java Wed Jan 22 13:35:03 2014
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * A Buffered channel without a write buffer. Only reads are buffered.
+ */
+public class BufferedReadChannel extends BufferedChannelBase {
+ private static Logger LOG = LoggerFactory.getLogger(BufferedReadChannel.class);
+ // The capacity of the read buffer.
+ protected final int readCapacity;
+ // The buffer for read operations.
+ protected ByteBuffer readBuffer;
+ // The starting position of the data currently in the read buffer.
+ protected long readBufferStartPosition = Long.MIN_VALUE;
+
+ long invocationCount = 0;
+ long cacheHitCount = 0;
+
+ public BufferedReadChannel(FileChannel fileChannel, int readCapacity) throws IOException {
+ super(fileChannel);
+ this.readCapacity = readCapacity;
+ this.readBuffer = ByteBuffer.allocateDirect(readCapacity);
+ this.readBuffer.limit(0);
+ }
+
+ /**
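+ * Read up to dest.remaining() bytes into dest, starting at position pos in the
+ * FileChannel and stopping early at end of file. This function can read from the
+ * buffer or the file channel depending on the implementation.
+ * @param dest buffer to fill; its remaining() bytes determine how much is read
+ * @param pos absolute file position to start reading from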
+ * @return The total number of bytes read. -1 if the given position is greater than or equal to the file's current size.
+ * @throws IOException if I/O error occurs
+ */
+ synchronized public int read(ByteBuffer dest, long pos) throws IOException {
+ invocationCount++;
+ long currentPosition = pos;
+ long eof = validateAndGetFileChannel().size();
+ // return -1 if the given position is greater than or equal to the file's current size.
+ if (pos >= eof) {
+ return -1;
+ }
+ while (dest.remaining() > 0) {
+ // Check if the data is in the buffer, if so, copy it.
+ if (readBufferStartPosition <= currentPosition && currentPosition < readBufferStartPosition + readBuffer.limit()) {
+ long posInBuffer = currentPosition - readBufferStartPosition;
+ long bytesToCopy = Math.min(dest.remaining(), readBuffer.limit() - posInBuffer);
+ ByteBuffer rbDup = readBuffer.duplicate();
+ rbDup.position((int)posInBuffer);
+ rbDup.limit((int)(posInBuffer + bytesToCopy));
+ dest.put(rbDup);
+ currentPosition += bytesToCopy;
+ cacheHitCount++;
+ } else if (currentPosition >= eof) {
+ // here we reached eof.
+ break;
+ } else {
+ // We don't have it in the buffer, so put necessary data in the buffer
+ readBuffer.clear();
+ readBufferStartPosition = currentPosition;
+ int readBytes = 0;
+ if ((readBytes = validateAndGetFileChannel().read(readBuffer, currentPosition)) <= 0) {
+ throw new IOException("Reading from filechannel returned a non-positive value. Short read.");
+ }
+ readBuffer.limit(readBytes);
+ }
+ }
+ return (int)(currentPosition - pos);
+ }
+
+ synchronized public void clear() {
+ readBuffer.clear();
+ readBuffer.limit(0);
+ }
+
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1560348&r1=1560347&r2=1560348&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Wed Jan 22 13:35:03 2014
@@ -40,8 +40,16 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map.Entry;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,6 +59,8 @@ import org.apache.bookkeeper.util.IOUtil
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.MapMaker;
+
/**
* This class manages the writing of the bookkeeper entries. All the new
* entries are written to a common log. The LedgerCache will have pointers
@@ -61,19 +71,32 @@ import org.slf4j.LoggerFactory;
public class EntryLogger {
private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
+ private static class BufferedLogChannel extends BufferedChannel {
+ final private long logId;
+ public BufferedLogChannel(FileChannel fc, int writeCapacity,
+ int readCapacity, long logId) throws IOException {
+ super(fc, writeCapacity, readCapacity);
+ this.logId = logId;
+ }
+ public long getLogId() {
+ return logId;
+ }
+ }
+
volatile File currentDir;
private final LedgerDirsManager ledgerDirsManager;
private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
- private long logId;
private volatile long leastUnflushedLogId;
/**
* The maximum size of a entry logger file.
*/
final long logSizeLimit;
- private List<BufferedChannel> logChannelsToFlush;
- private volatile BufferedChannel logChannel;
+ private List<BufferedLogChannel> logChannelsToFlush;
+ private volatile BufferedLogChannel logChannel;
+ private final EntryLoggerAllocator entryLoggerAllocator;
+ private final boolean entryLogPreAllocationEnabled;
private final CopyOnWriteArrayList<EntryLogListener> listeners
= new CopyOnWriteArrayList<EntryLogListener>();
@@ -83,10 +106,12 @@ public class EntryLogger {
*/
final static int LOGFILE_HEADER_SIZE = 1024;
final ByteBuffer LOGFILE_HEADER = ByteBuffer.allocate(LOGFILE_HEADER_SIZE);
+ final static long INVALID_LID = -1L;
final static int MIN_SANE_ENTRY_SIZE = 8 + 8;
final static long MB = 1024 * 1024;
+ final ServerConfiguration conf;
/**
* Scan entries in a entry log file.
*/
@@ -143,6 +168,7 @@ public class EntryLogger {
}
// log size limit
this.logSizeLimit = conf.getEntryLogSizeLimit();
+ this.entryLogPreAllocationEnabled = conf.isEntryLogFilePreAllocationEnabled();
// Initialize the entry log header buffer. This cannot be a static object
// since in our unit tests, we run multiple Bookies and thus EntryLoggers
@@ -152,7 +178,7 @@ public class EntryLogger {
LOGFILE_HEADER.put("BKLO".getBytes(UTF_8));
// Find the largest logId
- logId = -1;
+ long logId = INVALID_LID;
for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
if (!dir.exists()) {
throw new FileNotFoundException(
@@ -164,6 +190,8 @@ public class EntryLogger {
}
}
this.leastUnflushedLogId = logId + 1;
+ this.entryLoggerAllocator = new EntryLoggerAllocator(logId);
+ this.conf = conf;
initialize();
}
@@ -174,9 +202,86 @@ public class EntryLogger {
}
/**
- * Maps entry log files to open channels.
+ * If the log id of current writable channel is the same as entryLogId and the position
+ * we want to read might end up reading from a position in the write buffer of the
+ * buffered channel, route this read to the current logChannel. Else,
+ * read from the BufferedReadChannel that is provided.
+ * @param entryLogId
+ * @param channel
+ * @param buff remaining() on this bytebuffer tells us the last position that we
+ * expect to read.
+ * @param pos The starting position from where we want to read.
+ * @return the number of bytes read, as reported by whichever channel served the read.
+ */
+ private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, ByteBuffer buff, long pos)
+ throws IOException {
+ BufferedLogChannel bc = logChannel;
+ if (null != bc) {
+ if (entryLogId == bc.getLogId()) {
+ synchronized (bc) {
+ if (pos + buff.remaining() >= bc.getFileChannelPosition()) {
+ return bc.read(buff, pos);
+ }
+ }
+ }
+ }
+ return channel.read(buff, pos);
+ }
+
+ /**
+ * A thread-local variable that wraps a mapping of log ids to bufferedchannels
+ * These channels should be used only for reading. logChannel is the one
+ * that is used for writes.
+ */
+ private final ThreadLocal<Map<Long, BufferedReadChannel>> logid2Channel
+ = new ThreadLocal<Map<Long, BufferedReadChannel>>() {
+ @Override
+ public Map<Long, BufferedReadChannel> initialValue() {
+ // Since this map is thread local there is only one modifier.
+ // We don't really need the concurrency, but we need the
+ // weak values, so we use a concurrency level of 1.
+ return new MapMaker().concurrencyLevel(1)
+ .weakValues()
+ .makeMap();
+ }
+ };
+
+ /**
+ * Each thread local buffered read channel can share the same file handle because reads are not relative
+ * and don't cause a change in the channel's position. We use this map to store the file channels. Each
+ * file channel is mapped to a log id which represents an open log file.
+ */
+ private final ConcurrentMap<Long, FileChannel> logid2FileChannel
+ = new ConcurrentHashMap<Long, FileChannel>();
+
+ /**
+ * Put the logId, bc pair in the current thread's read-channel map; returns the previous mapping, if any.
+ * @param logId the entry log id
+ * @param bc the read channel to cache for the current thread
+ */
+ public BufferedReadChannel putInReadChannels(long logId, BufferedReadChannel bc) {
+ Map<Long, BufferedReadChannel> threadMap = logid2Channel.get();
+ return threadMap.put(logId, bc);
+ }
+
+ /**
+ * Remove the shared FileChannel for this log file and close it; per-thread read channels are not evicted here.
+ * @param logId id of the entry log whose file channel is closed
*/
- private ConcurrentHashMap<Long, BufferedChannel> channels = new ConcurrentHashMap<Long, BufferedChannel>();
+ public void removeFromChannelsAndClose(long logId) {
+ FileChannel fileChannel = logid2FileChannel.remove(logId);
+ if (null != fileChannel) {
+ try {
+ fileChannel.close();
+ } catch (IOException e) {
+ LOG.warn("Exception while closing channel for log file:" + logId);
+ }
+ }
+ }
+
+ public BufferedReadChannel getFromChannels(long logId) {
+ return logid2Channel.get().get(logId);
+ }
/**
* Get the least unflushed log id. Garbage collector thread should not process
@@ -189,7 +294,7 @@ public class EntryLogger {
}
synchronized long getCurrentLogId() {
- return logId;
+ return logChannel.getLogId();
}
protected void initialize() throws IOException {
@@ -259,38 +364,108 @@ public class EntryLogger {
void createNewLog() throws IOException {
if (null != logChannel) {
if (null == logChannelsToFlush) {
- logChannelsToFlush = new LinkedList<BufferedChannel>();
+ logChannelsToFlush = new LinkedList<BufferedLogChannel>();
}
// flush the internal buffer back to filesystem but not sync disk
// so the readers could access the data from filesystem.
logChannel.flush(false);
logChannelsToFlush.add(logChannel);
+ LOG.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
+ logChannel.getLogId(), logChannelsToFlush);
for (EntryLogListener listener : listeners) {
listener.onRotateEntryLog();
}
}
- String logFileName = null;
- do {
- logFileName = Long.toHexString(++logId) + ".log";
- for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
- File newLogFile = new File(dir, logFileName);
- if (newLogFile.exists()) {
- LOG.warn("Found existed entry log " + newLogFile
- + " when trying to create it as a new log.");
- logFileName = null;
- break;
+ logChannel = entryLoggerAllocator.createNewLog();
+ }
+
+ /**
+ * An allocator pre-allocates entry log files.
+ */
+ class EntryLoggerAllocator {
+
+ long preallocatedLogId;
+ Future<BufferedLogChannel> preallocation = null;
+ ExecutorService allocatorExecutor;
+
+ EntryLoggerAllocator(long logId) {
+ preallocatedLogId = logId;
+ allocatorExecutor = Executors.newSingleThreadExecutor();
+ }
+
+ synchronized BufferedLogChannel createNewLog() throws IOException {
+ BufferedLogChannel bc;
+ if (!entryLogPreAllocationEnabled || null == preallocation) {
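+ // initialization time to create a new log. NOTE(review): the next preallocation is only submitted in the else-branch, so it is never scheduled while preallocation is null.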
+ bc = allocateNewLog();
+ } else {
+ // has a preallocated entry log
+ try {
+ bc = preallocation.get();
+ } catch (ExecutionException ee) {
+ if (ee.getCause() instanceof IOException) {
+ throw (IOException) (ee.getCause());
+ } else {
+ throw new IOException("Error to execute entry log allocation.", ee);
+ }
+ } catch (CancellationException ce) {
+ throw new IOException("Task to allocate a new entry log is cancelled.", ce);
+ } catch (InterruptedException ie) {
+ throw new IOException("Intrrupted when waiting a new entry log to be allocated.", ie);
}
+ preallocation = allocatorExecutor.submit(new Callable<BufferedLogChannel>() {
+ @Override
+ public BufferedLogChannel call() throws IOException {
+ return allocateNewLog();
+ }
+ });
}
- } while (logFileName == null);
+ LOG.info("Created new entry logger {}.", bc.getLogId());
+ return bc;
+ }
+
+ /**
+ * Allocate a new log file.
+ */
+ BufferedLogChannel allocateNewLog() throws IOException {
+ List<File> list = ledgerDirsManager.getWritableLedgerDirs();
+ Collections.shuffle(list);
+ // It is better not to overwrite existing entry log files
+ File newLogFile = null;
+ do {
+ String logFileName = Long.toHexString(++preallocatedLogId) + ".log";
+ for (File dir : list) {
+ newLogFile = new File(dir, logFileName);
+ currentDir = dir;
+ if (newLogFile.exists()) {
+ LOG.warn("Found existed entry log " + newLogFile
+ + " when trying to create it as a new log.");
+ newLogFile = null;
+ break;
+ }
+ }
+ } while (newLogFile == null);
+
+ FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel();
+ BufferedLogChannel logChannel = new BufferedLogChannel(channel,
+ conf.getWriteBufferBytes(), conf.getReadBufferBytes(), preallocatedLogId);
+ logChannel.write((ByteBuffer) LOGFILE_HEADER.clear());
- // Update last log id first
- currentDir = ledgerDirsManager.pickRandomWritableDir();
- setLastLogId(currentDir, logId);
-
- File newLogFile = new File(currentDir, logFileName);
- logChannel = new BufferedChannel(new RandomAccessFile(newLogFile, "rw").getChannel(), 64*1024);
- logChannel.write((ByteBuffer) LOGFILE_HEADER.clear());
- channels.put(logId, logChannel);
+ for (File f : list) {
+ setLastLogId(f, preallocatedLogId);
+ }
+ LOG.info("Preallocated entry logger {}.", preallocatedLogId);
+ return logChannel;
+ }
+
+ /**
+ * Stop the allocator.
+ */
+ void stop() {
+ // NOTE(review): shutdown() does not wait for an in-flight preallocation; awaitTermination() would be needed for that.
+ allocatorExecutor.shutdown();
+ LOG.info("Stopped entry logger preallocator.");
+ }
}
/**
@@ -300,15 +475,7 @@ public class EntryLogger {
* Entry Log File Id
*/
protected boolean removeEntryLog(long entryLogId) {
- BufferedChannel bc = channels.remove(entryLogId);
- if (null != bc) {
- // close its underlying file channel, so it could be deleted really
- try {
- bc.getFileChannel().close();
- } catch (IOException ie) {
- LOG.warn("Exception while closing garbage collected entryLog file : ", ie);
- }
- }
+ removeFromChannelsAndClose(entryLogId);
File entryLogFile;
try {
entryLogFile = findFile(entryLogId);
@@ -337,6 +504,7 @@ public class EntryLogger {
try {
bw.close();
} catch (IOException e) {
+ LOG.error("Could not close lastId file in {}", dir.getPath());
}
}
}
@@ -365,7 +533,7 @@ public class EntryLogger {
}
// no log file found in this directory
if (0 == logs.size()) {
- return -1;
+ return INVALID_LID;
}
// order the collections
Collections.sort(logs);
@@ -380,16 +548,16 @@ public class EntryLogger {
try {
fis = new FileInputStream(new File(f, "lastId"));
} catch (FileNotFoundException e) {
- return -1;
+ return INVALID_LID;
}
BufferedReader br = new BufferedReader(new InputStreamReader(fis, UTF_8));
try {
String lastIdString = br.readLine();
return Long.parseLong(lastIdString, 16);
} catch (IOException e) {
- return -1;
+ return INVALID_LID;
} catch(NumberFormatException e) {
- return -1;
+ return INVALID_LID;
} finally {
try {
br.close();
@@ -407,21 +575,28 @@ public class EntryLogger {
}
void flushRotatedLogs() throws IOException {
- List<BufferedChannel> tmpChannels = null;
- long newUnflushedLogId;
+ List<BufferedLogChannel> channels = null;
+ long flushedLogId = INVALID_LID;
synchronized (this) {
- tmpChannels = logChannelsToFlush;
+ channels = logChannelsToFlush;
logChannelsToFlush = null;
- newUnflushedLogId = logId;
}
- if (null == tmpChannels) {
+ if (null == channels) {
return;
}
- for (BufferedChannel channel : tmpChannels) {
+ for (BufferedLogChannel channel : channels) {
channel.flush(true);
+ // since this channel is only used for writing, after flushing the channel
+ // we have to close the underlying file channel. Otherwise we might end up
+ // leaking fds, which would prevent the disk space from being reclaimed.
+ closeFileChannel(channel);
+ if (channel.getLogId() > flushedLogId) {
+ flushedLogId = channel.getLogId();
+ }
+ LOG.info("Synced entry logger {} to disk.", channel.getLogId());
}
// move the leastUnflushedLogId ptr
- leastUnflushedLogId = newUnflushedLogId;
+ leastUnflushedLogId = flushedLogId + 1;
}
void flush() throws IOException {
@@ -432,6 +607,7 @@ public class EntryLogger {
synchronized void flushCurrentLog() throws IOException {
if (logChannel != null) {
logChannel.flush(true);
+ LOG.debug("Flush and sync current entry logger {}.", logChannel.getLogId());
}
}
@@ -458,7 +634,7 @@ public class EntryLogger {
long pos = logChannel.position();
logChannel.write(entry);
- return (logId << 32L) | pos;
+ return (logChannel.getLogId() << 32L) | pos;
}
static long logIdForOffset(long offset) {
@@ -474,7 +650,7 @@ public class EntryLogger {
long pos = location & 0xffffffffL;
ByteBuffer sizeBuff = ByteBuffer.allocate(4);
pos -= 4; // we want to get the ledgerId and length to check
- BufferedChannel fc;
+ BufferedReadChannel fc;
try {
fc = getChannelForLogId(entryLogId);
} catch (FileNotFoundException e) {
@@ -482,7 +658,7 @@ public class EntryLogger {
newe.setStackTrace(e.getStackTrace());
throw newe;
}
- if (fc.read(sizeBuff, pos) != sizeBuff.capacity()) {
+ if (readFromLogChannel(entryLogId, fc, sizeBuff, pos) != sizeBuff.capacity()) {
throw new Bookie.NoEntryException("Short read from entrylog " + entryLogId,
ledgerId, entryId);
}
@@ -500,7 +676,7 @@ public class EntryLogger {
}
byte data[] = new byte[entrySize];
ByteBuffer buff = ByteBuffer.wrap(data);
- int rc = fc.read(buff, pos);
+ int rc = readFromLogChannel(entryLogId, fc, buff, pos);
if ( rc != data.length) {
// Note that throwing NoEntryException here instead of IOException is not
// without risk. If all bookies in a quorum throw this same exception
@@ -526,8 +702,8 @@ public class EntryLogger {
return data;
}
- private BufferedChannel getChannelForLogId(long entryLogId) throws IOException {
- BufferedChannel fc = channels.get(entryLogId);
+ private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException {
+ BufferedReadChannel fc = getFromChannels(entryLogId);
if (fc != null) {
return fc;
}
@@ -535,18 +711,16 @@ public class EntryLogger {
// get channel is used to open an existing entry log file
// it would be better to open using read mode
FileChannel newFc = new RandomAccessFile(file, "r").getChannel();
- // If the file already exists before creating a BufferedChannel layer above it,
- // set the FileChannel's position to the end so the write buffer knows where to start.
- newFc.position(newFc.size());
- fc = new BufferedChannel(newFc, 8192);
-
- BufferedChannel oldfc = channels.putIfAbsent(entryLogId, fc);
- if (oldfc != null) {
+ FileChannel oldFc = logid2FileChannel.putIfAbsent(entryLogId, newFc);
+ if (null != oldFc) {
newFc.close();
- return oldfc;
- } else {
- return fc;
+ newFc = oldFc;
}
+ // We set the position of the write buffer of this buffered channel to Long.MAX_VALUE
+ // so that there are no overlaps with the write buffer while reading
+ fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes());
+ putInReadChannels(entryLogId, fc);
+ return fc;
}
/**
@@ -584,7 +758,7 @@ public class EntryLogger {
protected void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException {
ByteBuffer sizeBuff = ByteBuffer.allocate(4);
ByteBuffer lidBuff = ByteBuffer.allocate(8);
- BufferedChannel bc;
+ BufferedReadChannel bc;
// Get the BufferedChannel for the current entry log file
try {
bc = getChannelForLogId(entryLogId);
@@ -601,7 +775,7 @@ public class EntryLogger {
if (pos >= bc.size()) {
break;
}
- if (bc.read(sizeBuff, pos) != sizeBuff.capacity()) {
+ if (readFromLogChannel(entryLogId, bc, sizeBuff, pos) != sizeBuff.capacity()) {
throw new IOException("Short read for entry size from entrylog " + entryLogId);
}
long offset = pos;
@@ -614,7 +788,7 @@ public class EntryLogger {
}
sizeBuff.clear();
// try to read ledger id first
- if (bc.read(lidBuff, pos) != lidBuff.capacity()) {
+ if (readFromLogChannel(entryLogId, bc, lidBuff, pos) != lidBuff.capacity()) {
throw new IOException("Short read for ledger id from entrylog " + entryLogId);
}
lidBuff.flip();
@@ -628,7 +802,7 @@ public class EntryLogger {
// read the entry
byte data[] = new byte[entrySize];
ByteBuffer buff = ByteBuffer.wrap(data);
- int rc = bc.read(buff, pos);
+ int rc = readFromLogChannel(entryLogId, bc, buff, pos);
if (rc != data.length) {
throw new IOException("Short read for ledger entry from entryLog " + entryLogId
+ "@" + pos + "(" + rc + "!=" + data.length + ")");
@@ -649,22 +823,44 @@ public class EntryLogger {
LOG.info("Stopping EntryLogger");
try {
flush();
- for (Entry<Long, BufferedChannel> channelEntry : channels
- .entrySet()) {
- channelEntry.getValue().getFileChannel().close();
+ for (FileChannel fc : logid2FileChannel.values()) {
+ fc.close();
}
+ // clear the mapping, so we don't need to go through the channels again in finally block in normal case.
+ logid2FileChannel.clear();
+ // close current writing log file
+ closeFileChannel(logChannel);
+ logChannel = null;
} catch (IOException ie) {
// we have no idea how to avoid io exception during shutting down, so just ignore it
LOG.error("Error flush entry log during shutting down, which may cause entry log corrupted.", ie);
} finally {
- for (Entry<Long, BufferedChannel> channelEntry : channels
- .entrySet()) {
- FileChannel fileChannel = channelEntry.getValue()
- .getFileChannel();
- if (fileChannel.isOpen()) {
- IOUtils.close(LOG, fileChannel);
- }
+ for (FileChannel fc : logid2FileChannel.values()) {
+ IOUtils.close(LOG, fc);
}
+ forceCloseFileChannel(logChannel);
+ }
+ // shutdown the pre-allocation thread
+ entryLoggerAllocator.stop();
+ }
+
+ private static void closeFileChannel(BufferedChannelBase channel) throws IOException {
+ if (null == channel) {
+ return;
+ }
+ FileChannel fileChannel = channel.getFileChannel();
+ if (null != fileChannel) {
+ fileChannel.close();
+ }
+ }
+
+ private static void forceCloseFileChannel(BufferedChannelBase channel) {
+ if (null == channel) {
+ return;
+ }
+ FileChannel fileChannel = channel.getFileChannel();
+ if (null != fileChannel) {
+ IOUtils.close(LOG, fileChannel);
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1560348&r1=1560347&r2=1560348&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Wed Jan 22 13:35:03 2014
@@ -32,6 +32,7 @@ import com.google.common.annotations.Bet
public class ServerConfiguration extends AbstractConfiguration {
// Entry Log Parameters
protected final static String ENTRY_LOG_SIZE_LIMIT = "logSizeLimit";
+ protected final static String ENTRY_LOG_FILE_PREALLOCATION_ENABLED = "entryLogFilePreallocationEnabled";
protected final static String MINOR_COMPACTION_INTERVAL = "minorCompactionInterval";
protected final static String MINOR_COMPACTION_THRESHOLD = "minorCompactionThreshold";
protected final static String MAJOR_COMPACTION_INTERVAL = "majorCompactionInterval";
@@ -132,6 +133,27 @@ public class ServerConfiguration extends
}
/**
+ * Is entry log file preallocation enabled.
+ *
+ * @return whether entry log file preallocation is enabled or not.
+ */
+ public boolean isEntryLogFilePreAllocationEnabled() {
+ return this.getBoolean(ENTRY_LOG_FILE_PREALLOCATION_ENABLED, true);
+ }
+
+ /**
+ * Enable/disable entry log file preallocation.
+ *
+ * @param enabled
+ * enable/disable entry log file preallocation.
+ * @return server configuration object.
+ */
+ public ServerConfiguration setEntryLogFilePreAllocationEnabled(boolean enabled) {
+ this.setProperty(ENTRY_LOG_FILE_PREALLOCATION_ENABLED, enabled);
+ return this;
+ }
+
+ /**
* Get Garbage collection wait time
*
* @return gc wait time
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java?rev=1560348&r1=1560347&r2=1560348&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java Wed Jan 22 13:35:03 2014
@@ -95,11 +95,13 @@ public class CompactionTest extends Book
public void setUp() throws Exception {
// Set up the configuration properties needed.
baseConf.setEntryLogSizeLimit(numEntries * ENTRY_SIZE);
+ // Disable skip list for compaction
baseConf.setGcWaitTime(gcWaitTime);
baseConf.setMinorCompactionThreshold(minorCompactionThreshold);
baseConf.setMajorCompactionThreshold(majorCompactionThreshold);
baseConf.setMinorCompactionInterval(minorCompactionInterval);
baseConf.setMajorCompactionInterval(majorCompactionInterval);
+ baseConf.setEntryLogFilePreAllocationEnabled(false);
super.setUp();
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java?rev=1560348&r1=1560347&r2=1560348&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java Wed Jan 22 13:35:03 2014
@@ -57,6 +57,7 @@ public class LedgerDeleteTest extends Mu
// Set up the configuration properties needed.
baseConf.setEntryLogSizeLimit(2 * 1024 * 1024L);
baseConf.setGcWaitTime(1000);
+ baseConf.setEntryLogFilePreAllocationEnabled(false);
super.setUp();
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java?rev=1560348&r1=1560347&r2=1560348&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java Wed Jan 22 13:35:03 2014
@@ -39,6 +39,7 @@ public class ReadOnlyBookieTest extends
public ReadOnlyBookieTest() {
super(2);
+ baseConf.setEntryLogFilePreAllocationEnabled(false);
}
/**