You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2013/07/20 19:59:39 UTC

svn commit: r1505174 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/main/java/org/apache/bookkeeper...

Author: sijie
Date: Sat Jul 20 17:59:39 2013
New Revision: 1505174

URL: http://svn.apache.org/r1505174
Log:
BOOKKEEPER-563: Avoid Journal polluting page cache (Robin Dhamankar via sijie)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1505174&r1=1505173&r2=1505174&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat Jul 20 17:59:39 2013
@@ -70,6 +70,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-633: ConcurrentModificationException in RackawareEnsemblePlacementPolicy when a bookie is removed from available list (vinay via sijie)
 
+        BOOKKEEPER-563: Avoid Journal polluting page cache (Robin Dhamankar via sijie)
+
       hedwig-server:
 
         BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml?rev=1505174&r1=1505173&r2=1505174&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml Sat Jul 20 17:59:39 2013
@@ -98,6 +98,11 @@
       <artifactId>commons-io</artifactId>
       <version>2.1</version>
     </dependency>
+    <dependency>
+      <groupId>net.java.dev.jna</groupId>
+      <artifactId>jna</artifactId>
+      <version>3.2.7</version>
+    </dependency>
     <!--
         Annoying dependency we need to include because
         zookeeper uses log4j and so we transatively do, but

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java?rev=1505174&r1=1505173&r2=1505174&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java Sat Jul 20 17:59:39 2013
@@ -111,10 +111,23 @@ public class BufferedChannel
             writeBufferStartPosition = bc.position();
         }
         if (sync) {
-            bc.force(false);
+            forceWrite(false);
         }
     }
 
+    public long forceWrite(boolean forceMetadata) throws IOException {
+        // This is the position up to which data had been flushed to the file
+        // system page cache before this force write was issued, so it is
+        // guaranteed to be made durable by the force write. Anything flushed
+        // after this point may or may not be made durable by it.
+        long positionForceWrite;
+        synchronized (this) {
+            positionForceWrite = writeBufferStartPosition;
+        }
+        bc.force(forceMetadata);
+        return positionForceWrite;
+    }
+
     /*public Channel getInternalChannel() {
         return bc;
     }*/

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java?rev=1505174&r1=1505173&r2=1505174&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java Sat Jul 20 17:59:39 2013
@@ -275,6 +275,8 @@ class Journal extends Thread implements 
 
     final File journalDirectory;
     final ServerConfiguration conf;
+    // should we hint the filesystem to remove pages from cache after force write
+    private final boolean removePagesFromCache;
 
     private LastLogMark lastLogMark = new LastLogMark(0, 0);
 
@@ -292,6 +294,7 @@ class Journal extends Thread implements 
         this.maxJournalSize = conf.getMaxJournalSize() * MB;
         this.maxBackupJournals = conf.getMaxBackupJournals();
 
+        this.removePagesFromCache = conf.getJournalRemovePagesFromCache();
         // read last log mark
         lastLogMark.readLog();
         LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark());
