You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/01/31 19:01:00 UTC

[GitHub] sijie closed pull request #80: BOOKKEEPER-816: use native fallocate to improve journal allocation

sijie closed pull request #80: BOOKKEEPER-816: use native fallocate to improve journal allocation
URL: https://github.com/apache/bookkeeper/pull/80
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/bin/bookkeeper b/bookkeeper-server/bin/bookkeeper
index 54be3fefe..87429f9b1 100755
--- a/bookkeeper-server/bin/bookkeeper
+++ b/bookkeeper-server/bin/bookkeeper
@@ -180,10 +180,11 @@ if [ -z "$BOOKIE_LOG_CONF" ]; then
 fi
 
 BOOKIE_CLASSPATH="$BOOKIE_JAR:$BOOKIE_CLASSPATH:$BOOKIE_EXTRA_CLASSPATH"
-BOOKIE_CLASSPATH="`dirname $BOOKIE_LOG_CONF`:$BOOKIE_CLASSPATH"
-OPTS="$OPTS -Dlog4j.configuration=`basename $BOOKIE_LOG_CONF`"
-
-OPTS="-cp $BOOKIE_CLASSPATH $OPTS"
+if [ "$BOOKIE_LOG_CONF" != "" ]; then
+    BOOKIE_CLASSPATH="`dirname $BOOKIE_LOG_CONF`:$BOOKIE_CLASSPATH"
+    OPTS="$OPTS -Dlog4j.configuration=`basename $BOOKIE_LOG_CONF`"
+fi
+OPTS="-cp $BOOKIE_CLASSPATH -Djava.library.path=$BK_HOME/target/native/target/usr/local/lib $OPTS"
 
 OPTS="$OPTS $BOOKIE_EXTRA_OPTS"
 
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index cc13e5930..475876be6 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -331,6 +331,79 @@
     </plugins>
   </build>
   <profiles>
+    <profile>
+      <id>native</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-enforcer-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>enforce-os</id>
+                <goals>
+                  <goal>enforce</goal>
+                </goals>
+                <configuration>
+                  <rules>
+                    <requireOS>
+                      <!-- requireOS accepts a single family; "unix" also matches Mac OS X -->
+                      <family>unix</family>
+                      <message>native build only supported on Mac or Unix</message>
+                    </requireOS>
+                  </rules>
+                  <fail>true</fail>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>native-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <phase>compile</phase>
+                <goals>
+                  <goal>javah</goal>
+                </goals>
+                <configuration>
+                  <javahPath>${env.JAVA_HOME}/bin/javah</javahPath>
+                  <javahClassNames>
+                    <javahClassName>org.apache.bookkeeper.util.NativeIO</javahClassName>
+                  </javahClassNames>
+                  <javahOutputDirectory>${project.build.directory}/native/javah</javahOutputDirectory>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-antrun-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>make</id>
+                <phase>compile</phase>
+                <goals><goal>run</goal></goals>
+                <configuration>
+                  <target>
+                    <exec executable="cmake" dir="${project.build.directory}/native" failonerror="true">
+                      <arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model}"/>
+                    </exec>
+                    <!-- a single make pass builds the default target; VERBOSE=1 echoes each compiler command -->
+                    <exec executable="make" dir="${project.build.directory}/native" failonerror="true">
+                      <arg line="VERBOSE=1"/>
+                    </exec>
+                  </target>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
     <profile>
       <id>protobuf</id>
       <build>
