You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2013/07/20 19:59:39 UTC
svn commit: r1505174 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/
bookkeeper-server/src/main/java/org/apache/bookkeeper...
Author: sijie
Date: Sat Jul 20 17:59:39 2013
New Revision: 1505174
URL: http://svn.apache.org/r1505174
Log:
BOOKKEEPER-563: Avoid Journal polluting page cache (Robin Dhamankar via sijie)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1505174&r1=1505173&r2=1505174&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat Jul 20 17:59:39 2013
@@ -70,6 +70,8 @@ Trunk (unreleased changes)
BOOKKEEPER-633: ConcurrentModificationException in RackawareEnsemblePlacementPolicy when a bookie is removed from available list (vinay via sijie)
+ BOOKKEEPER-563: Avoid Journal polluting page cache (Robin Dhamankar via sijie)
+
hedwig-server:
BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml?rev=1505174&r1=1505173&r2=1505174&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml Sat Jul 20 17:59:39 2013
@@ -98,6 +98,11 @@
<artifactId>commons-io</artifactId>
<version>2.1</version>
</dependency>
+ <dependency>
+ <groupId>net.java.dev.jna</groupId>
+ <artifactId>jna</artifactId>
+ <version>3.2.7</version>
+ </dependency>
<!--
Annoying dependency we need to include because
zookeeper uses log4j and so we transatively do, but
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java?rev=1505174&r1=1505173&r2=1505174&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java Sat Jul 20 17:59:39 2013
@@ -111,10 +111,23 @@ public class BufferedChannel
writeBufferStartPosition = bc.position();
}
if (sync) {
- bc.force(false);
+ forceWrite(false);
}
}
+ public long forceWrite(boolean forceMetadata) throws IOException {
+ // Snapshot how far data had been flushed into the OS page cache before forcing.
+ // Everything below that offset is durable once bc.force() returns; anything
+ // flushed concurrently after the snapshot may or may not be covered.
+ final long durableUpTo;
+ synchronized (this) {
+ durableUpTo = writeBufferStartPosition;
+ }
+ bc.force(forceMetadata);
+ // Callers may treat this conservative offset as force-written.
+ return durableUpTo;
+ }
+
/*public Channel getInternalChannel() {
return bc;
}*/
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java?rev=1505174&r1=1505173&r2=1505174&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java Sat Jul 20 17:59:39 2013
@@ -275,6 +275,8 @@ class Journal extends Thread implements
final File journalDirectory;
final ServerConfiguration conf;
+ // should we hint the filesystem to remove pages from cache after force write
+ private final boolean removePagesFromCache;
private LastLogMark lastLogMark = new LastLogMark(0, 0);
@@ -292,6 +294,7 @@ class Journal extends Thread implements
this.maxJournalSize = conf.getMaxJournalSize() * MB;
this.maxBackupJournals = conf.getMaxBackupJournals();
+ this.removePagesFromCache = conf.getJournalRemovePagesFromCache();
// read last log mark
lastLogMark.readLog();
LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark());
@@ -492,7 +495,7 @@ class Journal extends Thread implements
// new journal file to write
if (null == logFile) {
logId = logId + 1;
- logFile = new JournalChannel(journalDirectory, logId);
+ logFile = new JournalChannel(journalDirectory, logId, removePagesFromCache);
bc = logFile.getBufferedChannel();
lastFlushPosition = 0;
@@ -505,7 +508,11 @@ class Journal extends Thread implements
qe = queue.poll();
if (qe == null || bc.position() > lastFlushPosition + 512*1024) {
//logFile.force(false);
- bc.flush(true);
+ bc.flush(false);
+ // This separation of flush and force is useful when adaptive group
+ // force write is used where the flush thread does not block while
+ // the force is issued by a separate thread
+ logFile.forceWrite(false);
lastFlushPosition = bc.position();
lastLogMark.setCurLogMark(logId, lastFlushPosition);
for (QueueEntry e : toFlush) {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java?rev=1505174&r1=1505173&r2=1505174&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java Sat Jul 20 17:59:39 2013
@@ -29,6 +29,7 @@ import java.io.RandomAccessFile;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.ByteBuffer;
+import org.apache.bookkeeper.util.NativeIO;
import static com.google.common.base.Charsets.UTF_8;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
class JournalChannel implements Closeable {
static Logger LOG = LoggerFactory.getLogger(JournalChannel.class);
+ final RandomAccessFile randomAccessFile;
final FileChannel fc;
final BufferedChannel bc;
final int formatVersion;
@@ -57,12 +59,24 @@ class JournalChannel implements Closeabl
public final static long preAllocSize = 4*1024*1024;
public final static ByteBuffer zeros = ByteBuffer.allocate(512);
+ private boolean fRemoveFromPageCache;
+ // The position of the file channel's last force write.
+ private long lastForceWritePosition = 0;
JournalChannel(File journalDirectory, long logId) throws IOException {
- this(journalDirectory, logId, START_OF_FILE);
+ this(journalDirectory, logId, START_OF_FILE, false);
}
JournalChannel(File journalDirectory, long logId, long position) throws IOException {
+ this(journalDirectory, logId, position, false);
+ }
+
+ JournalChannel(File journalDirectory, long logId, boolean fRemoveFromPageCache) throws IOException {
+ this(journalDirectory, logId, START_OF_FILE, fRemoveFromPageCache);
+ }
+
+ JournalChannel(File journalDirectory, long logId, long position, boolean fRemoveFromPageCache) throws IOException {
+ this.fRemoveFromPageCache = fRemoveFromPageCache;
File fn = new File(journalDirectory, Long.toHexString(logId) + ".txn");
LOG.info("Opening journal {}", fn);
@@ -73,7 +87,8 @@ class JournalChannel implements Closeabl
throw new IOException("File " + fn
+ " suddenly appeared, is another bookie process running?");
}
- fc = new RandomAccessFile(fn, "rw").getChannel();
+ randomAccessFile = new RandomAccessFile(fn, "rw");
+ fc = randomAccessFile.getChannel();
formatVersion = CURRENT_JOURNAL_FORMAT_VERSION;
ByteBuffer bb = ByteBuffer.allocate(HEADER_SIZE);
@@ -81,14 +96,15 @@ class JournalChannel implements Closeabl
bb.putInt(formatVersion);
bb.flip();
fc.write(bb);
- fc.force(true);
bc = new BufferedChannel(fc, 65536);
+ forceWrite(true);
nextPrealloc = preAllocSize;
fc.write(zeros, nextPrealloc);
} else { // open an existing file
- fc = new RandomAccessFile(fn, "r").getChannel();
+ randomAccessFile = new RandomAccessFile(fn, "r");
+ fc = randomAccessFile.getChannel();
bc = null; // readonly
ByteBuffer bb = ByteBuffer.allocate(HEADER_SIZE);
@@ -133,6 +149,9 @@ class JournalChannel implements Closeabl
} catch (IOException e) {
LOG.error("Bookie journal file can seek to position :", e);
}
+
+ // Anything we read has been force written
+ lastForceWritePosition = fc.position();
}
}
@@ -163,4 +182,41 @@ class JournalChannel implements Closeabl
public void close() throws IOException {
fc.close();
}
+
+ /**
+ * Force data already flushed from the buffered channel to the storage device and,
+ * when enabled, advise the OS to drop the newly forced range from its page cache.
+ *
+ * @param forceMetadata whether file metadata must also be written to storage
+ * @throws IOException if the underlying force fails
+ */
+ public void forceWrite(boolean forceMetadata) throws IOException {
+ long newForceWritePosition = bc.forceWrite(forceMetadata);
+ long previousForceWritePosition;
+ synchronized (this) {
+ previousForceWritePosition = lastForceWritePosition;
+ }
+ if (newForceWritePosition > previousForceWritePosition) {
+ if (fRemoveFromPageCache) {
+ removeRangeFromPageCache(previousForceWritePosition, newForceWritePosition);
+ }
+ synchronized (this) {
+ lastForceWritePosition = newForceWritePosition;
+ }
+ }
+ }
+
+ /**
+ * Best-effort eviction of the byte range [start, end) from the page cache.
+ * The range is split into int-sized chunks: a plain (int) cast of a gap over 2GB
+ * would yield a negative or zero length, and fadvise treats zero as "to end of file".
+ */
+ private void removeRangeFromPageCache(long start, long end) throws IOException {
+ long offset = start;
+ while (offset < end) {
+ int chunk = (int) Math.min(Integer.MAX_VALUE, end - offset);
+ NativeIO.bestEffortRemoveFromPageCache(randomAccessFile.getFD(), offset, chunk);
+ offset += chunk;
+ }
+ }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1505174&r1=1505173&r2=1505174&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Sat Jul 20 17:59:39 2013
@@ -20,6 +20,8 @@ package org.apache.bookkeeper.conf;
import java.io.File;
import java.util.List;
+import com.google.common.annotations.Beta;
+
import org.apache.commons.lang.StringUtils;
/**
@@ -46,6 +48,7 @@ public class ServerConfiguration extends
// Journal Parameters
protected final static String MAX_JOURNAL_SIZE = "journalMaxSizeMB";
protected final static String MAX_BACKUP_JOURNALS = "journalMaxBackups";
+ protected final static String JOURNAL_REMOVE_FROM_PAGE_CACHE = "journalRemoveFromPageCache";
// Bookie Parameters
protected final static String BOOKIE_PORT = "bookiePort";
protected final static String LISTENING_INTERFACE = "listeningInterface";
@@ -753,4 +756,31 @@ public class ServerConfiguration extends
public boolean isAutoRecoveryDaemonEnabled() {
return getBoolean(AUTO_RECOVERY_DAEMON_ENABLED, false);
}
+
+ /**
+ * Get whether the journal should advise the OS to drop journal pages from the
+ * page cache once they have been force-written to disk.
+ *
+ * <p>Configured by {@code journalRemoveFromPageCache}; defaults to {@code false}.
+ *
+ * @return true if journal pages should be removed from the page cache after force write
+ */
+ @Beta
+ public boolean getJournalRemovePagesFromCache() {
+ return getBoolean(JOURNAL_REMOVE_FROM_PAGE_CACHE, false);
+ }
+
+ /**
+ * Set whether the journal should advise the OS to drop journal pages from the
+ * page cache once they have been force-written to disk.
+ *
+ * @param enabled
+ * - true to remove pages from the page cache after force write; otherwise, false
+ * @return server configuration
+ */
+ @Beta
+ public ServerConfiguration setJournalRemovePagesFromCache(boolean enabled) {
+ setProperty(JOURNAL_REMOVE_FROM_PAGE_CACHE, enabled);
+ return this;
+ }
}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java?rev=1505174&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java Sat Jul 20 17:59:39 2013
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.util;
+
+import java.io.FileDescriptor;
+import java.lang.reflect.Field;
+import java.util.Locale;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.jna.LastErrorException;
+import com.sun.jna.Native;
+
+/**
+ * Best-effort access to native libc I/O calls, bound lazily through JNA direct mapping.
+ *
+ * <p>Every helper degrades to a no-op (with a log message) when JNA is absent, libc
+ * cannot be linked, the platform is not Linux, or the raw file descriptor cannot be
+ * obtained, so callers do not need to handle native failures.
+ */
+public final class NativeIO {
+ private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class);
+
+ // POSIX_FADV_DONTNEED as defined by Linux fadvise.h on common architectures.
+ private static final int POSIX_FADV_DONTNEED = 4;
+
+ // os.name cannot change while the JVM runs, so evaluate the platform check once.
+ private static final boolean IS_LINUX = System.getProperty("os.name", "")
+ .toLowerCase(Locale.ROOT).contains("linux");
+
+ // java.io.FileDescriptor#fd, resolved once; null if reflection is not permitted.
+ private static final Field FD_FIELD = getFieldByReflection(FileDescriptor.class, "fd");
+
+ // Written only inside onDemandInitialization() (class monitor held); volatile so the
+ // unsynchronized fast path in bestEffortRemoveFromPageCache() sees published values.
+ // initialized is always written before initializationAttempted.
+ private static volatile boolean initializationAttempted = false;
+ private static volatile boolean initialized = false;
+
+ private NativeIO() {}
+
+ /**
+ * Bind the native methods of this class to libc, at most once per JVM.
+ * Failures are logged and leave the native methods disabled.
+ */
+ private static synchronized void onDemandInitialization() {
+ if (initializationAttempted) {
+ return;
+ }
+ try {
+ Native.register("c");
+ initialized = true;
+ } catch (NoClassDefFoundError e) {
+ LOG.info("JNA not found. Native methods will be disabled.");
+ } catch (UnsatisfiedLinkError e) {
+ LOG.info("Unable to link C library. Native methods will be disabled.");
+ } catch (NoSuchMethodError e) {
+ LOG.warn("Obsolete version of JNA present; unable to register C library");
+ } finally {
+ // Publish only after the attempt finished so concurrent callers either wait
+ // on the monitor above or observe the final value of 'initialized'.
+ initializationAttempted = true;
+ }
+ }
+
+ /**
+ * libc {@code int posix_fadvise(int fd, off_t offset, off_t len, int advice)}.
+ * Both offset and len are off_t, which is 64 bits wide on LP64 Linux, so they are
+ * mapped as {@code long}. Failures are reported through the return value (an errno
+ * code), not through errno.
+ */
+ public static native int posix_fadvise(int fd, long offset, long len, int flag) throws LastErrorException;
+
+ /**
+ * Look up a declared field and make it accessible.
+ *
+ * @return the field, or null if it does not exist or cannot be made accessible
+ */
+ private static Field getFieldByReflection(Class<?> cls, String fieldName) {
+ try {
+ Field field = cls.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field;
+ } catch (Exception e) {
+ LOG.warn("Unable to read {} field from {}", fieldName, cls.getName());
+ return null;
+ }
+ }
+
+ /**
+ * Get system file descriptor (int) from FileDescriptor object.
+ * @param descriptor - FileDescriptor object to get fd from
+ * @return file descriptor, or -1 on error
+ */
+ private static int getSysFileDescriptor(FileDescriptor descriptor) {
+ if (FD_FIELD == null || descriptor == null) {
+ return -1;
+ }
+ try {
+ return FD_FIELD.getInt(descriptor);
+ } catch (Exception e) {
+ LOG.warn("Unable to read fd field from java.io.FileDescriptor");
+ return -1;
+ }
+ }
+
+ /**
+ * Remove pages from the file system page cache when they won't
+ * be accessed again.
+ *
+ * <p>Best effort: failures are logged rather than thrown. Only issued on Linux.
+ *
+ * @param fileDescriptor The file descriptor of the source file.
+ * @param offset The offset within the file.
+ * @param len The number of bytes to drop; non-positive values are ignored because
+ * posix_fadvise interprets a zero length as "to the end of the file".
+ */
+ public static void bestEffortRemoveFromPageCache(FileDescriptor fileDescriptor, long offset, int len) {
+ if (offset < 0 || len <= 0) {
+ return;
+ }
+ if (!IS_LINUX) {
+ LOG.debug("posix_fadvise skipped on file descriptor {}, offset {}", fileDescriptor, offset);
+ return;
+ }
+ if (!initializationAttempted) {
+ onDemandInitialization();
+ }
+ if (!initialized) {
+ return;
+ }
+
+ int sysFileDesc = getSysFileDescriptor(fileDescriptor);
+ if (sysFileDesc < 0) {
+ return;
+ }
+
+ try {
+ int rc = posix_fadvise(sysFileDesc, offset, len, POSIX_FADV_DONTNEED);
+ if (rc != 0) {
+ LOG.warn("posix_fadvise failed on file descriptor {}, offset {}, len {}: error code {}",
+ new Object[] { fileDescriptor, offset, len, rc });
+ }
+ } catch (UnsatisfiedLinkError e) {
+ // Binding failed at call time: the pages simply stay cached.
+ LOG.warn("Unsatisfied Link error: posix_fadvise failed on file descriptor {}, offset {}",
+ new Object[] { fileDescriptor, offset, e });
+ } catch (Exception e) {
+ // This is best effort anyway, so just log the failure and move on.
+ LOG.warn("Unknown exception: posix_fadvise failed on file descriptor {}, offset {}",
+ new Object[] { fileDescriptor, offset, e });
+ }
+ }
+}