@@ -492,7 +495,7 @@ class Journal extends Thread implements 
                 // new journal file to write
                 if (null == logFile) {
                     logId = logId + 1;
-                    logFile = new JournalChannel(journalDirectory, logId);
+                    logFile = new JournalChannel(journalDirectory, logId, removePagesFromCache);
                     bc = logFile.getBufferedChannel();
 
                     lastFlushPosition = 0;
@@ -505,7 +508,11 @@ class Journal extends Thread implements 
                         qe = queue.poll();
                         if (qe == null || bc.position() > lastFlushPosition + 512*1024) {
                             //logFile.force(false);
-                            bc.flush(true);
+                            bc.flush(false);
+                            // Separating flush from force is useful when adaptive group
+                            // force write is used: the flush thread does not block while
+                            // the force is issued by a separate thread
+                            logFile.forceWrite(false);
                             lastFlushPosition = bc.position();
                             lastLogMark.setCurLogMark(logId, lastFlushPosition);
                             for (QueueEntry e : toFlush) {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java?rev=1505174&r1=1505173&r2=1505174&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java Sat Jul 20 17:59:39 2013
@@ -29,6 +29,7 @@ import java.io.RandomAccessFile;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.ByteBuffer;
+import org.apache.bookkeeper.util.NativeIO;
 
 import static com.google.common.base.Charsets.UTF_8;
 
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
 class JournalChannel implements Closeable {
     static Logger LOG = LoggerFactory.getLogger(JournalChannel.class);
 
+    final RandomAccessFile randomAccessFile;
     final FileChannel fc;
     final BufferedChannel bc;
     final int formatVersion;
@@ -57,12 +59,24 @@ class JournalChannel implements Closeabl
 
     public final static long preAllocSize = 4*1024*1024;
     public final static ByteBuffer zeros = ByteBuffer.allocate(512);
+    private boolean fRemoveFromPageCache;
+    // The position of the file channel's last force write.
+    private long lastForceWritePosition = 0;
 
     JournalChannel(File journalDirectory, long logId) throws IOException {
-        this(journalDirectory, logId, START_OF_FILE);
+        this(journalDirectory, logId, START_OF_FILE, false);
     }
 
     JournalChannel(File journalDirectory, long logId, long position) throws IOException {
+        this(journalDirectory, logId, position, false);
+    }
+
+    JournalChannel(File journalDirectory, long logId, boolean fRemoveFromPageCache) throws IOException {
+        this(journalDirectory, logId, START_OF_FILE, fRemoveFromPageCache);
+    }
+
+    JournalChannel(File journalDirectory, long logId, long position, boolean fRemoveFromPageCache) throws IOException {
+        this.fRemoveFromPageCache = fRemoveFromPageCache;
         File fn = new File(journalDirectory, Long.toHexString(logId) + ".txn");
 
         LOG.info("Opening journal {}", fn);
@@ -73,7 +87,8 @@ class JournalChannel implements Closeabl
                 throw new IOException("File " + fn
                         + " suddenly appeared, is another bookie process running?");
             }
-            fc = new RandomAccessFile(fn, "rw").getChannel();
+            randomAccessFile = new RandomAccessFile(fn, "rw");
+            fc = randomAccessFile.getChannel();
             formatVersion = CURRENT_JOURNAL_FORMAT_VERSION;
 
             ByteBuffer bb = ByteBuffer.allocate(HEADER_SIZE);
@@ -81,14 +96,15 @@ class JournalChannel implements Closeabl
             bb.putInt(formatVersion);
             bb.flip();
             fc.write(bb);
-            fc.force(true);
 
             bc = new BufferedChannel(fc, 65536);
 
+            forceWrite(true);
             nextPrealloc = preAllocSize;
             fc.write(zeros, nextPrealloc);
         } else {  // open an existing file
-            fc = new RandomAccessFile(fn, "r").getChannel();
+            randomAccessFile = new RandomAccessFile(fn, "r");
+            fc = randomAccessFile.getChannel();
             bc = null; // readonly
 
             ByteBuffer bb = ByteBuffer.allocate(HEADER_SIZE);
@@ -133,6 +149,9 @@ class JournalChannel implements Closeabl
             } catch (IOException e) {
                 LOG.error("Bookie journal file can seek to position :", e);
             }
+
+            // Anything we read has been force written
+            lastForceWritePosition = fc.position();
         }
     }
 
@@ -163,4 +182,17 @@ class JournalChannel implements Closeabl
     public void close() throws IOException {
         fc.close();
     }
+
+    public void forceWrite(boolean forceMetadata) throws IOException {
+        long newForceWritePosition = bc.forceWrite(forceMetadata);
+        if (newForceWritePosition > lastForceWritePosition) {
+            if (fRemoveFromPageCache) {
+                NativeIO.bestEffortRemoveFromPageCache(randomAccessFile.getFD(),
+                    lastForceWritePosition, (int)(newForceWritePosition - lastForceWritePosition));
+            }
+            synchronized (this) {
+                lastForceWritePosition = newForceWritePosition;
+            }
+        }
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java?rev=1505174&r1=1505173&r2=1505174&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java Sat Jul 20 17:59:39 2013
@@ -20,6 +20,8 @@ package org.apache.bookkeeper.conf;
 import java.io.File;
 import java.util.List;
 
+import com.google.common.annotations.Beta;
+
 import org.apache.commons.lang.StringUtils;
 
 /**
@@ -46,6 +48,7 @@ public class ServerConfiguration extends
     // Journal Parameters
     protected final static String MAX_JOURNAL_SIZE = "journalMaxSizeMB";
     protected final static String MAX_BACKUP_JOURNALS = "journalMaxBackups";
+    protected final static String JOURNAL_REMOVE_FROM_PAGE_CACHE = "journalRemoveFromPageCache";
     // Bookie Parameters
     protected final static String BOOKIE_PORT = "bookiePort";
     protected final static String LISTENING_INTERFACE = "listeningInterface";
@@ -753,4 +756,26 @@ public class ServerConfiguration extends
     public boolean isAutoRecoveryDaemonEnabled() {
         return getBoolean(AUTO_RECOVERY_DAEMON_ENABLED, false);
     }
+
+    /**
+     * Whether pages should be removed from the page cache after a force write.
+     *
+     * @return remove pages from cache
+     */
+    @Beta
+    public boolean getJournalRemovePagesFromCache() {
+        return getBoolean(JOURNAL_REMOVE_FROM_PAGE_CACHE, false);
+    }
+
+    /**
+     * Sets whether we should remove pages from the page cache after a force write.
+     *
+     * @param enabled
+     *            - true if pages should be removed from the page cache; otherwise, false
+     * @return ServerConfiguration
+     */
+    public ServerConfiguration setJournalRemovePagesFromCache(boolean enabled) {
+        setProperty(JOURNAL_REMOVE_FROM_PAGE_CACHE, enabled);
+        return this;
+    }
 }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java?rev=1505174&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java Sat Jul 20 17:59:39 2013
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.util;
+
+import java.lang.reflect.Field;
+import java.io.FileDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.sun.jna.LastErrorException;
+import com.sun.jna.Native;
+
+public final class NativeIO {
+    private static Logger LOG = LoggerFactory.getLogger(NativeIO.class);
+
+    private static final int POSIX_FADV_DONTNEED = 4; /* fadvise.h */
+
+    private static boolean initializationAttempted = false;
+    private static boolean initialized = false;
+
+    private static void onDemandInitialization() {
+        try {
+            initializationAttempted = true;
+            Native.register("c");
+            initialized = true;
+        } catch (NoClassDefFoundError e) {
+            LOG.info("JNA not found. Native methods will be disabled.");
+        } catch (UnsatisfiedLinkError e) {
+            LOG.info("Unable to link C library. Native methods will be disabled.");
+        } catch (NoSuchMethodError e) {
+            LOG.warn("Obsolete version of JNA present; unable to register C library");
+        }
+    }
+
+    // fadvise
+    public static native int posix_fadvise(int fd, long offset, int len, int flag) throws LastErrorException;
+
+    private NativeIO() {}
+
+    private static Field getFieldByReflection(Class cls, String fieldName) {
+        Field field = null;
+
+        try {
+            field = cls.getDeclaredField(fieldName);
+            field.setAccessible(true);
+        } catch (Exception e) {
+            // We don't really expect this so throw an assertion to
+            // catch this during development
+            assert false;
+            LOG.warn("Unable to read {} field from {}", fieldName, cls.getName());
+        }
+
+        return field;
+    }
+    /**
+     * Get system file descriptor (int) from FileDescriptor object.
+     * @param descriptor - FileDescriptor object to get fd from
+     * @return file descriptor, or -1 on error
+     */
+    private static int getSysFileDescriptor(FileDescriptor descriptor) {
+        // field is null only if the lookup failed with assertions disabled; the resulting NPE is caught below
+        Field field = getFieldByReflection(descriptor.getClass(), "fd");
+        try {
+            return field.getInt(descriptor);
+        } catch (Exception e) {
+            LOG.warn("Unable to read fd field from java.io.FileDescriptor");
+        }
+
+        return -1;
+    }
+
+    /**
+     * Remove pages from the file system page cache when they won't
+     * be accessed again
+     *
+     * @param fileDescriptor     The file descriptor of the source file.
+     * @param offset The offset within the file.
+     * @param len    The length to be flushed.
+     *
+     * @throws nothing => Best effort
+     */
+
+    public static void bestEffortRemoveFromPageCache(FileDescriptor fileDescriptor, long offset, int len) {
+        if (!initializationAttempted) {
+            onDemandInitialization();
+        }
+
+        if (!initialized) {
+            return;
+        }
+
+        int sysFileDesc = getSysFileDescriptor(fileDescriptor);
+
+        if (sysFileDesc < 0) {
+            return;
+        }
+
+        try {
+            if (System.getProperty("os.name").toLowerCase().contains("linux")) {
+                posix_fadvise(sysFileDesc, offset, len, POSIX_FADV_DONTNEED);
+            } else {
+                LOG.debug("posix_fadvise skipped on file descriptor {}, offset {}", fileDescriptor, offset);
+            }
+        } catch (UnsatisfiedLinkError e) {
+            // if JNA is unavailable, just skip the page cache hint;
+            // this is best effort, so only a warning is logged
+            LOG.warn("Unsatisfied Link error: posix_fadvise failed on file descriptor {}, offset {}",
+                fileDescriptor, offset);
+        } catch (Exception e) {
+            // This is best effort anyway, so let's just log that there was an
+            // exception and forget
+            LOG.warn("Unknown exception: posix_fadvise failed on file descriptor {}, offset {}",
+                fileDescriptor, offset);
+        }
+    }
+
+}