diff --git a/bookkeeper-server/src/CMakeLists.txt b/bookkeeper-server/src/CMakeLists.txt
new file mode 100644
index 000000000..3d446b84d
--- /dev/null
+++ b/bookkeeper-server/src/CMakeLists.txt
@@ -0,0 +1,142 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cmake_minimum_required(VERSION 2.6 FATAL_ERROR)
+
+# Always build the native library in Release mode.
+set(CMAKE_BUILD_TYPE Release)
+
+include(JNIFlags.cmake NO_POLICY_SCOPE)
+
+# Compile a library with both shared and static variants
+function(add_dual_library LIBNAME)
+    add_library(${LIBNAME} SHARED ${ARGN})
+    add_library(${LIBNAME}_static STATIC ${ARGN})
+    set_target_properties(${LIBNAME}_static PROPERTIES OUTPUT_NAME ${LIBNAME})
+endfunction(add_dual_library)
+
+# Link both a static and a dynamic target against some libraries
+function(target_link_dual_libraries LIBNAME)
+    target_link_libraries(${LIBNAME} ${ARGN})
+    target_link_libraries(${LIBNAME}_static ${ARGN})
+endfunction(target_link_dual_libraries)
+# Put every build product (runtime, archive, library) of TGT under ${CMAKE_BINARY_DIR}/DIR
+function(output_directory TGT DIR)
+    SET_TARGET_PROPERTIES(${TGT} PROPERTIES
+        RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
+    SET_TARGET_PROPERTIES(${TGT} PROPERTIES
+        ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
+    SET_TARGET_PROPERTIES(${TGT} PROPERTIES
+        LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
+endfunction(output_directory TGT DIR)
+# Apply output_directory to both TGT and its TGT_static twin
+function(dual_output_directory TGT DIR)
+    output_directory(${TGT} "${DIR}")
+    output_directory(${TGT}_static "${DIR}")
+endfunction(dual_output_directory TGT DIR)
+
+#
+# This macro alters the behavior of find_package and find_library.
+# It does this by setting the CMAKE_FIND_LIBRARY_SUFFIXES global variable.
+# You should save that variable before calling this macro and restore it
+# after you have accomplished your goal.
+#
+# The behavior is altered in two ways:
+# 1. We always find shared libraries, never static;
+# 2. We find shared libraries with the given version number.
+#
+# On Windows this macro is a no-op.  Windows does not encode
+# version number information into library path names.
+#
+macro(set_find_shared_library_version LVERS)
+    IF(${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
+        # Mac OS uses .dylib
+        SET(CMAKE_FIND_LIBRARY_SUFFIXES ".${LVERS}.dylib")
+    ELSEIF(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
+        # FreeBSD has always .so installed.
+        SET(CMAKE_FIND_LIBRARY_SUFFIXES ".so")
+    ELSEIF(${CMAKE_SYSTEM_NAME} MATCHES "Windows")
+        # Windows doesn't support finding shared libraries by version.
+    ELSE()
+        # Most UNIX variants use .so
+        SET(CMAKE_FIND_LIBRARY_SUFFIXES ".so.${LVERS}")
+    ENDIF()
+endmacro(set_find_shared_library_version LVERS)
+
+if (NOT GENERATED_JAVAH)
+    # Must identify where the generated headers have been placed
+    MESSAGE(FATAL_ERROR "You must set the cmake variable GENERATED_JAVAH")
+endif (NOT GENERATED_JAVAH)
+find_package(JNI REQUIRED)
+# Save the suffix list (its value, not its name), find a version-1 shared zlib, then restore it.
+SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES "${CMAKE_FIND_LIBRARY_SUFFIXES}")
+set_find_shared_library_version("1")
+find_package(ZLIB REQUIRED)
+SET(CMAKE_FIND_LIBRARY_SUFFIXES "${STORED_CMAKE_FIND_LIBRARY_SUFFIXES}")
+
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -Wall -O2")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_REENTRANT -D_GNU_SOURCE")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64")
+set(D main/native/src/org/apache/bookkeeper)
+set(T main/native/src/test/org/apache/bookkeeper)
+# Probe optional libc functions; the HAVE_* names must match those used in config.h.cmake.
+INCLUDE(CheckFunctionExists)
+INCLUDE(CheckCSourceCompiles)
+INCLUDE(CheckLibraryExists)
+CHECK_FUNCTION_EXISTS(sync_file_range HAVE_SYNC_FILE_RANGE)
+CHECK_FUNCTION_EXISTS(fallocate HAVE_FALLOCATE)
+CHECK_FUNCTION_EXISTS(posix_fadvise HAVE_POSIX_FADVISE)
+CHECK_FUNCTION_EXISTS(posix_fallocate HAVE_POSIX_FALLOCATE)
+CHECK_LIBRARY_EXISTS(dl dlopen "" NEED_LINK_DL)
+
+include_directories(
+    ${GENERATED_JAVAH}
+    main/native/src
+    ${CMAKE_CURRENT_SOURCE_DIR}
+    ${CMAKE_CURRENT_SOURCE_DIR}/src
+    ${CMAKE_BINARY_DIR}
+    ${JNI_INCLUDE_DIRS}
+)
+CONFIGURE_FILE(${CMAKE_SOURCE_DIR}/config.h.cmake ${CMAKE_BINARY_DIR}/config.h)
+
+SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
+add_dual_library(bookkeeper
+    ${D}/util/NativeIO.c
+)
+if (NEED_LINK_DL)
+   set(LIB_DL dl)
+endif (NEED_LINK_DL)
+
+IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
+    #
+    # By embedding '$ORIGIN' into the RPATH of libbookkeeper.so,
+    # dlopen will look in the directory containing libbookkeeper.so.
+    # However, $ORIGIN is not supported by all operating systems.
+    #
+    SET_TARGET_PROPERTIES(bookkeeper
+        PROPERTIES INSTALL_RPATH "\$ORIGIN/")
+ENDIF()
+
+target_link_dual_libraries(bookkeeper
+    ${LIB_DL}
+    ${JAVA_JVM_LIBRARY}
+)
+SET(LIBBOOKKEEPER_VERSION "1.0.0")
+SET_TARGET_PROPERTIES(bookkeeper PROPERTIES
+    SOVERSION ${LIBBOOKKEEPER_VERSION})
+dual_output_directory(bookkeeper target/usr/local/lib)
diff --git a/bookkeeper-server/src/JNIFlags.cmake b/bookkeeper-server/src/JNIFlags.cmake
new file mode 100644
index 000000000..8333285f2
--- /dev/null
+++ b/bookkeeper-server/src/JNIFlags.cmake
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cmake_minimum_required(VERSION 2.6 FATAL_ERROR)
+
+# If JVM_ARCH_DATA_MODEL is 32, compile all binaries as 32-bit.
+# This variable is set by maven.
+if (JVM_ARCH_DATA_MODEL EQUAL 32)
+    # Force 32-bit code generation on amd64/x86_64, ppc64, sparc64
+    if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_SYSTEM_PROCESSOR MATCHES ".*64")
+        set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -m32")
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -m32")
+        set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -m32")
+    endif ()
+    if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
+        # Set CMAKE_SYSTEM_PROCESSOR to ensure that find_package(JNI) will use
+        # the 32-bit version of libjvm.so.
+        set(CMAKE_SYSTEM_PROCESSOR "i686")
+    endif ()
+endif (JVM_ARCH_DATA_MODEL EQUAL 32)
+
+IF("${CMAKE_SYSTEM}" MATCHES "Linux")
+    #
+    # Locate JNI_INCLUDE_DIRS and JNI_LIBRARIES.
+    # Since we were invoked from Maven, we know that the JAVA_HOME environment
+    # variable is valid.  So we ignore system paths here and just use JAVA_HOME.
+    #
+    FILE(TO_CMAKE_PATH "$ENV{JAVA_HOME}" _JAVA_HOME)
+    IF(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$")
+        SET(_java_libarch "i386")
+    ELSEIF (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
+        SET(_java_libarch "amd64")
+    ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm")
+        SET(_java_libarch "arm")
+    ELSE()
+        SET(_java_libarch ${CMAKE_SYSTEM_PROCESSOR})
+    ENDIF()
+    SET(_JDK_DIRS "${_JAVA_HOME}/jre/lib/${_java_libarch}/*"
+                  "${_JAVA_HOME}/jre/lib/${_java_libarch}"
+                  "${_JAVA_HOME}/jre/lib/*"
+                  "${_JAVA_HOME}/jre/lib"
+                  "${_JAVA_HOME}/lib/*"
+                  "${_JAVA_HOME}/lib"
+                  "${_JAVA_HOME}/include/*"
+                  "${_JAVA_HOME}/include"
+                  "${_JAVA_HOME}"
+    )
+    FIND_PATH(JAVA_INCLUDE_PATH
+        NAMES jni.h
+        PATHS ${_JDK_DIRS}
+        NO_DEFAULT_PATH)
+    # IBM Java ships jniport.h instead of jni_md.h
+    FIND_PATH(JAVA_INCLUDE_PATH2
+        NAMES jni_md.h jniport.h
+        PATHS ${_JDK_DIRS}
+        NO_DEFAULT_PATH)
+    SET(JNI_INCLUDE_DIRS ${JAVA_INCLUDE_PATH} ${JAVA_INCLUDE_PATH2})
+    FIND_LIBRARY(JAVA_JVM_LIBRARY
+        NAMES jvm JavaVM
+        PATHS ${_JDK_DIRS}
+        NO_DEFAULT_PATH)
+    SET(JNI_LIBRARIES ${JAVA_JVM_LIBRARY})
+    MESSAGE("JAVA_HOME=${_JAVA_HOME}, JAVA_JVM_LIBRARY=${JAVA_JVM_LIBRARY}")
+    MESSAGE("JAVA_INCLUDE_PATH=${JAVA_INCLUDE_PATH}, JAVA_INCLUDE_PATH2=${JAVA_INCLUDE_PATH2}")
+    IF(JAVA_JVM_LIBRARY AND JAVA_INCLUDE_PATH AND JAVA_INCLUDE_PATH2)
+        MESSAGE("Located all JNI components successfully.")
+    ELSE()
+        MESSAGE(FATAL_ERROR "Failed to find a viable JVM installation under JAVA_HOME.")
+    ENDIF()
+ELSE()
+    find_package(JNI REQUIRED)
+ENDIF()
+
+# Determine float ABI of JVM on ARM Linux (runs after JVM detection: it inspects JAVA_JVM_LIBRARY)
+if (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm" AND CMAKE_SYSTEM_NAME STREQUAL "Linux")
+    find_program(READELF readelf)
+    if (READELF MATCHES "NOTFOUND")
+        message(WARNING "readelf not found; JVM float ABI detection disabled")
+    else (READELF MATCHES "NOTFOUND")
+        execute_process(
+            COMMAND ${READELF} -A ${JAVA_JVM_LIBRARY}
+            OUTPUT_VARIABLE JVM_ELF_ARCH
+            ERROR_QUIET)
+        if (NOT JVM_ELF_ARCH MATCHES "Tag_ABI_VFP_args: VFP registers")
+            message("Soft-float JVM detected")
+
+            # Test compilation with -mfloat-abi=softfp using an arbitrary libc function
+            # (typically fails with "fatal error: bits/predefs.h: No such file or directory"
+            # if soft-float dev libraries are not installed)
+            include(CMakePushCheckState)
+            cmake_push_check_state()
+            set(CMAKE_REQUIRED_FLAGS "${CMAKE_REQUIRED_FLAGS} -mfloat-abi=softfp")
+            include(CheckSymbolExists)
+            check_symbol_exists(exit stdlib.h SOFTFP_AVAILABLE)
+            if (NOT SOFTFP_AVAILABLE)
+                message(FATAL_ERROR "Soft-float dev libraries required (e.g. 'apt-get install libc6-dev-armel' on Debian/Ubuntu)")
+            endif (NOT SOFTFP_AVAILABLE)
+            cmake_pop_check_state()
+
+            set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfloat-abi=softfp")
+        endif ()
+    endif (READELF MATCHES "NOTFOUND")
+endif (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm" AND CMAKE_SYSTEM_NAME STREQUAL "Linux")
diff --git a/bookkeeper-server/src/config.h.cmake b/bookkeeper-server/src/config.h.cmake
new file mode 100644
index 000000000..d460b7fc9
--- /dev/null
+++ b/bookkeeper-server/src/config.h.cmake
@@ -0,0 +1,26 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef CONFIG_H
+#define CONFIG_H
+/* Template expanded by configure_file() in CMakeLists.txt: each HAVE_ line below becomes a define only when the CMake variable of that name is true. */
+#cmakedefine HAVE_SYNC_FILE_RANGE
+#cmakedefine HAVE_FALLOCATE
+#cmakedefine HAVE_POSIX_FADVISE
+#cmakedefine HAVE_POSIX_FALLOCATE
+/* NOTE(review): each name above must match the result variable of a CHECK_FUNCTION_EXISTS call in CMakeLists.txt. */
+#endif /* CONFIG_H */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 99a2db137..fa3b46269 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -56,6 +56,7 @@
     public final static String JOURNAL_FORCE_WRITE_BATCH_ENTRIES = "JOURNAL_FORCE_WRITE_BATCH_ENTRIES";
     public final static String JOURNAL_FORCE_WRITE_BATCH_BYTES = "JOURNAL_FORCE_WRITE_BATCH_BYTES";
     public final static String JOURNAL_FLUSH_LATENCY = "JOURNAL_FLUSH_LATENCY";
+    public final static String JOURNAL_FLUSH_IN_MEM_ADD = "JOURNAL_FLUSH_IN_MEM_ADD";
     public final static String JOURNAL_QUEUE_LATENCY = "JOURNAL_QUEUE_LATENCY";
     public final static String JOURNAL_PROCESS_TIME_LATENCY = "JOURNAL_PROCESS_TIME_LATENCY";
     public final static String JOURNAL_CREATION_LATENCY = "JOURNAL_CREATION_LATENCY";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index cb7d914d7..0492943a0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -24,9 +24,10 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import org.apache.bookkeeper.util.ZeroBuffer;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.bookkeeper.util.ZeroBuffer;
+
 /**
  * Provides a buffering layer in front of a FileChannel.
  */
@@ -64,8 +65,9 @@ public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity) thro
      * @param src The source ByteBuffer which contains the data to be written.
      * @throws IOException if a write operation fails.
      */
-    synchronized public void write(ByteBuffer src) throws IOException {
+    synchronized public int write(ByteBuffer src) throws IOException {
         int copied = 0;
+        int flushes = 0;
         while(src.remaining() > 0) {
             int truncated = 0;
             if (writeBuffer.remaining() < src.remaining()) {
@@ -78,9 +80,11 @@ synchronized public void write(ByteBuffer src) throws IOException {
             // if we have run out of buffer space, we should flush to the file
             if (writeBuffer.remaining() == 0) {
                 flushInternal();
+                ++flushes;
             }
         }
         position += copied;
+        return flushes;
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 1483e368a..2862144a4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -30,8 +30,6 @@
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -46,6 +44,8 @@
 import org.apache.bookkeeper.util.DaemonThreadFactory;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.util.ZeroBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -271,13 +271,13 @@ public boolean accept(long journalId) {
     /**
      * Journal Entry to Record
      */
-    private class QueueEntry implements Runnable {
-        ByteBuffer entry;
-        long ledgerId;
-        long entryId;
-        WriteCallback cb;
-        Object ctx;
-        long enqueueTime;
+    private class QueueEntry extends SafeRunnable {
+        final ByteBuffer entry;
+        final long ledgerId;
+        final long entryId;
+        final WriteCallback cb;
+        final Object ctx;
+        final long enqueueTime;
 
         QueueEntry(ByteBuffer entry, long ledgerId, long entryId,
                    WriteCallback cb, Object ctx, long enqueueTime) {
@@ -290,7 +290,7 @@ public boolean accept(long journalId) {
         }
 
         @Override
-        public void run() {
+        public void safeRun() {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId);
             }
@@ -304,19 +304,22 @@ public void run() {
         private final LinkedList<QueueEntry> forceWriteWaiters;
         private boolean shouldClose;
         private final boolean isMarker;
-        private final long lastFlushedPosition;
+        private final long startFlushPosition;
+        private final long endFlushPosition;
         private final long logId;
 
         private ForceWriteRequest(JournalChannel logFile,
                           long logId,
-                          long lastFlushedPosition,
+                          long startFlushPosition,
+                          long endFlushPosition,
                           LinkedList<QueueEntry> forceWriteWaiters,
                           boolean shouldClose,
                           boolean isMarker) {
             this.forceWriteWaiters = forceWriteWaiters;
             this.logFile = logFile;
             this.logId = logId;
-            this.lastFlushedPosition = lastFlushedPosition;
+            this.startFlushPosition = startFlushPosition;
+            this.endFlushPosition = endFlushPosition;
             this.shouldClose = shouldClose;
             this.isMarker = isMarker;
             forceWriteQueueSize.inc();
@@ -324,6 +327,7 @@ private ForceWriteRequest(JournalChannel logFile,
 
         public int process(boolean shouldForceWrite) throws IOException {
             forceWriteQueueSize.dec();
+
             if (isMarker) {
                 return 0;
             }
@@ -334,12 +338,10 @@ public int process(boolean shouldForceWrite) throws IOException {
                     this.logFile.forceWrite(false);
                     journalSyncStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
                 }
-                lastLogMark.setCurLogMark(this.logId, this.lastFlushedPosition);
+                lastLogMark.setCurLogMark(this.logId, this.endFlushPosition);
 
                 // Notify the waiters that the force write succeeded
-                for (QueueEntry e : this.forceWriteWaiters) {
-                    cbThreadPool.submit(e);
-                }
+                callback();
 
                 return this.forceWriteWaiters.size();
             }
@@ -348,6 +350,16 @@ public int process(boolean shouldForceWrite) throws IOException {
             }
         }
 
+        void callback() {
+            for (QueueEntry e : this.forceWriteWaiters) {
+                if (null != e.ctx) {
+                    cbThreadPool.submitOrdered(e.ctx, e);
+                } else {
+                    cbThreadPool.submit(e);
+                }
+            }
+        }
+
         public void closeFileIfNecessary() {
             // Close if shouldClose is set
             if (shouldClose) {
@@ -374,14 +386,12 @@ public void closeFileIfNecessary() {
         // This holds the queue entries that should be notified after a
         // successful force write
         Thread threadToNotifyOnEx;
-        // should we group force writes
-        private final boolean enableGroupForceWrites;
         // make flush interval as a parameter
-        public ForceWriteThread(Thread threadToNotifyOnEx, boolean enableGroupForceWrites) {
+        public ForceWriteThread(Thread threadToNotifyOnEx) {
             super("ForceWriteThread");
             this.threadToNotifyOnEx = threadToNotifyOnEx;
-            this.enableGroupForceWrites = enableGroupForceWrites;
         }
+
         @Override
         public void run() {
             LOG.info("ForceWrite Thread started");
@@ -394,25 +404,23 @@ public void run() {
 
                     // Force write the file and then notify the write completions
                     //
-                    if (!req.isMarker) {
-                        if (shouldForceWrite) {
-                            // if we are going to force write, any request that is already in the
-                            // queue will benefit from this force write - post a marker prior to issuing
-                            // the flush so until this marker is encountered we can skip the force write
-                            if (enableGroupForceWrites) {
-                                forceWriteRequests.put(new ForceWriteRequest(req.logFile, 0, 0, null, false, true));
-                            }
+                    if (!req.isMarker && shouldForceWrite) {
+                        // if we are going to force write, any request that is already in the
+                        // queue will benefit from this force write - post a marker prior to issuing
+                        // the flush so until this marker is encountered we can skip the force write
+                        if (enableGroupForceWrites) {
+                            forceWriteRequests.put(new ForceWriteRequest(req.logFile, 0, 0, 0, null, false, true));
+                        }
 
-                            // If we are about to issue a write, record the number of requests in
-                            // the last force write and then reset the counter so we can accumulate
-                            // requests in the write we are about to issue
-                            if (numReqInLastForceWrite > 0) {
-                                forceWriteGroupingCountStats.registerSuccessfulValue(numReqInLastForceWrite);
-                                numReqInLastForceWrite = 0;
-                            }
+                        // If we are about to issue a write, record the number of requests in
+                        // the last force write and then reset the counter so we can accumulate
+                        // requests in the write we are about to issue
+                        if (numReqInLastForceWrite > 0) {
+                            forceWriteGroupingCountStats.registerSuccessfulValue(numReqInLastForceWrite);
+                            numReqInLastForceWrite = 0;
                         }
-                        numReqInLastForceWrite += req.process(shouldForceWrite);
                     }
+                    numReqInLastForceWrite += req.process(shouldForceWrite);
 
                     if (enableGroupForceWrites &&
                         // if its a marker we should switch back to flushing
@@ -493,6 +501,8 @@ static void writePaddingBytes(JournalChannel jc, ByteBuffer paddingBuffer, int j
     final File journalDirectory;
     final ServerConfiguration conf;
     final ForceWriteThread forceWriteThread;
+    // should we group force writes
+    private final boolean enableGroupForceWrites;
     // Time after which we will stop grouping and issue the flush
     private final long maxGroupWaitInNanos;
     // Threshold after which we flush any buffered journal entries
@@ -503,13 +513,17 @@ static void writePaddingBytes(JournalChannel jc, ByteBuffer paddingBuffer, int j
     private final boolean flushWhenQueueEmpty;
     // should we hint the filesystem to remove pages from cache after force write
     private final boolean removePagesFromCache;
+    // journal align size
+    private final int journalAlignmentSize;
+    // journal format version to write
+    private final int journalFormatVersionToWrite;
 
     private final LastLogMark lastLogMark = new LastLogMark(0, 0);
 
     /**
      * The thread pool used to handle callback.
      */
-    private final ExecutorService cbThreadPool;
+    private final OrderedSafeExecutor cbThreadPool;
 
     // journal entry queue to commit
     final LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
@@ -519,10 +533,13 @@ static void writePaddingBytes(JournalChannel jc, ByteBuffer paddingBuffer, int j
     private final LedgerDirsManager ledgerDirsManager;
 
     // Expose Stats
+    private final StatsLogger statsLogger;
     private final OpStatsLogger journalAddEntryStats;
+    private final OpStatsLogger journalMemAddEntryStats;
     private final OpStatsLogger journalSyncStats;
     private final OpStatsLogger journalCreationStats;
     private final OpStatsLogger journalFlushStats;
+    private final OpStatsLogger journalMemAddFlushStats;
     private final OpStatsLogger journalProcessTimeStats;
     private final OpStatsLogger journalQueueStats;
     private final OpStatsLogger forceWriteGroupingCountStats;
@@ -548,12 +565,19 @@ public Journal(File journalDirectory, ServerConfiguration conf, LedgerDirsManage
         this.journalPreAllocSize = conf.getJournalPreAllocSizeMB() * MB;
         this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB;
         this.maxBackupJournals = conf.getMaxBackupJournals();
-        this.forceWriteThread = new ForceWriteThread(this, conf.getJournalAdaptiveGroupWrites());
+        this.enableGroupForceWrites = conf.getJournalAdaptiveGroupWrites();
+        this.forceWriteThread = new ForceWriteThread(this);
         this.maxGroupWaitInNanos = TimeUnit.MILLISECONDS.toNanos(conf.getJournalMaxGroupWaitMSec());
         this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold();
         this.bufferedEntriesThreshold = conf.getJournalBufferedEntriesThreshold();
-        this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumJournalCallbackThreads(),
-                                                         new DaemonThreadFactory());
+        this.journalAlignmentSize = conf.getJournalAlignmentSize();
+        this.journalFormatVersionToWrite = conf.getJournalFormatVersionToWrite();
+        this.cbThreadPool = OrderedSafeExecutor.newBuilder()
+                .name("BookieJournal")
+                .numThreads(conf.getNumJournalCallbackThreads())
+                .statsLogger(statsLogger)
+                .threadFactory(new DaemonThreadFactory())
+                .build();
 
         // Unless there is a cap on the max wait (which requires group force writes)
         // we cannot skip flushing for queue empty
@@ -565,10 +589,13 @@ public Journal(File journalDirectory, ServerConfiguration conf, LedgerDirsManage
         LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark());
 
         // Expose Stats
+        this.statsLogger = statsLogger;
         journalAddEntryStats = statsLogger.getOpStatsLogger(JOURNAL_ADD_ENTRY);
+        journalMemAddEntryStats = statsLogger.getOpStatsLogger(JOURNAL_MEM_ADD_ENTRY);
         journalSyncStats = statsLogger.getOpStatsLogger(JOURNAL_SYNC);
         journalCreationStats = statsLogger.getOpStatsLogger(JOURNAL_CREATION_LATENCY);
         journalFlushStats = statsLogger.getOpStatsLogger(JOURNAL_FLUSH_LATENCY);
+        journalMemAddFlushStats = statsLogger.getOpStatsLogger(JOURNAL_FLUSH_IN_MEM_ADD);
         journalQueueStats = statsLogger.getOpStatsLogger(JOURNAL_QUEUE_LATENCY);
         journalProcessTimeStats = statsLogger.getOpStatsLogger(JOURNAL_PROCESS_TIME_LATENCY);
         forceWriteGroupingCountStats = statsLogger.getOpStatsLogger(JOURNAL_FORCE_WRITE_GROUPING_COUNT);
@@ -650,9 +677,11 @@ public void scanJournal(long journalId, long journalPos, JournalScanner scanner)
         throws IOException {
         JournalChannel recLog;
         if (journalPos <= 0) {
-            recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize);
+            recLog = new JournalChannel(journalDirectory, journalId,
+                    journalPreAllocSize, journalWriteBufferSize, statsLogger);
         } else {
-            recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, journalPos);
+            recLog = new JournalChannel(journalDirectory, journalId,
+                    journalPreAllocSize, journalWriteBufferSize, journalPos, statsLogger);
         }
         int journalVersion = recLog.getFormatVersion();
         try {
@@ -705,6 +734,8 @@ public void scanJournal(long journalId, long journalPos, JournalScanner scanner)
                 if (!isPaddingRecord) {
                     scanner.process(journalVersion, offset, recBuff);
                 }
+                // update last log mark during replaying
+                lastLogMark.setCurLogMark(journalId, offset);
             }
         } finally {
             recLog.close();
@@ -790,10 +821,11 @@ public void run() {
         LOG.info("Starting journal on {}", journalDirectory);
         LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
         ByteBuffer lenBuff = ByteBuffer.allocate(4);
-        ByteBuffer paddingBuff = ByteBuffer.allocate(2 * conf.getJournalAlignmentSize());
+        ByteBuffer paddingBuff = ByteBuffer.allocate(2 * journalAlignmentSize);
         ZeroBuffer.put(paddingBuff);
         JournalChannel logFile = null;
         forceWriteThread.start();
+        Stopwatch journalAllocationWatcher = new Stopwatch();
         Stopwatch journalCreationWatcher = new Stopwatch();
         Stopwatch journalFlushWatcher = new Stopwatch();
         long batchSize = 0;
@@ -804,7 +836,7 @@ public void run() {
             // http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
             long logId = journalIds.isEmpty() ? System.currentTimeMillis() : journalIds.get(journalIds.size() - 1);
             BufferedChannel bc = null;
-            long lastFlushPosition = 0;
+            long lastFlushPosition = 0L;
             boolean groupWhenTimeout = false;
 
             long dequeueStartTime = 0L;
@@ -820,9 +852,10 @@ public void run() {
                                         logId,
                                         journalPreAllocSize,
                                         journalWriteBufferSize,
-                                        conf.getJournalAlignmentSize(),
+                                        journalAlignmentSize,
                                         removePagesFromCache,
-                                        conf.getJournalFormatVersionToWrite());
+                                        journalFormatVersionToWrite,
+                                        statsLogger);
                     journalCreationStats.registerSuccessfulEvent(
                             journalCreationWatcher.stop().elapsedTime(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
 
@@ -885,12 +918,15 @@ public void run() {
 
                         // toFlush is non null and not empty so should be safe to access getFirst
                         if (shouldFlush) {
-                            if (conf.getJournalFormatVersionToWrite() >= JournalChannel.V5) {
-                                writePaddingBytes(logFile, paddingBuff, conf.getJournalAlignmentSize());
-                            }
+                            long prevFlushPosition = lastFlushPosition;
+
                             journalFlushWatcher.reset().start();
+                            if (journalFormatVersionToWrite >= JournalChannel.V5) {
+                                writePaddingBytes(logFile, paddingBuff, journalAlignmentSize);
+                            }
                             bc.flush(false);
                             lastFlushPosition = bc.position();
+
                             journalFlushStats.registerSuccessfulEvent(
                                     journalFlushWatcher.stop().elapsedTime(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
 
@@ -904,7 +940,8 @@ public void run() {
                             forceWriteBatchEntriesStats.registerSuccessfulValue(toFlush.size());
                             forceWriteBatchBytesStats.registerSuccessfulValue(batchSize);
 
-                            forceWriteRequests.put(new ForceWriteRequest(logFile, logId, lastFlushPosition, toFlush, (lastFlushPosition > maxJournalSize), false));
+                            forceWriteRequests.put(new ForceWriteRequest(logFile, logId, prevFlushPosition,
+                                    lastFlushPosition, toFlush, (lastFlushPosition > maxJournalSize), false));
                             toFlush = new LinkedList<QueueEntry>();
                             batchSize = 0L;
                             // check whether journal file is over file limit
@@ -941,8 +978,13 @@ public void run() {
                 // we should be doing the following, but then we run out of
                 // direct byte buffers
                 // logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
-                bc.write(lenBuff);
-                bc.write(qe.entry);
+                int flushes = 0;
+                flushes += bc.write(lenBuff);
+                flushes += bc.write(qe.entry);
+
+                journalMemAddFlushStats.registerSuccessfulValue(flushes);
+                journalMemAddEntryStats.registerSuccessfulEvent(
+                        MathUtils.elapsedMicroSec(qe.enqueueTime), TimeUnit.MICROSECONDS);
 
                 toFlush.add(qe);
                 qe = null;
@@ -978,8 +1020,6 @@ public synchronized void shutdown() {
             if (!cbThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
                 LOG.warn("Couldn't shutdown journal callback thread gracefully. Forcing");
             }
-            cbThreadPool.shutdownNow();
-
             running = false;
             this.interrupt();
             this.join();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
index ad46e5cf3..07e3d3dee 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
@@ -28,13 +28,21 @@
 import java.nio.channels.FileChannel;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.NativeIO;
 import org.apache.bookkeeper.util.ZeroBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.*;
 
 /**
  * Simple wrapper around FileChannel to add versioning
@@ -81,28 +89,36 @@
     // The position of the file channel's last drop position
     private long lastDropPosition = 0L;
 
+    // Stats
+    private final OpStatsLogger journalPreallocationStats;
+    private final Counter journalForceWriteCounter;
+    private final OpStatsLogger journalForceWriteStats;
+
     // Mostly used by tests
     JournalChannel(File journalDirectory, long logId) throws IOException {
-        this(journalDirectory, logId, 4 * 1024 * 1024, 65536, START_OF_FILE);
+        // 4MB preallocation, 64KB write buffer, position START_OF_FILE; stats are sent to
+        // NullStatsLogger.INSTANCE (presumably discarded).
+        this(journalDirectory, logId, 4 * 1024 * 1024, 65536, START_OF_FILE, NullStatsLogger.INSTANCE);
     }
 
     // Open journal for scanning starting from the first record in journal.
-    JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize) throws IOException {
-        this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE);
+    // statsLogger receives the journal force-write and preallocation stats registered by the full constructor.
+    JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize, StatsLogger statsLogger)
+            throws IOException {
+        this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE, statsLogger);
     }
 
     // Open journal for scanning starting from given position.
     JournalChannel(File journalDirectory, long logId,
-                   long preAllocSize, int writeBufferSize, long position) throws IOException {
-         this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE, position, false, V5);
+                   long preAllocSize, int writeBufferSize, long position, StatsLogger statsLogger)
+            throws IOException {
+         // Fixed for this overload: alignment SECTOR_SIZE, no page-cache removal, format version V5.
+         this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE, position, false, V5, statsLogger);
     }
 
     // Open journal to write
     JournalChannel(File journalDirectory, long logId,
                    long preAllocSize, int writeBufferSize, int journalAlignSize,
-                   boolean fRemoveFromPageCache, int formatVersionToWrite) throws IOException {
+                   boolean fRemoveFromPageCache, int formatVersionToWrite,
+                   StatsLogger statsLogger) throws IOException {
+        // The position passed on is always START_OF_FILE for this overload.
         this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize,
-             START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite);
+             START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite, statsLogger);
     }
 
     /**
@@ -124,12 +140,20 @@
      *          whether to remove cached pages from page cache.
      * @param formatVersionToWrite
      *          format version to write
+     * @param statsLogger
+     *          stats logger to record stats
      * @throws IOException
      */
-    private JournalChannel(File journalDirectory, long logId,
-                           long preAllocSize, int writeBufferSize, int journalAlignSize,
-                           long position, boolean fRemoveFromPageCache,
-                           int formatVersionToWrite) throws IOException {
+    private JournalChannel(File journalDirectory,
+                           long logId,
+                           long preAllocSize,
+                           int writeBufferSize,
+                           int journalAlignSize,
+                           long position,
+                           boolean fRemoveFromPageCache,
+                           int formatVersionToWrite,
+                           StatsLogger statsLogger)
+            throws IOException {
         this.journalAlignSize = journalAlignSize;
         this.zeros = ByteBuffer.allocate(journalAlignSize);
         this.preAllocSize = preAllocSize - preAllocSize % journalAlignSize;
@@ -149,9 +173,13 @@ private JournalChannel(File journalDirectory, long logId,
                         + " suddenly appeared, is another bookie process running?");
             }
             randomAccessFile = new RandomAccessFile(fn, "rw");
+            fd = NativeIO.getSysFileDescriptor(randomAccessFile.getFD());
             fc = randomAccessFile.getChannel();
             formatVersion = formatVersionToWrite;
 
+            // preallocate the space the header
+            preallocate();
+
             int headerSize = (V4 == formatVersion) ? VERSION_HEADER_SIZE : HEADER_SIZE;
             ByteBuffer bb = ByteBuffer.allocate(headerSize);
             ZeroBuffer.put(bb);
@@ -162,11 +190,9 @@ private JournalChannel(File journalDirectory, long logId,
             fc.write(bb);
 
             bc = new BufferedChannel(fc, writeBufferSize);
-            forceWrite(true);
-            nextPrealloc = this.preAllocSize;
-            fc.write(zeros, nextPrealloc - journalAlignSize);
         } else {  // open an existing file
             randomAccessFile = new RandomAccessFile(fn, "r");
+            fd = NativeIO.getSysFileDescriptor(randomAccessFile.getFD());
             fc = randomAccessFile.getChannel();
             bc = null; // readonly
 
@@ -215,7 +241,13 @@ private JournalChannel(File journalDirectory, long logId,
                 LOG.error("Bookie journal file can seek to position :", e);
             }
         }
-        this.fd = NativeIO.getSysFileDescriptor(randomAccessFile.getFD());
+
+        // Stats
+        this.journalForceWriteCounter = statsLogger.getCounter(JOURNAL_NUM_FORCE_WRITES);
+        this.journalForceWriteStats = statsLogger.getOpStatsLogger(JOURNAL_FORCE_WRITE_LATENCY);
+        this.journalPreallocationStats = statsLogger.getOpStatsLogger(JOURNAL_PREALLOCATION);
+
+        LOG.info("Opened journal {} : fd {}", fn, fd);
     }
 
     int getFormatVersion() {
@@ -229,14 +261,33 @@ BufferedChannel getBufferedChannel() throws IOException {
         return bc;
     }
 
-    void preAllocIfNeeded(long size) throws IOException {
-        if (bc.position() + size > nextPrealloc) {
-            nextPrealloc += preAllocSize;
+    /**
+     * Extends the preallocated region by {@code preAllocSize} bytes, starting at the current
+     * boundary ({@code nextPrealloc}), and advances the boundary. Native fallocate is used when
+     * {@link NativeIO#fallocateIfPossible} reports success. Otherwise the {@code zeros} buffer
+     * ({@code journalAlignSize} bytes) is written so that it ends exactly at the new boundary.
+     */
+    private void preallocate() throws IOException {
+        long prevPrealloc = nextPrealloc;
+        nextPrealloc = prevPrealloc + preAllocSize;
+        if (!NativeIO.fallocateIfPossible(fd, prevPrealloc, preAllocSize)) {
+            // Fallback: a positional write ending at nextPrealloc grows the file to that size.
+            // NOTE(review): the byte count returned by fc.write is not checked.
             zeros.clear();
             fc.write(zeros, nextPrealloc - journalAlignSize);
         }
     }
 
+    void preAllocIfNeeded(long size) throws IOException {
+        preAllocIfNeeded(size, null);
+    }
+
+    void preAllocIfNeeded(long size, Stopwatch stopwatch) throws IOException {
+        if (bc.position() + size > nextPrealloc) {
+            if (null != stopwatch) {
+                stopwatch.reset().start();
+            }
+            preallocate();
+            if (null != stopwatch) {
+                journalPreallocationStats.registerSuccessfulEvent(
+                        stopwatch.stop().elapsedTime(TimeUnit.MICROSECONDS),
+                        TimeUnit.MICROSECONDS);
+            }
+        }
+    }
+
     int read(ByteBuffer dst)
             throws IOException {
         return fc.read(dst);
@@ -250,7 +301,15 @@ public void forceWrite(boolean forceMetadata) throws IOException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Journal ForceWrite");
         }
-        long newForceWritePosition = bc.forceWrite(forceMetadata);
+        long startTimeNanos = MathUtils.nowInNano();
+        forceWriteImpl(forceMetadata);
+        // collect stats
+        journalForceWriteCounter.inc();
+        journalForceWriteStats.registerSuccessfulEvent(
+                MathUtils.elapsedMicroSec(startTimeNanos), TimeUnit.MICROSECONDS);
+    }
+
+    private void removeFromPageCacheIfPossible(long offset) {
         //
         // For POSIX_FADV_DONTNEED, we want to drop from the beginning
         // of the file to a position prior to the current position.
@@ -265,11 +324,17 @@ public void forceWrite(boolean forceMetadata) throws IOException {
         // lastDropPosition     newDropPos             lastForceWritePosition
         //
         if (fRemoveFromPageCache) {
-            long newDropPos = newForceWritePosition - CACHE_DROP_LAG_BYTES;
+            long newDropPos = offset - CACHE_DROP_LAG_BYTES;
             if (lastDropPosition < newDropPos) {
                 NativeIO.bestEffortRemoveFromPageCache(fd, lastDropPosition, newDropPos - lastDropPosition);
             }
             this.lastDropPosition = newDropPos;
         }
     }
+
+    private void forceWriteImpl(boolean forceMetadata) throws IOException {
+        long newForceWritePosition = bc.forceWrite(forceMetadata);
+        removeFromPageCacheIfPossible(newForceWritePosition);
+    }
+
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java
new file mode 100644
index 000000000..86d1e289e
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import com.sun.jna.Library;
+import com.sun.jna.Native;
+
+/**
+ * Thin JNA bridge to the C library's error reporting ({@code errno}, {@code strerror},
+ * {@code perror}), used to turn native error numbers into readable log messages.
+ */
+public class Errno {
+
+    /**
+     * JNA proxy for libc ("c"), created once during class initialization. It is never
+     * reassigned, hence final.
+     */
+    private static final InterfaceDelegate delegate =
+            (InterfaceDelegate) Native.loadLibrary("c", InterfaceDelegate.class);
+
+    /**
+     * Calls C {@code perror(3)}. It writes {@code s} (if non-empty), then ": " and the
+     * message for the C library's current {@code errno}, to standard error.
+     *
+     * <p>errno is set when errors occur but is not cleared by successful calls. It
+     * should therefore be consulted immediately after the failing call.
+     *
+     * <p>NOTE(review): C {@code perror} returns {@code void}, but {@link InterfaceDelegate}
+     * maps it as returning {@code int}, so the returned value carries no meaning.
+     *
+     * @param s prefix for the message, conventionally the name of the failed function
+     * @return whatever the JNA {@code int} mapping yields (not meaningful)
+     */
+    public static int perror(String s) {
+        return delegate.perror(s);
+    }
+
+    /**
+     * Calls C {@code strerror(3)} to describe an error number.
+     *
+     * <p>Plain {@code strerror} is not the thread-safe variant; {@code strerror_r} is.
+     * This method uses plain {@code strerror}.
+     *
+     * @param errnum error number to describe, e.g. an error code returned directly by a
+     *               native call
+     * @return the C library's description of {@code errnum}
+     */
+    public static String strerror(int errnum) {
+        return delegate.strerror(errnum);
+    }
+
+    /**
+     * Describes the error number returned by {@link #errno()}.
+     *
+     * @return the C library's description of the last error recorded by JNA
+     */
+    public static String strerror() {
+        return strerror(errno());
+    }
+
+    /**
+     * Returns the last error number as recorded by JNA ({@link Native#getLastError()}).
+     * Its value is only meaningful right after a failed call, usually one that returned -1.
+     *
+     * <p>NOTE(review): JNA tracks errno around the native calls it performs itself. Failures
+     * in native methods bound through plain JNI (for example {@code NativeIO}, which loads its
+     * library with {@code System.loadLibrary}) are probably not reflected here. Confirm this
+     * before relying on it for diagnostics.
+     *
+     * @return the last error number recorded by JNA
+     */
+    public static int errno() {
+        return Native.getLastError();
+    }
+
+    /**
+     * JNA mapping of the libc functions wrapped above.
+     */
+    interface InterfaceDelegate extends Library {
+
+        int perror(String s);
+
+        String strerror(int errnum);
+
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
index 24488425b..80e0ee28b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
@@ -20,34 +20,39 @@
 
 import java.lang.reflect.Field;
 import java.io.FileDescriptor;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.sun.jna.LastErrorException;
-import com.sun.jna.Native;
 
 public final class NativeIO {
     private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class);
 
     private static final int POSIX_FADV_DONTNEED = 4; /* fadvise.h */
 
+    private static final int FALLOC_FL_KEEP_SIZE = 1;
+
     private static boolean initialized = false;
     private static boolean fadvisePossible = true;
+    private static boolean sysFallocatePossible = true;
+    private static boolean posixFallocatePossible = true;
 
     static {
         try {
-            Native.register("c");
+            // libbookkeeper is resolved via java.library.path (bin/bookkeeper points it at the
+            // native build output directory).
+            LOG.info("Loading bookkeeper native library.");
+            System.loadLibrary("bookkeeper");
             initialized = true;
-        } catch (NoClassDefFoundError e) {
-            LOG.info("JNA not found. Native methods will be disabled.");
-        } catch (UnsatisfiedLinkError e) {
-            LOG.info("Unable to link C library. Native methods will be disabled.");
-        } catch (NoSuchMethodError e) {
-            LOG.warn("Obsolete version of JNA present; unable to register C library");
+            LOG.info("Loaded bookkeeper native library. Enabled Native IO.");
+        } catch (Throwable t) {
+            // initialized stays false, so fallocateIfPossible / posixFadviseIfPossible return
+            // false without calling into native code.
+            LOG.info("Unable to load bookkeeper native library. Native methods will be disabled : ", t);
         }
     }
 
     // fadvice
-    public static native int posix_fadvise(int fd, long offset, long len, int flag) throws LastErrorException;
+    // posix_fadvise(fd, offset, len, advice). Per POSIX the C function returns 0 or an error
+    // number (it does not use -1/errno); assumes the JNI wrapper passes that through — TODO confirm.
+    public static native int posix_fadvise(int fd, long offset, long len, int flag);
+    // posix_fallocate(fd, offset, len). Per POSIX returns 0 or an error number (not -1/errno);
+    // assumes the JNI wrapper passes that through — TODO confirm.
+    public static native int posix_fallocate(int fd, long offset, long len);
+    // Linux fallocate(fd, mode, offset, len): 0 on success, -1 with errno set on failure. The
+    // native wrapper returns -1 with errno = ENOSYS when no fallocate was available at build time.
+    public static native int fallocate(int fd, int mode, long offset, long len);
 
     private NativeIO() {}
 
@@ -66,6 +71,7 @@ private static Field getFieldByReflection(Class cls, String fieldName) {
 
         return field;
     }
+
     /**
      * Get system file descriptor (int) from FileDescriptor object.
      * @param descriptor - FileDescriptor object to get fd from
@@ -82,6 +88,66 @@ public static int getSysFileDescriptor(FileDescriptor descriptor) {
         return -1;
     }
 
+    /**
+     * Tries to reserve {@code nbytes} of disk space at {@code offset} in the file behind
+     * {@code fd}. Linux {@code fallocate} is attempted first and {@code posix_fallocate}
+     * second; each is skipped once it has been marked unusable.
+     *
+     * @return true if one of the native calls succeeded; false if the native library is not
+     *         loaded, {@code fd} is negative, or both attempts failed or were skipped
+     */
+    public static boolean fallocateIfPossible(int fd, long offset, long nbytes) {
+        if (initialized && fd >= 0) {
+            if (sysFallocatePossible && sysFallocateIfPossible(fd, offset, nbytes)) {
+                return true;
+            }
+            return posixFallocatePossible && posixFallocateIfPossible(fd, offset, nbytes);
+        }
+        return false;
+    }
+
+    private static boolean sysFallocateIfPossible(int fd, long offset, long nbytes) {
+        try {
+            int rc = fallocate(fd, FALLOC_FL_KEEP_SIZE, offset, nbytes);
+            if (rc != 0) {
+                LOG.error("Failed on sys fallocate file descriptor {}, offset {}, bytes {}, rc {} : {}",
+                        new Object[] { fd, offset, nbytes, rc, Errno.strerror() });
+                return false;
+            }
+        } catch (UnsupportedOperationException uoe) {
+            LOG.warn("sys fallocate isn't supported : ", uoe);
+            sysFallocatePossible = false;
+        }  catch (UnsatisfiedLinkError nle) {
+            LOG.warn("Unsatisfied Link error: sys fallocate failed on file descriptor {}, offset {}, bytes {} : ",
+                    new Object[] { fd, offset, nbytes, nle });
+            sysFallocatePossible = false;
+        } catch (Exception e) {
+            LOG.error("Unknown exception: sys fallocate failed on file descriptor {}, offset {}, bytes {} : ",
+                    new Object[] { fd, offset, nbytes, e });
+            return false;
+        }
+        return sysFallocatePossible;
+    }
+
+    private static boolean posixFallocateIfPossible(int fd, long offset, long nbytes) {
+        try {
+            int rc = posix_fallocate(fd, offset, nbytes);
+            if (rc != 0) {
+                LOG.error("Failed on posix_fallocate file descriptor {}, offset {}, bytes {}, rc {} : {}",
+                        new Object[] { fd, offset, nbytes, rc, Errno.strerror() });
+                return false;
+            }
+        } catch (UnsupportedOperationException uoe) {
+            LOG.warn("posix_fallocate isn't supported : ", uoe);
+            posixFallocatePossible = false;
+        }  catch (UnsatisfiedLinkError nle) {
+            LOG.warn("Unsatisfied Link error: posix_fallocate failed on file descriptor {}, offset {}, bytes {} : ",
+                    new Object[] { fd, offset, nbytes, nle });
+            posixFallocatePossible = false;
+        } catch (Exception e) {
+            LOG.error("Unknown exception: posix_fallocate failed on file descriptor {}, offset {}, bytes {} : ",
+                    new Object[] { fd, offset, nbytes, e });
+            return false;
+        }
+        return posixFallocatePossible;
+    }
+
     /**
      * Remove pages from the file system page cache when they wont
      * be accessed again
@@ -89,16 +155,22 @@ public static int getSysFileDescriptor(FileDescriptor descriptor) {
      * @param fd     The file descriptor of the source file.
      * @param offset The offset within the file.
      * @param len    The length to be flushed.
-     *
-     * @throws nothing => Best effort
      */
-
     public static void bestEffortRemoveFromPageCache(int fd, long offset, long len) {
+        // Best effort: the boolean result of posixFadviseIfPossible is not checked.
+        posixFadviseIfPossible(fd, offset, len, POSIX_FADV_DONTNEED);
+    }
+
+    public static boolean posixFadviseIfPossible(int fd, long offset, long len, int flags) {
         if (!initialized || !fadvisePossible || fd < 0) {
-            return;
+            return false;
         }
         try {
-            posix_fadvise(fd, offset, len, POSIX_FADV_DONTNEED);
+            int rc = posix_fadvise(fd, offset, len, flags);
+            if (rc != 0) {
+                LOG.error("Failed on posix_fadvise file descriptor {}, offset {}, bytes {}, flags {}, rc {} : {}",
+                        new Object[] { fd, offset, len, flags, rc, Errno.strerror() });
+                return false;
+            }
         } catch (UnsupportedOperationException uoe) {
             LOG.warn("posix_fadvise is not supported : ", uoe);
             fadvisePossible = false;
@@ -113,7 +185,9 @@ public static void bestEffortRemoveFromPageCache(int fd, long offset, long len)
             // exception and forget
             LOG.warn("Unknown exception: posix_fadvise failed on file descriptor {}, offset {} : ",
                     new Object[] { fd, offset, e });
+            return false;
         }
+        return fadvisePossible;
     }
 
 }
diff --git a/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c b/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c
new file mode 100644
index 000000000..789f10fe8
--- /dev/null
+++ b/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <jni.h>
+
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/syscall.h>
+#include <sys/types.h>
+#if !defined(_WIN32) && (defined(__unix__) || defined(__unix) || (defined(__APPLE__) && defined(__MACH__)))
+  /* UNIX-style OS. ------------------------------------------- */
+  #include <unistd.h>
+  /*
+   * __NR_fallocate is taken from <sys/syscall.h>, included above. The former
+   * <asm-x86_64/unistd.h> include (guarded by a condition that misspelled
+   * __unix__ as __unit__) referenced a header path that current kernel UAPI
+   * headers no longer ship, so it is intentionally not included.
+   */
+#endif
+#include "config.h"
+
+
+
+#if defined(HAVE_FALLOCATE)
+#  define my_fallocate fallocate
+#elif defined(__NR_fallocate)
+/*
+ * Invoke fallocate(2) directly through syscall() when the C library provides
+ * no fallocate() wrapper. Like syscall(), returns -1 with errno set on failure.
+ *
+ * @param fd     file descriptor to operate on
+ * @param mode   fallocate mode flags, passed through unchanged
+ * @param offset start of the byte range
+ * @param len    length of the byte range
+ */
+static int manual_fallocate (int fd, int mode, __off64_t offset, __off64_t len)
+{
+#if defined(__x86_64__) || defined(__LP64__) || defined(_LP64)
+  /* Each 64-bit argument fits in a single syscall register on these targets.
+   * Testing only __x86_64__ used to send other 64-bit targets (aarch64,
+   * ppc64, ...) down the split-argument path below, scrambling offset/len. */
+  return syscall (__NR_fallocate, fd, mode, offset, len);
+#else
+  /* 32-bit ABIs: each 64-bit argument is passed as two longs, ordered by
+   * __LONG_LONG_PAIR according to the target's endianness. */
+  return syscall (__NR_fallocate, fd, mode,
+    __LONG_LONG_PAIR ((long) (offset >> 32), (long) offset),
+    __LONG_LONG_PAIR ((long) (len >> 32), (long) len));
+#endif
+}
+#define my_fallocate manual_fallocate
+#endif
+
+/*
+ * JNI binding for fallocate(2): forwards to my_fallocate (the libc wrapper or
+ * the raw-syscall fallback) with mode passed through unchanged. When neither
+ * was available at build time, fails with -1 and errno = ENOSYS.
+ * NOTE(review): if off_t is 32 bits in this build, the casts truncate offsets
+ * beyond 2 GiB -- confirm the build defines _FILE_OFFSET_BITS=64.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_bookkeeper_util_NativeIO_fallocate(
+  JNIEnv *env, jclass clazz,
+  jint fd, jint mode, jlong offset, jlong len)
+{
+  (void) env;   /* JNI plumbing, unused */
+  (void) clazz;
+#if defined(my_fallocate)
+  return my_fallocate(fd, mode, (off_t) offset, (off_t) len);
+#else
+  errno = ENOSYS;
+  return -1;
+#endif
+}
+
+/*
+ * JNI binding for posix_fadvise(2).
+ *
+ * Returns 0 on success, or -1 with errno set on failure. posix_fadvise()
+ * reports failure through its return value and leaves errno untouched, so
+ * the error number is copied into errno here. This keeps the convention
+ * identical to the ENOSYS branch and lets callers that read errno (e.g. via
+ * strerror) see the real cause.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_bookkeeper_util_NativeIO_posix_1fadvise(
+  JNIEnv *env, jclass clazz,
+  jint fd, jlong offset, jlong len, jint flags)
+{
+#ifndef HAVE_POSIX_FADVISE
+  errno = ENOSYS;
+  return -1;
+#else
+  int rc = posix_fadvise(fd, (off_t)offset, (off_t)len, flags);
+  if (rc != 0) {
+    errno = rc;
+    return -1;
+  }
+  return 0;
+#endif
+}
+
+/*
+ * JNI binding for posix_fallocate(3).
+ *
+ * Returns 0 on success, or -1 with errno set on failure. posix_fallocate()
+ * returns the error number directly instead of setting errno, so it is
+ * copied into errno here. This matches the ENOSYS branch and lets callers
+ * that read errno see the real cause.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_bookkeeper_util_NativeIO_posix_1fallocate(
+  JNIEnv *env, jclass clazz,
+  jint fd, jlong offset, jlong len)
+{
+#ifndef HAVE_POSIX_FALLOCATE
+  errno = ENOSYS;
+  return -1;
+#else
+  int rc = posix_fallocate(fd, (off_t)offset, (off_t)len);
+  if (rc != 0) {
+    errno = rc;
+    return -1;
+  }
+  return 0;
+#endif
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services