You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/04/20 17:35:23 UTC
[1/7] bookkeeper git commit: bookie: fallocate & sync_file_range
Repository: bookkeeper
Updated Branches:
refs/heads/sijie/bookkeeper_fallocate [created] 95028b9d8
bookie: fallocate & sync_file_range
- introduce fallocate & sync_file_range in NativeIO to provide better preallocation & file sync logic.
- if journalAdaptiveGroupWrites is disabled, use sync_file_range to sync only the flushed range, at a finer granularity
- add more stats on journal flush & creation.
RB_ID=260795
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/d9802962
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/d9802962
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/d9802962
Branch: refs/heads/sijie/bookkeeper_fallocate
Commit: d9802962b20acf7a86209262871af7a2eb4cbed7
Parents: 95ea481
Author: Sijie Guo <si...@twitter.com>
Authored: Mon Jan 27 23:03:07 2014 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Thu Nov 17 17:17:21 2016 -0800
----------------------------------------------------------------------
bookkeeper-server/bin/bookkeeper | 9 +-
bookkeeper-server/pom.xml | 73 ++++++++
bookkeeper-server/src/CMakeLists.txt | 142 ++++++++++++++++
bookkeeper-server/src/JNIFlags.cmake | 118 +++++++++++++
bookkeeper-server/src/config.h.cmake | 26 +++
.../org/apache/bookkeeper/bookie/Journal.java | 165 ++++++++++++-------
.../bookkeeper/bookie/JournalChannel.java | 137 ++++++++++++---
.../java/org/apache/bookkeeper/util/Errno.java | 115 +++++++++++++
.../org/apache/bookkeeper/util/NativeIO.java | 147 +++++++++++++++--
.../src/org/apache/bookkeeper/util/NativeIO.c | 122 ++++++++++++++
10 files changed, 955 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/bin/bookkeeper
----------------------------------------------------------------------
diff --git a/bookkeeper-server/bin/bookkeeper b/bookkeeper-server/bin/bookkeeper
index 54be3fe..87429f9 100755
--- a/bookkeeper-server/bin/bookkeeper
+++ b/bookkeeper-server/bin/bookkeeper
@@ -180,10 +180,11 @@ if [ -z "$BOOKIE_LOG_CONF" ]; then
fi
BOOKIE_CLASSPATH="$BOOKIE_JAR:$BOOKIE_CLASSPATH:$BOOKIE_EXTRA_CLASSPATH"
-BOOKIE_CLASSPATH="`dirname $BOOKIE_LOG_CONF`:$BOOKIE_CLASSPATH"
-OPTS="$OPTS -Dlog4j.configuration=`basename $BOOKIE_LOG_CONF`"
-
-OPTS="-cp $BOOKIE_CLASSPATH $OPTS"
+if [ "$BOOKIE_LOG_CONF" != "" ]; then
+ BOOKIE_CLASSPATH="`dirname $BOOKIE_LOG_CONF`:$BOOKIE_CLASSPATH"
+ OPTS="$OPTS -Dlog4j.configuration=`basename $BOOKIE_LOG_CONF`"
+fi
+OPTS="-cp $BOOKIE_CLASSPATH -Djava.library.path=$BK_HOME/target/native/target/usr/local/lib $OPTS"
OPTS="$OPTS $BOOKIE_EXTRA_OPTS"
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/pom.xml
----------------------------------------------------------------------
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index bd143f1..730a659 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -327,6 +327,79 @@
</build>
<profiles>
<profile>
+ <id>native</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>enforce-os</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <requireOS>
+ <!-- requireOS takes a single family; "unix" also matches Mac OS X -->
+ <family>unix</family>
+ <message>native build only supported on Mac or Unix</message>
+ </requireOS>
+ </rules>
+ <fail>true</fail>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>native-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>compile</phase>
+ <goals>
+ <goal>javah</goal>
+ </goals>
+ <configuration>
+ <javahPath>${env.JAVA_HOME}/bin/javah</javahPath>
+ <javahClassNames>
+ <javahClassName>org.apache.bookkeeper.util.NativeIO</javahClassName>
+ </javahClassNames>
+ <javahOutputDirectory>${project.build.directory}/native/javah</javahOutputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>make</id>
+ <phase>compile</phase>
+ <goals><goal>run</goal></goals>
+ <configuration>
+ <target>
+ <exec executable="cmake" dir="${project.build.directory}/native" failonerror="true">
+ <arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model}"/>
+ </exec>
+ <!-- a single make run builds everything; VERBOSE=1 echoes the full compiler command lines -->
+ <exec executable="make" dir="${project.build.directory}/native" failonerror="true">
+ <arg line="VERBOSE=1"/>
+ </exec>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
<id>protobuf</id>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/CMakeLists.txt b/bookkeeper-server/src/CMakeLists.txt
new file mode 100644
index 0000000..3d446b8
--- /dev/null
+++ b/bookkeeper-server/src/CMakeLists.txt
@@ -0,0 +1,142 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cmake_minimum_required(VERSION 2.6 FATAL_ERROR)
+
+if (NOT CMAKE_BUILD_TYPE) # Default to release builds unless a type was requested
+ set(CMAKE_BUILD_TYPE Release)
+endif (NOT CMAKE_BUILD_TYPE)
+include(JNIFlags.cmake NO_POLICY_SCOPE)
+
+# Compile a library with both shared and static variants
+function(add_dual_library LIBNAME)
+ add_library(${LIBNAME} SHARED ${ARGN})
+ add_library(${LIBNAME}_static STATIC ${ARGN})
+ set_target_properties(${LIBNAME}_static PROPERTIES OUTPUT_NAME ${LIBNAME})
+endfunction(add_dual_library)
+
+# Link both a static and a dynamic target against some libraries
+function(target_link_dual_libraries LIBNAME)
+ target_link_libraries(${LIBNAME} ${ARGN})
+ target_link_libraries(${LIBNAME}_static ${ARGN})
+endfunction(target_link_dual_libraries)
+
+function(output_directory TGT DIR)
+ SET_TARGET_PROPERTIES(${TGT} PROPERTIES
+ RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
+ SET_TARGET_PROPERTIES(${TGT} PROPERTIES
+ ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
+ SET_TARGET_PROPERTIES(${TGT} PROPERTIES
+ LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
+endfunction(output_directory TGT DIR)
+
+function(dual_output_directory TGT DIR)
+ output_directory(${TGT} "${DIR}")
+ output_directory(${TGT}_static "${DIR}")
+endfunction(dual_output_directory TGT DIR)
+
+#
+# This macro alters the behavior of find_package and find_library.
+# It does this by setting the CMAKE_FIND_LIBRARY_SUFFIXES global variable.
+# You should save that variable before calling this function and restore it
+# after you have accomplished your goal.
+#
+# The behavior is altered in two ways:
+# 1. We always find shared libraries, never static;
+# 2. We find shared libraries with the given version number.
+#
+# On Windows this function is a no-op. Windows does not encode
+# version number information into library path names.
+#
+macro(set_find_shared_library_version LVERS)
+ IF(${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
+ # Mac OS uses .dylib
+ SET(CMAKE_FIND_LIBRARY_SUFFIXES ".${LVERS}.dylib")
+ ELSEIF(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
+ # FreeBSD has always .so installed.
+ SET(CMAKE_FIND_LIBRARY_SUFFIXES ".so")
+ ELSEIF(${CMAKE_SYSTEM_NAME} MATCHES "Windows")
+ # Windows doesn't support finding shared libraries by version.
+ ELSE()
+ # Most UNIX variants use .so
+ SET(CMAKE_FIND_LIBRARY_SUFFIXES ".so.${LVERS}")
+ ENDIF()
+endmacro(set_find_shared_library_version LVERS)
+
+if (NOT GENERATED_JAVAH)
+ # Must identify where the generated headers have been placed
+ MESSAGE(FATAL_ERROR "You must set the cmake variable GENERATED_JAVAH")
+endif (NOT GENERATED_JAVAH)
+find_package(JNI REQUIRED)
+# Look zlib up as a versioned shared lib; save/restore the suffix list by VALUE (note the ${}).
+SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES "${CMAKE_FIND_LIBRARY_SUFFIXES}")
+set_find_shared_library_version("1")
+find_package(ZLIB REQUIRED)
+SET(CMAKE_FIND_LIBRARY_SUFFIXES "${STORED_CMAKE_FIND_LIBRARY_SUFFIXES}")
+
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -Wall -O2")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_REENTRANT -D_GNU_SOURCE")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64")
+set(D main/native/src/org/apache/bookkeeper)
+set(T main/native/src/test/org/apache/bookkeeper)
+
+INCLUDE(CheckFunctionExists)
+INCLUDE(CheckCSourceCompiles)
+INCLUDE(CheckLibraryExists)
+CHECK_FUNCTION_EXISTS(sync_file_range HAVE_SYNC_FILE_RANGE)
+CHECK_FUNCTION_EXISTS(fallocate HAVE_FALLOCATE)
+CHECK_FUNCTION_EXISTS(posix_fadvise HAVE_POSIX_FADVISE)
+CHECK_FUNCTION_EXISTS(posix_fallocate HAVE_POSIX_FALLOCATE)
+CHECK_LIBRARY_EXISTS(dl dlopen "" NEED_LINK_DL)
+
+include_directories(
+ ${GENERATED_JAVAH}
+ main/native/src
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_SOURCE_DIR}/src
+ ${CMAKE_BINARY_DIR}
+ ${JNI_INCLUDE_DIRS}
+)
+CONFIGURE_FILE(${CMAKE_SOURCE_DIR}/config.h.cmake ${CMAKE_BINARY_DIR}/config.h)
+
+SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
+add_dual_library(bookkeeper
+ ${D}/util/NativeIO.c
+)
+if (NEED_LINK_DL)
+ set(LIB_DL dl)
+endif (NEED_LINK_DL)
+
+IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
+ #
+ # By embedding '$ORIGIN' into the RPATH of libbookkeeper.so,
+ # dlopen will look in the directory containing libbookkeeper.so.
+ # However, $ORIGIN is not supported by all operating systems.
+ #
+ SET_TARGET_PROPERTIES(bookkeeper
+ PROPERTIES INSTALL_RPATH "\$ORIGIN/")
+ENDIF()
+
+target_link_dual_libraries(bookkeeper
+ ${LIB_DL}
+ ${JAVA_JVM_LIBRARY}
+)
+SET(LIBBOOKKEEPER_VERSION "1.0.0")
+SET_TARGET_PROPERTIES(bookkeeper PROPERTIES
+ SOVERSION ${LIBBOOKKEEPER_VERSION})
+dual_output_directory(bookkeeper target/usr/local/lib)
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/JNIFlags.cmake
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/JNIFlags.cmake b/bookkeeper-server/src/JNIFlags.cmake
new file mode 100644
index 0000000..8333285
--- /dev/null
+++ b/bookkeeper-server/src/JNIFlags.cmake
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cmake_minimum_required(VERSION 2.6 FATAL_ERROR)
+
+# If JVM_ARCH_DATA_MODEL is 32, compile all binaries as 32-bit.
+# This variable is set by maven.
+if (JVM_ARCH_DATA_MODEL EQUAL 32)
+ # Force 32-bit code generation (and linking) on amd64/x86_64, ppc64, sparc64
+ if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_SYSTEM_PROCESSOR MATCHES ".*64")
+ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -m32")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -m32")
+ set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -m32")
+ endif ()
+ if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
+ # Set CMAKE_SYSTEM_PROCESSOR to ensure that find_package(JNI) will use
+ # the 32-bit version of libjvm.so.
+ set(CMAKE_SYSTEM_PROCESSOR "i686")
+ endif ()
+endif (JVM_ARCH_DATA_MODEL EQUAL 32)
+
+# Determine float ABI of JVM on ARM Linux
+if (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm" AND CMAKE_SYSTEM_NAME STREQUAL "Linux")
+ find_program(READELF readelf)
+ if (READELF MATCHES "NOTFOUND")
+ message(WARNING "readelf not found; JVM float ABI detection disabled")
+ else (READELF MATCHES "NOTFOUND")
+ execute_process(
+ COMMAND ${READELF} -A ${JAVA_JVM_LIBRARY}
+ OUTPUT_VARIABLE JVM_ELF_ARCH
+ ERROR_QUIET)
+ if (NOT JVM_ELF_ARCH MATCHES "Tag_ABI_VFP_args: VFP registers")
+ message("Soft-float JVM detected")
+
+ # Test compilation with -mfloat-abi=softfp using an arbitrary libc function
+ # (typically fails with "fatal error: bits/predefs.h: No such file or directory"
+ # if soft-float dev libraries are not installed)
+ include(CMakePushCheckState)
+ cmake_push_check_state()
+ set(CMAKE_REQUIRED_FLAGS "${CMAKE_REQUIRED_FLAGS} -mfloat-abi=softfp")
+ include(CheckSymbolExists)
+ check_symbol_exists(exit stdlib.h SOFTFP_AVAILABLE)
+ if (NOT SOFTFP_AVAILABLE)
+ message(FATAL_ERROR "Soft-float dev libraries required (e.g. 'apt-get install libc6-dev-armel' on Debian/Ubuntu)")
+ endif (NOT SOFTFP_AVAILABLE)
+ cmake_pop_check_state()
+
+ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfloat-abi=softfp")
+ endif ()
+ endif (READELF MATCHES "NOTFOUND")
+endif (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm" AND CMAKE_SYSTEM_NAME STREQUAL "Linux")
+
+IF("${CMAKE_SYSTEM}" MATCHES "Linux")
+ #
+ # Locate JNI_INCLUDE_DIRS and JNI_LIBRARIES.
+ # Since we were invoked from Maven, we know that the JAVA_HOME environment
+ # variable is valid. So we ignore system paths here and just use JAVA_HOME.
+ #
+ FILE(TO_CMAKE_PATH "$ENV{JAVA_HOME}" _JAVA_HOME)
+ IF(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$")
+ SET(_java_libarch "i386")
+ ELSEIF (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
+ SET(_java_libarch "amd64")
+ ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm")
+ SET(_java_libarch "arm")
+ ELSE()
+ SET(_java_libarch ${CMAKE_SYSTEM_PROCESSOR})
+ ENDIF()
+ SET(_JDK_DIRS "${_JAVA_HOME}/jre/lib/${_java_libarch}/*"
+ "${_JAVA_HOME}/jre/lib/${_java_libarch}"
+ "${_JAVA_HOME}/jre/lib/*"
+ "${_JAVA_HOME}/jre/lib"
+ "${_JAVA_HOME}/lib/*"
+ "${_JAVA_HOME}/lib"
+ "${_JAVA_HOME}/include/*"
+ "${_JAVA_HOME}/include"
+ "${_JAVA_HOME}"
+ )
+ FIND_PATH(JAVA_INCLUDE_PATH
+ NAMES jni.h
+ PATHS ${_JDK_DIRS}
+ NO_DEFAULT_PATH)
+ #In IBM java, it's jniport.h instead of jni_md.h
+ FIND_PATH(JAVA_INCLUDE_PATH2
+ NAMES jni_md.h jniport.h
+ PATHS ${_JDK_DIRS}
+ NO_DEFAULT_PATH)
+ SET(JNI_INCLUDE_DIRS ${JAVA_INCLUDE_PATH} ${JAVA_INCLUDE_PATH2})
+ FIND_LIBRARY(JAVA_JVM_LIBRARY
+ NAMES jvm JavaVM
+ PATHS ${_JDK_DIRS}
+ NO_DEFAULT_PATH)
+ SET(JNI_LIBRARIES ${JAVA_JVM_LIBRARY})
+ MESSAGE("JAVA_HOME=${_JAVA_HOME}, JAVA_JVM_LIBRARY=${JAVA_JVM_LIBRARY}")
+ MESSAGE("JAVA_INCLUDE_PATH=${JAVA_INCLUDE_PATH}, JAVA_INCLUDE_PATH2=${JAVA_INCLUDE_PATH2}")
+ IF(JAVA_JVM_LIBRARY AND JAVA_INCLUDE_PATH AND JAVA_INCLUDE_PATH2)
+ MESSAGE("Located all JNI components successfully.")
+ ELSE()
+ MESSAGE(FATAL_ERROR "Failed to find a viable JVM installation under JAVA_HOME.")
+ ENDIF()
+ELSE()
+ find_package(JNI REQUIRED)
+ENDIF()
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/config.h.cmake
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/config.h.cmake b/bookkeeper-server/src/config.h.cmake
new file mode 100644
index 0000000..d460b7f
--- /dev/null
+++ b/bookkeeper-server/src/config.h.cmake
@@ -0,0 +1,26 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef CONFIG_H
+#define CONFIG_H
+
+#cmakedefine HAVE_SYNC_FILE_RANGE
+#cmakedefine HAVE_FALLOCATE
+#cmakedefine HAVE_POSIX_FADVISE
+#cmakedefine HAVE_POSIX_FALLOCATE
+
+#endif
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 7be0984..dd62d28 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -39,10 +39,10 @@ import com.google.common.base.Stopwatch;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.DaemonThreadFactory;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
@@ -271,13 +276,13 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
/**
* Journal Entry to Record
*/
- private class QueueEntry implements Runnable {
- ByteBuffer entry;
- long ledgerId;
- long entryId;
- WriteCallback cb;
- Object ctx;
- long enqueueTime;
+ private class QueueEntry extends SafeRunnable {
+ final ByteBuffer entry;
+ final long ledgerId;
+ final long entryId;
+ final WriteCallback cb;
+ final Object ctx;
+ final long enqueueTime;
QueueEntry(ByteBuffer entry, long ledgerId, long entryId,
WriteCallback cb, Object ctx, long enqueueTime) {
@@ -304,19 +309,22 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
private final LinkedList<QueueEntry> forceWriteWaiters;
private boolean shouldClose;
private final boolean isMarker;
- private final long lastFlushedPosition;
+ private final long startFlushPosition;
+ private final long endFlushPosition;
private final long logId;
private ForceWriteRequest(JournalChannel logFile,
long logId,
- long lastFlushedPosition,
+ long startFlushPosition,
+ long endFlushPosition,
LinkedList<QueueEntry> forceWriteWaiters,
boolean shouldClose,
boolean isMarker) {
this.forceWriteWaiters = forceWriteWaiters;
this.logFile = logFile;
this.logId = logId;
- this.lastFlushedPosition = lastFlushedPosition;
+ this.startFlushPosition = startFlushPosition;
+ this.endFlushPosition = endFlushPosition;
this.shouldClose = shouldClose;
this.isMarker = isMarker;
forceWriteQueueSize.inc();
@@ -324,22 +332,24 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
public int process(boolean shouldForceWrite) throws IOException {
forceWriteQueueSize.dec();
+
if (isMarker) {
return 0;
}
try {
if (shouldForceWrite) {
- long startTime = MathUtils.nowInNano();
- this.logFile.forceWrite(false);
- journalSyncStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+ if (enableGroupForceWrites) {
+ this.logFile.forceWrite(false);
+ } else {
+ this.logFile.syncRangeOrForceWrite(this.startFlushPosition,
+ this.endFlushPosition - this.startFlushPosition);
+ }
}
- lastLogMark.setCurLogMark(this.logId, this.lastFlushedPosition);
+ lastLogMark.setCurLogMark(this.logId, this.endFlushPosition);
// Notify the waiters that the force write succeeded
- for (QueueEntry e : this.forceWriteWaiters) {
- cbThreadPool.submit(e);
- }
+ callback();
return this.forceWriteWaiters.size();
}
@@ -348,6 +358,16 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
}
}
+ void callback() {
+ for (QueueEntry e : this.forceWriteWaiters) {
+ if (null != e.ctx) {
+ cbThreadPool.submitOrdered(e.ctx, e);
+ } else {
+ cbThreadPool.submit(e);
+ }
+ }
+ }
+
public void closeFileIfNecessary() {
// Close if shouldClose is set
if (shouldClose) {
@@ -374,14 +394,12 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
// This holds the queue entries that should be notified after a
// successful force write
Thread threadToNotifyOnEx;
- // should we group force writes
- private final boolean enableGroupForceWrites;
// make flush interval as a parameter
- public ForceWriteThread(Thread threadToNotifyOnEx, boolean enableGroupForceWrites) {
+ public ForceWriteThread(Thread threadToNotifyOnEx) {
super("ForceWriteThread");
this.threadToNotifyOnEx = threadToNotifyOnEx;
- this.enableGroupForceWrites = enableGroupForceWrites;
}
+
@Override
public void run() {
LOG.info("ForceWrite Thread started");
@@ -394,25 +412,23 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
// Force write the file and then notify the write completions
//
- if (!req.isMarker) {
- if (shouldForceWrite) {
- // if we are going to force write, any request that is already in the
- // queue will benefit from this force write - post a marker prior to issuing
- // the flush so until this marker is encountered we can skip the force write
- if (enableGroupForceWrites) {
- forceWriteRequests.put(new ForceWriteRequest(req.logFile, 0, 0, null, false, true));
- }
+ if (!req.isMarker && shouldForceWrite) {
+ // if we are going to force write, any request that is already in the
+ // queue will benefit from this force write - post a marker prior to issuing
+ // the flush so until this marker is encountered we can skip the force write
+ if (enableGroupForceWrites) {
+ forceWriteRequests.put(new ForceWriteRequest(req.logFile, 0, 0, 0, null, false, true));
+ }
- // If we are about to issue a write, record the number of requests in
- // the last force write and then reset the counter so we can accumulate
- // requests in the write we are about to issue
- if (numReqInLastForceWrite > 0) {
- forceWriteGroupingCountStats.registerSuccessfulValue(numReqInLastForceWrite);
- numReqInLastForceWrite = 0;
- }
+ // If we are about to issue a write, record the number of requests in
+ // the last force write and then reset the counter so we can accumulate
+ // requests in the write we are about to issue
+ if (numReqInLastForceWrite > 0) {
+ forceWriteGroupingCountStats.registerSuccessfulValue(numReqInLastForceWrite);
+ numReqInLastForceWrite = 0;
}
- numReqInLastForceWrite += req.process(shouldForceWrite);
}
+ numReqInLastForceWrite += req.process(shouldForceWrite);
if (enableGroupForceWrites &&
// if its a marker we should switch back to flushing
@@ -493,6 +509,8 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
final File journalDirectory;
final ServerConfiguration conf;
final ForceWriteThread forceWriteThread;
+ // should we group force writes
+ private final boolean enableGroupForceWrites;
// Time after which we will stop grouping and issue the flush
private final long maxGroupWaitInNanos;
// Threshold after which we flush any buffered journal entries
@@ -503,13 +521,17 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
private final boolean flushWhenQueueEmpty;
// should we hint the filesystem to remove pages from cache after force write
private final boolean removePagesFromCache;
+ // journal align size
+ private final int journalAlignmentSize;
+ // journal format version to write
+ private final int journalFormatVersionToWrite;
private final LastLogMark lastLogMark = new LastLogMark(0, 0);
/**
* The thread pool used to handle callback.
*/
- private final ExecutorService cbThreadPool;
+ private final OrderedSafeExecutor cbThreadPool;
// journal entry queue to commit
final LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
@@ -548,12 +570,19 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
this.journalPreAllocSize = conf.getJournalPreAllocSizeMB() * MB;
this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB;
this.maxBackupJournals = conf.getMaxBackupJournals();
- this.forceWriteThread = new ForceWriteThread(this, conf.getJournalAdaptiveGroupWrites());
+ this.enableGroupForceWrites = conf.getJournalAdaptiveGroupWrites();
+ this.forceWriteThread = new ForceWriteThread(this);
this.maxGroupWaitInNanos = TimeUnit.MILLISECONDS.toNanos(conf.getJournalMaxGroupWaitMSec());
this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold();
this.bufferedEntriesThreshold = conf.getJournalBufferedEntriesThreshold();
- this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumJournalCallbackThreads(),
- new DaemonThreadFactory());
+ this.journalAlignmentSize = conf.getJournalAlignmentSize();
+ this.journalFormatVersionToWrite = conf.getJournalFormatVersionToWrite();
+ this.cbThreadPool = OrderedSafeExecutor.newBuilder()
+ .name("BookieJournal")
+ .numThreads(conf.getNumJournalCallbackThreads())
+ .statsLogger(Stats.get().getStatsLogger("journal"))
+ .threadFactory(new DaemonThreadFactory())
+ .build();
// Unless there is a cap on the max wait (which requires group force writes)
// we cannot skip flushing for queue empty
@@ -646,9 +675,11 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
throws IOException {
JournalChannel recLog;
if (journalPos <= 0) {
- recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize);
+ recLog = new JournalChannel(journalDirectory, journalId,
+ journalPreAllocSize, journalWriteBufferSize, statsLogger);
} else {
- recLog = new JournalChannel(journalDirectory, journalId, journalPreAllocSize, journalWriteBufferSize, journalPos);
+ recLog = new JournalChannel(journalDirectory, journalId,
+ journalPreAllocSize, journalWriteBufferSize, journalPos, statsLogger);
}
int journalVersion = recLog.getFormatVersion();
try {
@@ -701,6 +732,8 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
if (!isPaddingRecord) {
scanner.process(journalVersion, offset, recBuff);
}
+ // update last log mark during replaying
+ lastLogMark.setCurLogMark(journalId, offset);
}
} finally {
recLog.close();
@@ -785,10 +818,11 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
public void run() {
LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
ByteBuffer lenBuff = ByteBuffer.allocate(4);
- ByteBuffer paddingBuff = ByteBuffer.allocate(2 * conf.getJournalAlignmentSize());
+ ByteBuffer paddingBuff = ByteBuffer.allocate(2 * journalAlignmentSize);
ZeroBuffer.put(paddingBuff);
JournalChannel logFile = null;
forceWriteThread.start();
+ Stopwatch journalAllocationWatcher = new Stopwatch();
Stopwatch journalCreationWatcher = new Stopwatch();
Stopwatch journalFlushWatcher = new Stopwatch();
long batchSize = 0;
@@ -799,7 +833,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
// http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
long logId = journalIds.isEmpty() ? System.currentTimeMillis() : journalIds.get(journalIds.size() - 1);
BufferedChannel bc = null;
- long lastFlushPosition = 0;
+ long lastFlushPosition = 0L;
boolean groupWhenTimeout = false;
long dequeueStartTime = 0L;
@@ -815,15 +849,16 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
logId,
journalPreAllocSize,
journalWriteBufferSize,
- conf.getJournalAlignmentSize(),
+ journalAlignmentSize,
removePagesFromCache,
- conf.getJournalFormatVersionToWrite());
+ journalFormatVersionToWrite,
+ statsLogger);
journalCreationStats.registerSuccessfulEvent(
journalCreationWatcher.stop().elapsedTime(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
bc = logFile.getBufferedChannel();
- lastFlushPosition = bc.position();
+ lastFlushPosition = 0;
}
if (qe == null) {
@@ -880,14 +915,21 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
// toFlush is non null and not empty so should be safe to access getFirst
if (shouldFlush) {
- if (conf.getJournalFormatVersionToWrite() >= JournalChannel.V5) {
- writePaddingBytes(logFile, paddingBuff, conf.getJournalAlignmentSize());
- }
+ long prevFlushPosition = lastFlushPosition;
+
journalFlushWatcher.reset().start();
+ if (journalFormatVersionToWrite >= JournalChannel.V5) {
+ writePaddingBytes(logFile, paddingBuff, journalAlignmentSize);
+ }
bc.flush(false);
lastFlushPosition = bc.position();
- journalFlushStats.registerSuccessfulEvent(
- journalFlushWatcher.stop().elapsedTime(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+
+ // start sync the range
+ if (!enableGroupForceWrites) {
+ logFile.startSyncRange(prevFlushPosition, lastFlushPosition);
+ }
+ journalFlushLatencyStats.registerSuccessfulEvent(
+ journalFlushWatcher.stop().elapsed(TimeUnit.MICROSECONDS));
// Trace the lifetime of entries through persistence
if (LOG.isDebugEnabled()) {
@@ -899,7 +941,8 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
forceWriteBatchEntriesStats.registerSuccessfulValue(toFlush.size());
forceWriteBatchBytesStats.registerSuccessfulValue(batchSize);
- forceWriteRequests.put(new ForceWriteRequest(logFile, logId, lastFlushPosition, toFlush, (lastFlushPosition > maxJournalSize), false));
+ forceWriteRequests.put(new ForceWriteRequest(logFile, logId, prevFlushPosition,
+ lastFlushPosition, toFlush, (lastFlushPosition > maxJournalSize), false));
toFlush = new LinkedList<QueueEntry>();
batchSize = 0L;
// check whether journal file is over file limit
@@ -936,8 +979,13 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
// we should be doing the following, but then we run out of
// direct byte buffers
// logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
- bc.write(lenBuff);
- bc.write(qe.entry);
+ int flushes = 0;
+ flushes += bc.write(lenBuff);
+ flushes += bc.write(qe.entry);
+
+ journalMemAddFlushTimesStats.registerSuccessfulEvent(flushes);
+ journalMemAddLatencyStats.registerSuccessfulEvent(
+ MathUtils.elapsedMicroSec(qe.enqueueTime));
toFlush.add(qe);
qe = null;
@@ -970,11 +1018,9 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
LOG.info("Shutting down Journal");
forceWriteThread.shutdown();
cbThreadPool.shutdown();
- if (!cbThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
+ if (!cbThreadPool.forceShutdown(5, TimeUnit.SECONDS)) {
LOG.warn("Couldn't shutdown journal callback thread gracefully. Forcing");
}
- cbThreadPool.shutdownNow();
-
running = false;
this.interrupt();
this.join();
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
index ad46e5c..e3077e1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
@@ -28,13 +28,23 @@ import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.ByteBuffer;
import java.util.Arrays;
-
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.NativeIO;
import org.apache.bookkeeper.util.ZeroBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.*;
+import static org.apache.bookkeeper.util.NativeIO.*;
/**
* Simple wrapper around FileChannel to add versioning
@@ -81,28 +91,36 @@ class JournalChannel implements Closeable {
// The position of the file channel's last drop position
private long lastDropPosition = 0L;
+ // Stats
+ private final OpStatsLogger journalPreallocationStats;
+ private final Counter journalForceWriteCounter;
+ private final OpStatsLogger journalForceWriteStats;
+
// Mostly used by tests
JournalChannel(File journalDirectory, long logId) throws IOException {
- this(journalDirectory, logId, 4 * 1024 * 1024, 65536, START_OF_FILE);
+ this(journalDirectory, logId, 4 * 1024 * 1024, 65536, START_OF_FILE, NullStatsLogger.INSTANCE);
}
// Open journal for scanning starting from the first record in journal.
- JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize) throws IOException {
- this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE);
+ JournalChannel(File journalDirectory, long logId, long preAllocSize, int writeBufferSize, StatsLogger statsLogger)
+ throws IOException {
+ this(journalDirectory, logId, preAllocSize, writeBufferSize, START_OF_FILE, statsLogger);
}
// Open journal for scanning starting from given position.
JournalChannel(File journalDirectory, long logId,
- long preAllocSize, int writeBufferSize, long position) throws IOException {
- this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE, position, false, V5);
+ long preAllocSize, int writeBufferSize, long position, StatsLogger statsLogger)
+ throws IOException {
+ this(journalDirectory, logId, preAllocSize, writeBufferSize, SECTOR_SIZE, position, false, V5, statsLogger);
}
// Open journal to write
JournalChannel(File journalDirectory, long logId,
long preAllocSize, int writeBufferSize, int journalAlignSize,
- boolean fRemoveFromPageCache, int formatVersionToWrite) throws IOException {
+ boolean fRemoveFromPageCache, int formatVersionToWrite,
+ StatsLogger statsLogger) throws IOException {
this(journalDirectory, logId, preAllocSize, writeBufferSize, journalAlignSize,
- START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite);
+ START_OF_FILE, fRemoveFromPageCache, formatVersionToWrite, statsLogger);
}
/**
@@ -124,12 +142,20 @@ class JournalChannel implements Closeable {
* whether to remove cached pages from page cache.
* @param formatVersionToWrite
* format version to write
+ * @param statsLogger
+ * stats logger to record stats
* @throws IOException
*/
- private JournalChannel(File journalDirectory, long logId,
- long preAllocSize, int writeBufferSize, int journalAlignSize,
- long position, boolean fRemoveFromPageCache,
- int formatVersionToWrite) throws IOException {
+ private JournalChannel(File journalDirectory,
+ long logId,
+ long preAllocSize,
+ int writeBufferSize,
+ int journalAlignSize,
+ long position,
+ boolean fRemoveFromPageCache,
+ int formatVersionToWrite,
+ StatsLogger statsLogger)
+ throws IOException {
this.journalAlignSize = journalAlignSize;
this.zeros = ByteBuffer.allocate(journalAlignSize);
this.preAllocSize = preAllocSize - preAllocSize % journalAlignSize;
@@ -149,9 +175,13 @@ class JournalChannel implements Closeable {
+ " suddenly appeared, is another bookie process running?");
}
randomAccessFile = new RandomAccessFile(fn, "rw");
+ fd = NativeIO.getSysFileDescriptor(randomAccessFile.getFD());
fc = randomAccessFile.getChannel();
formatVersion = formatVersionToWrite;
+ // preallocate the space the header
+ preallocate();
+
int headerSize = (V4 == formatVersion) ? VERSION_HEADER_SIZE : HEADER_SIZE;
ByteBuffer bb = ByteBuffer.allocate(headerSize);
ZeroBuffer.put(bb);
@@ -162,11 +192,12 @@ class JournalChannel implements Closeable {
fc.write(bb);
bc = new BufferedChannel(fc, writeBufferSize);
- forceWrite(true);
- nextPrealloc = this.preAllocSize;
- fc.write(zeros, nextPrealloc - journalAlignSize);
+
+ // sync the file
+ // syncRangeOrForceWrite(0, HEADER_SIZE);
} else { // open an existing file
randomAccessFile = new RandomAccessFile(fn, "r");
+ fd = NativeIO.getSysFileDescriptor(randomAccessFile.getFD());
fc = randomAccessFile.getChannel();
bc = null; // readonly
@@ -215,7 +246,13 @@ class JournalChannel implements Closeable {
LOG.error("Bookie journal file can seek to position :", e);
}
}
- this.fd = NativeIO.getSysFileDescriptor(randomAccessFile.getFD());
+
+ // Stats
+ this.journalForceWriteCounter = statsLogger.getCounter(JOURNAL_NUM_FORCE_WRITES);
+ this.journalForceWriteStats = statsLogger.getOpStatsLogger(JOURNAL_FORCE_WRITE_LATENCY);
+ this.journalPreallocationStats = statsLogger.getOpStatsLogger(JOURNAL_PREALLOCATION);
+
+ LOG.info("Opened journal {} : fd {}", fn, fd);
}
int getFormatVersion() {
@@ -229,14 +266,33 @@ class JournalChannel implements Closeable {
return bc;
}
- void preAllocIfNeeded(long size) throws IOException {
- if (bc.position() + size > nextPrealloc) {
- nextPrealloc += preAllocSize;
+ private void preallocate() throws IOException {
+ long prevPrealloc = nextPrealloc;
+ nextPrealloc = prevPrealloc + preAllocSize;
+ if (!NativeIO.fallocateIfPossible(fd, prevPrealloc, preAllocSize)) {
zeros.clear();
fc.write(zeros, nextPrealloc - journalAlignSize);
}
}
+ void preAllocIfNeeded(long size) throws IOException {
+ preAllocIfNeeded(size, null);
+ }
+
+ void preAllocIfNeeded(long size, Stopwatch stopwatch) throws IOException {
+ if (bc.position() + size > nextPrealloc) {
+ if (null != stopwatch) {
+ stopwatch.reset().start();
+ }
+ preallocate();
+ if (null != stopwatch) {
+ journalPreallocationStats.registerSuccessfulEvent(
+ stopwatch.stop().elapsedTime(TimeUnit.MICROSECONDS),
+ TimeUnit.MICROSECONDS);
+ }
+ }
+ }
+
int read(ByteBuffer dst)
throws IOException {
return fc.read(dst);
@@ -246,11 +302,33 @@ class JournalChannel implements Closeable {
fc.close();
}
+    /**
+     * Asks the kernel to initiate write-out of dirty pages in the range
+     * [offset, offset + bytes) using only SYNC_FILE_RANGE_WRITE (no wait flags).
+     * Whether the request succeeded is not reported to the caller.
+     */
+    public void startSyncRange(long offset, long bytes) throws IOException {
+        final int writeOnly = NativeIO.SYNC_FILE_RANGE_WRITE;
+        NativeIO.syncFileRangeIfPossible(fd, offset, bytes, writeOnly);
+    }
+
+    /**
+     * Writes out the byte range [offset, offset + bytes) with sync_file_range(2), waiting
+     * before and after the write-out. If that call succeeds, pages before the end of the
+     * range may then be dropped from the page cache via removeFromPageCacheIfPossible.
+     *
+     * <p>This method always returns {@code false}, so
+     * {@link #syncRangeOrForceWrite(long, long)} still performs a force write afterwards.
+     * sync_file_range(2) does not write file metadata and gives no durability guarantee
+     * after a crash, so it is not treated as a replacement for fsync here.
+     * NOTE(review): the previous code returned {@code false} from both branches of an
+     * if/else. If callers are meant to skip the force write when the range sync succeeds,
+     * this would have to return {@code true}, but that weakens durability. Confirm the
+     * intent before changing it.
+     *
+     * @return always {@code false}
+     */
+    public boolean syncRangeIfPossible(long offset, long bytes) throws IOException {
+        final int waitWriteWait =
+                SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER;
+        if (NativeIO.syncFileRangeIfPossible(fd, offset, bytes, waitWriteWait)) {
+            removeFromPageCacheIfPossible(offset + bytes);
+        }
+        return false;
+    }
+
public void forceWrite(boolean forceMetadata) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Journal ForceWrite");
}
- long newForceWritePosition = bc.forceWrite(forceMetadata);
+ long startTimeNanos = MathUtils.nowInNano();
+ forceWriteImpl(forceMetadata);
+ // collect stats
+ journalForceWriteCounter.inc();
+ journalForceWriteStats.registerSuccessfulEvent(
+ MathUtils.elapsedMicroSec(startTimeNanos), TimeUnit.MICROSECONDS);
+ }
+
+ private void removeFromPageCacheIfPossible(long offset) {
//
// For POSIX_FADV_DONTNEED, we want to drop from the beginning
// of the file to a position prior to the current position.
@@ -265,11 +343,28 @@ class JournalChannel implements Closeable {
// lastDropPosition newDropPos lastForceWritePosition
//
if (fRemoveFromPageCache) {
- long newDropPos = newForceWritePosition - CACHE_DROP_LAG_BYTES;
+ long newDropPos = offset - CACHE_DROP_LAG_BYTES;
if (lastDropPosition < newDropPos) {
NativeIO.bestEffortRemoveFromPageCache(fd, lastDropPosition, newDropPos - lastDropPosition);
}
this.lastDropPosition = newDropPos;
}
}
+
+    /**
+     * Force-writes the journal through {@code bc.forceWrite(forceMetadata)} and passes the
+     * position it returns to removeFromPageCacheIfPossible, so pages before that position
+     * may be dropped from the page cache. No stats are recorded here.
+     */
+    private void forceWriteImpl(boolean forceMetadata) throws IOException {
+        removeFromPageCacheIfPossible(bc.forceWrite(forceMetadata));
+    }
+
+    /**
+     * Syncs [offset, offset + bytes) via syncRangeIfPossible and, if that reports
+     * {@code false}, falls back to a force write without metadata. In both cases one force
+     * write is counted and the total latency is recorded in microseconds.
+     */
+    public void syncRangeOrForceWrite(long offset, long bytes) throws IOException {
+        final long startNanos = MathUtils.nowInNano();
+        boolean rangeSynced = syncRangeIfPossible(offset, bytes);
+        if (!rangeSynced) {
+            forceWriteImpl(false);
+        }
+        journalForceWriteCounter.inc();
+        long latencyMicros = MathUtils.elapsedMicroSec(startNanos);
+        journalForceWriteStats.registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
+    }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java
new file mode 100644
index 0000000..e5d8ae8
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java
@@ -0,0 +1,115 @@
+package org.apache.bookkeeper.util;
+
+import com.sun.jna.Library;
+import com.sun.jna.Native;
+
+public class Errno {
+
+    /** JNA binding to the C library ("c") functions used by this class. */
+    private static final InterfaceDelegate delegate = (InterfaceDelegate) Native.loadLibrary("c",
+            InterfaceDelegate.class);
+
+    /**
+     * Calls perror(3). It prints {@code s} (when non-empty) followed by a colon and a blank,
+     * then the message for the C library's current errno, to standard error.
+     *
+     * <p>perror(3) returns no value, so it is bound as {@code void} and this method always
+     * returns 0. The {@code int} return type is kept for compatibility with existing callers.
+     *
+     * @param s prefix for the message, typically the name of the function that failed
+     * @return always 0
+     */
+    public static int perror(String s) {
+        delegate.perror(s);
+        return 0;
+    }
+
+    /**
+     * Returns the message text for error number {@code errnum}, as produced by strerror(3).
+     *
+     * <p>NOTE(review): POSIX does not require strerror(3) to be thread-safe (strerror_r(3)
+     * is the thread-safe variant). Confirm this is acceptable for concurrent callers.
+     *
+     * @param errnum error number, e.g. a value returned by posix_fallocate(3)
+     * @return human-readable description of {@code errnum}
+     */
+    public static String strerror(int errnum) {
+        return delegate.strerror(errnum);
+    }
+
+    /**
+     * Returns the message text for the error number reported by {@link #errno()}.
+     */
+    public static String strerror() {
+        return strerror(errno());
+    }
+
+    /**
+     * Returns the last error number recorded for the calling thread, as reported by
+     * {@link Native#getLastError()}.
+     *
+     * <p>The value is only meaningful immediately after a call that failed; successful
+     * calls are not required to clear it.
+     * NOTE(review): JNA records this value around the native calls it dispatches itself.
+     * Calls made through plain JNI (rather than JNA) may not update it, so after such a call
+     * this can return a stale value. Confirm before using it to describe JNI failures.
+     */
+    public static int errno() {
+        return Native.getLastError();
+    }
+
+    /** JNA mapping of the libc functions used above. */
+    interface InterfaceDelegate extends Library {
+
+        /** perror(3); declared {@code void} in C, so it is mapped as {@code void}. */
+        void perror(String s);
+
+        /** strerror(3). */
+        String strerror(int errnum);
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
index 2448842..9eb3a68 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
@@ -20,34 +20,56 @@ package org.apache.bookkeeper.util;
import java.lang.reflect.Field;
import java.io.FileDescriptor;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.sun.jna.LastErrorException;
-import com.sun.jna.Native;
public final class NativeIO {
private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class);
private static final int POSIX_FADV_DONTNEED = 4; /* fadvise.h */
+ /**
+ * Wait upon writeout of all pages in the range before performing the write.
+ */
+ public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1;
+ /**
+ * Initiate writeout of all those dirty pages in the range which are not presently
+ * under writeback.
+ */
+ public static final int SYNC_FILE_RANGE_WRITE = 2;
+ /**
+ * Wait upon writeout of all pages in the range after performing the write.
+ */
+ public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4;
+
+ private static final int FALLOC_FL_KEEP_SIZE = 1;
+
private static boolean initialized = false;
private static boolean fadvisePossible = true;
+ private static boolean syncFileRangePossible = true;
+ private static boolean sysFallocatePossible = true;
+ private static boolean posixFallocatePossible = true;
static {
try {
- Native.register("c");
+ LOG.info("Loading bookkeeper native library.");
+ System.loadLibrary("bookkeeper");
initialized = true;
- } catch (NoClassDefFoundError e) {
- LOG.info("JNA not found. Native methods will be disabled.");
- } catch (UnsatisfiedLinkError e) {
- LOG.info("Unable to link C library. Native methods will be disabled.");
- } catch (NoSuchMethodError e) {
- LOG.warn("Obsolete version of JNA present; unable to register C library");
+ LOG.info("Loaded bookkeeper native library. Enabled Native IO.");
+ } catch (Throwable t) {
+ LOG.info("Unable to load bookkeeper native library. Native methods will be disabled : ", t);
}
}
// fadvice
- public static native int posix_fadvise(int fd, long offset, long len, int flag) throws LastErrorException;
+ public static native int posix_fadvise(int fd, long offset, long len, int flag);
+ // posix_fallocate
+ public static native int posix_fallocate(int fd, long offset, long len);
+ // fallocate
+ public static native int fallocate(int fd, int mode, long offset, long len);
+ // sync_file_range(2)
+ public static native int sync_file_range(int fd, long offset, long len, int flags);
private NativeIO() {}
@@ -66,6 +88,7 @@ public final class NativeIO {
return field;
}
+
/**
* Get system file descriptor (int) from FileDescriptor object.
* @param descriptor - FileDescriptor object to get fd from
@@ -82,6 +105,92 @@ public final class NativeIO {
return -1;
}
+    /**
+     * Tries to allocate {@code nbytes} bytes starting at {@code offset} for {@code fd}.
+     * It first uses fallocate(2) with FALLOC_FL_KEEP_SIZE and, if that does not succeed,
+     * posix_fallocate(3). Either mechanism is skipped once it has been marked unusable.
+     *
+     * @return {@code true} if one of the calls succeeded; {@code false} if the native
+     *         library is not loaded, {@code fd} is negative, or both attempts failed or
+     *         were skipped
+     */
+    public static boolean fallocateIfPossible(int fd, long offset, long nbytes) {
+        if (!initialized || fd < 0) {
+            return false;
+        }
+        if (sysFallocatePossible && sysFallocateIfPossible(fd, offset, nbytes)) {
+            return true;
+        }
+        return posixFallocatePossible && posixFallocateIfPossible(fd, offset, nbytes);
+    }
+
+ private static boolean sysFallocateIfPossible(int fd, long offset, long nbytes) {
+ try {
+ int rc = fallocate(fd, FALLOC_FL_KEEP_SIZE, offset, nbytes);
+ if (rc != 0) {
+ LOG.error("Failed on sys fallocate file descriptor {}, offset {}, bytes {}, rc {} : {}",
+ new Object[] { fd, offset, nbytes, rc, Errno.strerror() });
+ return false;
+ }
+ } catch (UnsupportedOperationException uoe) {
+ LOG.warn("sys fallocate isn't supported : ", uoe);
+ sysFallocatePossible = false;
+ } catch (UnsatisfiedLinkError nle) {
+ LOG.warn("Unsatisfied Link error: sys fallocate failed on file descriptor {}, offset {}, bytes {} : ",
+ new Object[] { fd, offset, nbytes, nle });
+ sysFallocatePossible = false;
+ } catch (Exception e) {
+ LOG.error("Unknown exception: sys fallocate failed on file descriptor {}, offset {}, bytes {} : ",
+ new Object[] { fd, offset, nbytes, e });
+ return false;
+ }
+ return sysFallocatePossible;
+ }
+
+    /**
+     * Calls posix_fallocate(3) to allocate {@code nbytes} bytes starting at {@code offset}.
+     *
+     * <p>posix_fallocate(3) returns an error number directly and does not set errno, so the
+     * failure message is derived from the return code rather than from errno. An
+     * UnsupportedOperationException or UnsatisfiedLinkError disables further attempts.
+     *
+     * @return {@code posixFallocatePossible} after a successful call or a disabling error;
+     *         {@code false} on any other failure
+     */
+    private static boolean posixFallocateIfPossible(int fd, long offset, long nbytes) {
+        try {
+            int rc = posix_fallocate(fd, offset, nbytes);
+            if (rc != 0) {
+                // rc is the error number itself: posix_fallocate does not set errno
+                LOG.error("Failed on posix_fallocate file descriptor {}, offset {}, bytes {}, rc {} : {}",
+                        new Object[] { fd, offset, nbytes, rc, Errno.strerror(rc) });
+                return false;
+            }
+        } catch (UnsupportedOperationException uoe) {
+            LOG.warn("posix_fallocate isn't supported : ", uoe);
+            posixFallocatePossible = false;
+        } catch (UnsatisfiedLinkError nle) {
+            LOG.warn("Unsatisfied Link error: posix_fallocate failed on file descriptor {}, offset {}, bytes {} : ",
+                    new Object[] { fd, offset, nbytes, nle });
+            posixFallocatePossible = false;
+        } catch (Exception e) {
+            LOG.error("Unknown exception: posix_fallocate failed on file descriptor {}, offset {}, bytes {} : ",
+                    new Object[] { fd, offset, nbytes, e });
+            return false;
+        }
+        return posixFallocatePossible;
+    }
+
+ public static boolean syncFileRangeIfPossible(int fd, long offset, long nbytes, int flags) {
+ if (!initialized || !syncFileRangePossible || fd < 0) {
+ return false;
+ }
+ try {
+ int rc = sync_file_range(fd, offset, nbytes, flags);
+ if (rc != 0) {
+ LOG.error("Failed on syncing file descriptor {}, offset {}, bytes {}, rc {} : {}",
+ new Object[] { fd, offset, nbytes, rc, Errno.strerror() });
+ return false;
+ }
+ } catch (UnsupportedOperationException uoe) {
+ LOG.warn("sync_file_range isn't supported : ", uoe);
+ syncFileRangePossible = false;
+ } catch (UnsatisfiedLinkError nle) {
+ LOG.warn("Unsatisfied Link error: sync_file_range failed on file descriptor {}, offset {}, bytes {} : ",
+ new Object[] { fd, offset, nbytes, nle });
+ syncFileRangePossible = false;
+ } catch (Exception e) {
+ LOG.error("Unknown exception: sync_file_range failed on file descriptor {}, offset {}, bytes {} : ",
+ new Object[] { fd, offset, nbytes, e });
+ return false;
+ }
+ return syncFileRangePossible;
+ }
+
/**
* Remove pages from the file system page cache when they wont
* be accessed again
@@ -89,16 +198,22 @@ public final class NativeIO {
* @param fd The file descriptor of the source file.
* @param offset The offset within the file.
* @param len The length to be flushed.
- *
- * @throws nothing => Best effort
*/
-
public static void bestEffortRemoveFromPageCache(int fd, long offset, long len) {
+ posixFadviseIfPossible(fd, offset, len, POSIX_FADV_DONTNEED);
+ }
+
+ public static boolean posixFadviseIfPossible(int fd, long offset, long len, int flags) {
if (!initialized || !fadvisePossible || fd < 0) {
- return;
+ return false;
}
try {
- posix_fadvise(fd, offset, len, POSIX_FADV_DONTNEED);
+ int rc = posix_fadvise(fd, offset, len, flags);
+ if (rc != 0) {
+ LOG.error("Failed on posix_fadvise file descriptor {}, offset {}, bytes {}, flags {}, rc {} : {}",
+ new Object[] { fd, offset, len, flags, rc, Errno.strerror() });
+ return false;
+ }
} catch (UnsupportedOperationException uoe) {
LOG.warn("posix_fadvise is not supported : ", uoe);
fadvisePossible = false;
@@ -113,7 +228,9 @@ public final class NativeIO {
// exception and forget
LOG.warn("Unknown exception: posix_fadvise failed on file descriptor {}, offset {} : ",
new Object[] { fd, offset, e });
+ return false;
}
+ return fadvisePossible;
}
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d9802962/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c b/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c
new file mode 100644
index 0000000..b93bde4
--- /dev/null
+++ b/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <jni.h>
+
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/syscall.h>
+#include <sys/types.h>
+#include <asm-x86_64/unistd.h>
+#include "config.h"
+
+// my_sync_file_range resolves to one of:
+//  - the libc sync_file_range() when HAVE_SYNC_FILE_RANGE is defined
+//    (presumably by the build's config.h check), or
+//  - the raw-syscall fallback below when the kernel headers define
+//    __NR_sync_file_range.
+// If neither applies, my_sync_file_range stays undefined and the JNI wrapper
+// for sync_file_range reports ENOSYS.
+#if defined(HAVE_SYNC_FILE_RANGE)
+# define my_sync_file_range sync_file_range
+#elif defined(__NR_sync_file_range)
+// RHEL 5 kernels have sync_file_range support, but the glibc
+// included does not have the library function. We can
+// still call it directly, and if it's not supported by the
+// kernel, we'd get ENOSYS. See RedHat Bugzilla #518581
+//
+// Note: 'from' is the start offset and 'to' is the number of bytes to sync
+// (the nbytes argument of sync_file_range(2)), not an end offset.
+static int manual_sync_file_range (int fd, __off64_t from, __off64_t to, unsigned int flags)
+{
+#ifdef __x86_64__
+ return syscall( __NR_sync_file_range, fd, from, to, flags);
+#else
+ // 32-bit: each 64-bit argument is passed as two longs; __LONG_LONG_PAIR
+ // (glibc <endian.h>) orders the high/low halves for the target endianness.
+ // NOTE(review): some 32-bit architectures (e.g. ARM, PowerPC) use
+ // sync_file_range2 with a different argument order; this fallback assumes
+ // the i386-style layout — confirm for other targets.
+ return syscall (__NR_sync_file_range, fd,
+ __LONG_LONG_PAIR ((long) (from >> 32), (long) from),
+ __LONG_LONG_PAIR ((long) (to >> 32), (long) to),
+ flags);
+#endif
+}
+#define my_sync_file_range manual_sync_file_range
+#endif
+
+/**
+ * JNI implementation of
+ *   public static native int sync_file_range(int fd, long offset, long len, int flags);
+ * declared in org.apache.bookkeeper.util.NativeIO.
+ *
+ * The "_1" sequences in the function name are how JNI encodes the '_'
+ * characters of the Java method name.
+ *
+ * Returns 0 on success, or -1 with errno set on failure. errno is ENOSYS when
+ * no sync_file_range implementation was available at build time.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_bookkeeper_util_NativeIO_sync_1file_1range(
+    JNIEnv *env, jclass clazz,
+    jint fd, jlong offset, jlong len, jint flags)
+{
+#ifndef my_sync_file_range
+    errno = ENOSYS;
+    return -1;
+#else
+    /* Pass the 64-bit jlong values through unchanged: sync_file_range takes
+     * 64-bit offsets, and casting to off_t first would truncate them on
+     * 32-bit builds without large-file support. */
+    return my_sync_file_range(fd, offset, len, flags);
+#endif
+}
+
+// my_fallocate resolves to one of:
+//  - the libc fallocate() when HAVE_FALLOCATE is defined
+//    (presumably by the build's config.h check), or
+//  - the raw-syscall fallback below when the kernel headers define
+//    __NR_fallocate.
+// If neither applies, my_fallocate stays undefined and the JNI wrapper for
+// fallocate reports ENOSYS.
+#if defined(HAVE_FALLOCATE)
+# define my_fallocate fallocate
+#elif defined(__NR_fallocate)
+// Note: 'from' is the start offset and 'to' is the number of bytes to
+// allocate (the len argument of fallocate(2)), not an end offset.
+static int manual_fallocate (int fd, int mode, __off64_t from, __off64_t to)
+{
+#ifdef __x86_64__
+ return syscall( __NR_fallocate, fd, mode, from, to);
+#else
+ // 32-bit: each 64-bit argument is passed as two longs, ordered for the
+ // target endianness by __LONG_LONG_PAIR (glibc <endian.h>).
+ // NOTE(review): assumes the i386-style argument layout — confirm for
+ // other 32-bit targets.
+ return syscall (__NR_fallocate, fd, mode,
+ __LONG_LONG_PAIR ((long) (from >> 32), (long) from),
+ __LONG_LONG_PAIR ((long) (to >> 32), (long) to));
+#endif
+}
+#define my_fallocate manual_fallocate
+#endif
+
+/**
+ * JNI implementation of
+ *   public static native int fallocate(int fd, int mode, long offset, long len);
+ * declared in org.apache.bookkeeper.util.NativeIO.
+ *
+ * Returns 0 on success, or -1 with errno set on failure. errno is ENOSYS when
+ * no fallocate implementation was available at build time.
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_bookkeeper_util_NativeIO_fallocate(
+    JNIEnv *env, jclass clazz,
+    jint fd, jint mode, jlong offset, jlong len)
+{
+#ifndef my_fallocate
+    errno = ENOSYS;
+    return -1;
+#else
+    /* No (off_t) cast: the manual_fallocate fallback takes 64-bit offsets, and
+     * narrowing to off_t first would truncate them on 32-bit builds without
+     * large-file support. With libc fallocate() the implicit conversion is
+     * the same as the former explicit cast. */
+    return my_fallocate(fd, mode, offset, len);
+#endif
+}
+
+/**
+ * JNI implementation of
+ *   public static native int posix_fadvise(int fd, long offset, long len, int flag);
+ * declared in org.apache.bookkeeper.util.NativeIO.
+ *
+ * Like posix_fadvise itself, this returns 0 on success or an error number on
+ * failure; errno is not used. Returns ENOSYS when posix_fadvise was not
+ * available at build time (HAVE_POSIX_FADVISE undefined).
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_bookkeeper_util_NativeIO_posix_1fadvise(
+    JNIEnv *env, jclass clazz,
+    jint fd, jlong offset, jlong len, jint flags)
+{
+#ifndef HAVE_POSIX_FADVISE
+    /* Report "unsupported" the way posix_fadvise reports errors: by return value. */
+    return ENOSYS;
+#else
+    return posix_fadvise(fd, (off_t)offset, (off_t)len, flags);
+#endif
+}
+
+/**
+ * JNI implementation of
+ *   public static native int posix_fallocate(int fd, long offset, long len);
+ * declared in org.apache.bookkeeper.util.NativeIO.
+ *
+ * Like posix_fallocate itself, this returns 0 on success or an error number on
+ * failure; errno is not used. Returns ENOSYS when posix_fallocate was not
+ * available at build time (HAVE_POSIX_FALLOCATE undefined).
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_bookkeeper_util_NativeIO_posix_1fallocate(
+    JNIEnv *env, jclass clazz,
+    jint fd, jlong offset, jlong len)
+{
+#ifndef HAVE_POSIX_FALLOCATE
+    /* Report "unsupported" the way posix_fallocate reports errors: by return value. */
+    return ENOSYS;
+#else
+    return posix_fallocate(fd, (off_t)offset, (off_t)len);
+#endif
+}
[2/7] bookkeeper git commit: fix merge conflicts
Posted by si...@apache.org.
fix merge conflicts
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/014512c3
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/014512c3
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/014512c3
Branch: refs/heads/sijie/bookkeeper_fallocate
Commit: 014512c3563bd06bd90789db0c5d0369fe421a62
Parents: d980296
Author: Sijie Guo <si...@apache.org>
Authored: Thu Nov 17 17:32:15 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Thu Nov 17 17:32:15 2016 -0800
----------------------------------------------------------------------
.../bookie/BookKeeperServerStats.java | 1 +
.../bookkeeper/bookie/BufferedChannel.java | 8 +++--
.../org/apache/bookkeeper/bookie/Journal.java | 34 +++++++++-----------
3 files changed, 23 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/014512c3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 239f923..79c0d61 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -53,6 +53,7 @@ public interface BookKeeperServerStats {
public final static String JOURNAL_FORCE_WRITE_BATCH_ENTRIES = "JOURNAL_FORCE_WRITE_BATCH_ENTRIES";
public final static String JOURNAL_FORCE_WRITE_BATCH_BYTES = "JOURNAL_FORCE_WRITE_BATCH_BYTES";
public final static String JOURNAL_FLUSH_LATENCY = "JOURNAL_FLUSH_LATENCY";
+ public final static String JOURNAL_FLUSH_IN_MEM_ADD = "JOURNAL_FLUSH_IN_MEM_ADD";
public final static String JOURNAL_QUEUE_LATENCY = "JOURNAL_QUEUE_LATENCY";
public final static String JOURNAL_PROCESS_TIME_LATENCY = "JOURNAL_PROCESS_TIME_LATENCY";
public final static String JOURNAL_CREATION_LATENCY = "JOURNAL_CREATION_LATENCY";
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/014512c3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index cb7d914..0492943 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -24,9 +24,10 @@ package org.apache.bookkeeper.bookie;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import org.apache.bookkeeper.util.ZeroBuffer;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.util.ZeroBuffer;
+
/**
* Provides a buffering layer in front of a FileChannel.
*/
@@ -64,8 +65,9 @@ public class BufferedChannel extends BufferedReadChannel {
* @param src The source ByteBuffer which contains the data to be written.
* @throws IOException if a write operation fails.
*/
- synchronized public void write(ByteBuffer src) throws IOException {
+ synchronized public int write(ByteBuffer src) throws IOException {
int copied = 0;
+ int flushes = 0;
while(src.remaining() > 0) {
int truncated = 0;
if (writeBuffer.remaining() < src.remaining()) {
@@ -78,9 +80,11 @@ public class BufferedChannel extends BufferedReadChannel {
// if we have run out of buffer space, we should flush to the file
if (writeBuffer.remaining() == 0) {
flushInternal();
+ ++flushes;
}
}
position += copied;
+ return flushes;
}
/**
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/014512c3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index dd62d28..e8adfff 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -30,8 +30,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -39,18 +37,15 @@ import com.google.common.base.Stopwatch;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
-<<<<<<< HEAD
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
-=======
-import org.apache.bookkeeper.stats.BookkeeperServerStatsLogger;
-import org.apache.bookkeeper.stats.ServerStatsProvider;
->>>>>>> 2d5718f... bookie: fallocate & sync_file_range
import org.apache.bookkeeper.util.DaemonThreadFactory;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.util.ZeroBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -295,7 +290,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
}
@Override
- public void run() {
+ public void safeRun() {
if (LOG.isDebugEnabled()) {
LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId);
}
@@ -541,10 +536,12 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
private final LedgerDirsManager ledgerDirsManager;
// Expose Stats
+ private final StatsLogger statsLogger;
private final OpStatsLogger journalAddEntryStats;
- private final OpStatsLogger journalSyncStats;
+ private final OpStatsLogger journalMemAddEntryStats;
private final OpStatsLogger journalCreationStats;
private final OpStatsLogger journalFlushStats;
+ private final OpStatsLogger journalMemAddFlushStats;
private final OpStatsLogger journalProcessTimeStats;
private final OpStatsLogger journalQueueStats;
private final OpStatsLogger forceWriteGroupingCountStats;
@@ -580,7 +577,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
this.cbThreadPool = OrderedSafeExecutor.newBuilder()
.name("BookieJournal")
.numThreads(conf.getNumJournalCallbackThreads())
- .statsLogger(Stats.get().getStatsLogger("journal"))
+ .statsLogger(statsLogger)
.threadFactory(new DaemonThreadFactory())
.build();
@@ -594,10 +591,12 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark());
// Expose Stats
+ this.statsLogger = statsLogger;
journalAddEntryStats = statsLogger.getOpStatsLogger(JOURNAL_ADD_ENTRY);
- journalSyncStats = statsLogger.getOpStatsLogger(JOURNAL_SYNC);
+ journalMemAddEntryStats = statsLogger.getOpStatsLogger(JOURNAL_MEM_ADD_ENTRY);
journalCreationStats = statsLogger.getOpStatsLogger(JOURNAL_CREATION_LATENCY);
journalFlushStats = statsLogger.getOpStatsLogger(JOURNAL_FLUSH_LATENCY);
+ journalMemAddFlushStats = statsLogger.getOpStatsLogger(JOURNAL_FLUSH_IN_MEM_ADD);
journalQueueStats = statsLogger.getOpStatsLogger(JOURNAL_QUEUE_LATENCY);
journalProcessTimeStats = statsLogger.getOpStatsLogger(JOURNAL_PROCESS_TIME_LATENCY);
forceWriteGroupingCountStats = statsLogger.getOpStatsLogger(JOURNAL_FORCE_WRITE_GROUPING_COUNT);
@@ -928,8 +927,8 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
if (!enableGroupForceWrites) {
logFile.startSyncRange(prevFlushPosition, lastFlushPosition);
}
- journalFlushLatencyStats.registerSuccessfulEvent(
- journalFlushWatcher.stop().elapsed(TimeUnit.MICROSECONDS));
+ journalFlushStats.registerSuccessfulEvent(
+ journalFlushWatcher.stop().elapsedTime(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
// Trace the lifetime of entries through persistence
if (LOG.isDebugEnabled()) {
@@ -983,9 +982,9 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
flushes += bc.write(lenBuff);
flushes += bc.write(qe.entry);
- journalMemAddFlushTimesStats.registerSuccessfulEvent(flushes);
- journalMemAddLatencyStats.registerSuccessfulEvent(
- MathUtils.elapsedMicroSec(qe.enqueueTime));
+ journalMemAddFlushStats.registerSuccessfulValue(flushes);
+ journalMemAddEntryStats.registerSuccessfulEvent(
+ MathUtils.elapsedMicroSec(qe.enqueueTime), TimeUnit.MICROSECONDS);
toFlush.add(qe);
qe = null;
@@ -1018,8 +1017,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
LOG.info("Shutting down Journal");
forceWriteThread.shutdown();
cbThreadPool.shutdown();
- ;
- if (!cbThreadPool.forceShutdown(5, TimeUnit.SECONDS)) {
+ if (!cbThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.warn("Couldn't shutdown journal callback thread gracefully. Forcing");
}
running = false;
[3/7] bookkeeper git commit: Fix
Posted by si...@apache.org.
Fix
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/6077ef8b
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/6077ef8b
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/6077ef8b
Branch: refs/heads/sijie/bookkeeper_fallocate
Commit: 6077ef8b97648aa5521d352609abb236b151b8ad
Parents: 014512c
Author: Sijie Guo <si...@apache.org>
Authored: Thu Nov 17 17:42:36 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Thu Nov 17 17:42:36 2016 -0800
----------------------------------------------------------------------
.../src/main/java/org/apache/bookkeeper/bookie/Journal.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6077ef8b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index e8adfff..942a93f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -334,12 +334,14 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
try {
if (shouldForceWrite) {
+ long startTime = MathUtils.nowInNano();
if (enableGroupForceWrites) {
this.logFile.forceWrite(false);
} else {
this.logFile.syncRangeOrForceWrite(this.startFlushPosition,
this.endFlushPosition - this.startFlushPosition);
}
+ journalSyncStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
lastLogMark.setCurLogMark(this.logId, this.endFlushPosition);
@@ -539,6 +541,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
private final StatsLogger statsLogger;
private final OpStatsLogger journalAddEntryStats;
private final OpStatsLogger journalMemAddEntryStats;
+ private final OpStatsLogger journalSyncStats;
private final OpStatsLogger journalCreationStats;
private final OpStatsLogger journalFlushStats;
private final OpStatsLogger journalMemAddFlushStats;
@@ -594,6 +597,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
this.statsLogger = statsLogger;
journalAddEntryStats = statsLogger.getOpStatsLogger(JOURNAL_ADD_ENTRY);
journalMemAddEntryStats = statsLogger.getOpStatsLogger(JOURNAL_MEM_ADD_ENTRY);
+ journalSyncStats = statsLogger.getOpStatsLogger(JOURNAL_SYNC);
journalCreationStats = statsLogger.getOpStatsLogger(JOURNAL_CREATION_LATENCY);
journalFlushStats = statsLogger.getOpStatsLogger(JOURNAL_FLUSH_LATENCY);
journalMemAddFlushStats = statsLogger.getOpStatsLogger(JOURNAL_FLUSH_IN_MEM_ADD);
@@ -857,7 +861,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
bc = logFile.getBufferedChannel();
- lastFlushPosition = 0;
+ lastFlushPosition = bc.position();
}
if (qe == null) {
@@ -928,7 +932,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
logFile.startSyncRange(prevFlushPosition, lastFlushPosition);
}
journalFlushStats.registerSuccessfulEvent(
- journalFlushWatcher.stop().elapsedTime(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
+ journalFlushWatcher.stop().elapsedTime(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
// Trace the lifetime of entries through persistence
if (LOG.isDebugEnabled()) {
[5/7] bookkeeper git commit: Remove sync_range
Posted by si...@apache.org.
Remove sync_range
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/74113b5c
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/74113b5c
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/74113b5c
Branch: refs/heads/sijie/bookkeeper_fallocate
Commit: 74113b5cf390132d1e6ddb13ba3c207e076df758
Parents: 4292282
Author: Sijie Guo <si...@apache.org>
Authored: Tue Apr 18 11:19:24 2017 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Tue Apr 18 11:19:24 2017 -0700
----------------------------------------------------------------------
.../org/apache/bookkeeper/bookie/Journal.java | 11 +----
.../bookkeeper/bookie/JournalChannel.java | 30 --------------
.../org/apache/bookkeeper/util/NativeIO.java | 43 --------------------
.../src/org/apache/bookkeeper/util/NativeIO.c | 41 -------------------
4 files changed, 1 insertion(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74113b5c/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index e3531f6..2862144 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -335,12 +335,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
try {
if (shouldForceWrite) {
long startTime = MathUtils.nowInNano();
- if (enableGroupForceWrites) {
- this.logFile.forceWrite(false);
- } else {
- this.logFile.syncRangeOrForceWrite(this.startFlushPosition,
- this.endFlushPosition - this.startFlushPosition);
- }
+ this.logFile.forceWrite(false);
journalSyncStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
lastLogMark.setCurLogMark(this.logId, this.endFlushPosition);
@@ -932,10 +927,6 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
bc.flush(false);
lastFlushPosition = bc.position();
- // start sync the range
- if (!enableGroupForceWrites) {
- logFile.startSyncRange(prevFlushPosition, lastFlushPosition);
- }
journalFlushStats.registerSuccessfulEvent(
journalFlushWatcher.stop().elapsedTime(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74113b5c/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
index e3077e1..07e3d3d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
@@ -30,7 +30,6 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -44,7 +43,6 @@ import org.slf4j.LoggerFactory;
import static com.google.common.base.Charsets.UTF_8;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.*;
-import static org.apache.bookkeeper.util.NativeIO.*;
/**
* Simple wrapper around FileChannel to add versioning
@@ -192,9 +190,6 @@ class JournalChannel implements Closeable {
fc.write(bb);
bc = new BufferedChannel(fc, writeBufferSize);
-
- // sync the file
- // syncRangeOrForceWrite(0, HEADER_SIZE);
} else { // open an existing file
randomAccessFile = new RandomAccessFile(fn, "r");
fd = NativeIO.getSysFileDescriptor(randomAccessFile.getFD());
@@ -302,20 +297,6 @@ class JournalChannel implements Closeable {
fc.close();
}
- public void startSyncRange(long offset, long bytes) throws IOException {
- NativeIO.syncFileRangeIfPossible(fd, offset, bytes, SYNC_FILE_RANGE_WRITE);
- }
-
- public boolean syncRangeIfPossible(long offset, long bytes) throws IOException {
- if (NativeIO.syncFileRangeIfPossible(fd, offset, bytes,
- SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER)) {
- removeFromPageCacheIfPossible(offset + bytes);
- return false;
- } else {
- return false;
- }
- }
-
public void forceWrite(boolean forceMetadata) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Journal ForceWrite");
@@ -356,15 +337,4 @@ class JournalChannel implements Closeable {
removeFromPageCacheIfPossible(newForceWritePosition);
}
- public void syncRangeOrForceWrite(long offset, long bytes) throws IOException {
- long startTimeNanos = MathUtils.nowInNano();
- if (!syncRangeIfPossible(offset, bytes)) {
- forceWriteImpl(false);
- }
- // collect stats
- journalForceWriteCounter.inc();
- journalForceWriteStats.registerSuccessfulEvent(
- MathUtils.elapsedMicroSec(startTimeNanos),
- TimeUnit.MICROSECONDS);
- }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74113b5c/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
index 9eb3a68..80e0ee2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/NativeIO.java
@@ -29,25 +29,10 @@ public final class NativeIO {
private static final int POSIX_FADV_DONTNEED = 4; /* fadvise.h */
- /**
- * Wait upon writeout of all pages in the range before performing the write.
- */
- public static final int SYNC_FILE_RANGE_WAIT_BEFORE = 1;
- /**
- * Initiate writeout of all those dirty pages in the range which are not presently
- * under writeback.
- */
- public static final int SYNC_FILE_RANGE_WRITE = 2;
- /**
- * Wait upon writeout of all pages in the range after performing the write.
- */
- public static final int SYNC_FILE_RANGE_WAIT_AFTER = 4;
-
private static final int FALLOC_FL_KEEP_SIZE = 1;
private static boolean initialized = false;
private static boolean fadvisePossible = true;
- private static boolean syncFileRangePossible = true;
private static boolean sysFallocatePossible = true;
private static boolean posixFallocatePossible = true;
@@ -68,8 +53,6 @@ public final class NativeIO {
public static native int posix_fallocate(int fd, long offset, long len);
// fallocate
public static native int fallocate(int fd, int mode, long offset, long len);
- // sync_file_range(2)
- public static native int sync_file_range(int fd, long offset, long len, int flags);
private NativeIO() {}
@@ -165,32 +148,6 @@ public final class NativeIO {
return posixFallocatePossible;
}
- public static boolean syncFileRangeIfPossible(int fd, long offset, long nbytes, int flags) {
- if (!initialized || !syncFileRangePossible || fd < 0) {
- return false;
- }
- try {
- int rc = sync_file_range(fd, offset, nbytes, flags);
- if (rc != 0) {
- LOG.error("Failed on syncing file descriptor {}, offset {}, bytes {}, rc {} : {}",
- new Object[] { fd, offset, nbytes, rc, Errno.strerror() });
- return false;
- }
- } catch (UnsupportedOperationException uoe) {
- LOG.warn("sync_file_range isn't supported : ", uoe);
- syncFileRangePossible = false;
- } catch (UnsatisfiedLinkError nle) {
- LOG.warn("Unsatisfied Link error: sync_file_range failed on file descriptor {}, offset {}, bytes {} : ",
- new Object[] { fd, offset, nbytes, nle });
- syncFileRangePossible = false;
- } catch (Exception e) {
- LOG.error("Unknown exception: sync_file_range failed on file descriptor {}, offset {}, bytes {} : ",
- new Object[] { fd, offset, nbytes, e });
- return false;
- }
- return syncFileRangePossible;
- }
-
/**
* Remove pages from the file system page cache when they wont
* be accessed again
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/74113b5c/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c b/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c
index b93bde4..9ba2854 100644
--- a/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c
+++ b/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c
@@ -25,47 +25,6 @@
#include <asm-x86_64/unistd.h>
#include "config.h"
-#if defined(HAVE_SYNC_FILE_RANGE)
-# define my_sync_file_range sync_file_range
-#elif defined(__NR_sync_file_range)
-// RHEL 5 kernels have sync_file_range support, but the glibc
-// included does not have the library function. We can
-// still call it directly, and if it's not supported by the
-// kernel, we'd get ENOSYS. See RedHat Bugzilla #518581
-static int manual_sync_file_range (int fd, __off64_t from, __off64_t to, unsigned int flags)
-{
-#ifdef __x86_64__
- return syscall( __NR_sync_file_range, fd, from, to, flags);
-#else
- return syscall (__NR_sync_file_range, fd,
- __LONG_LONG_PAIR ((long) (from >> 32), (long) from),
- __LONG_LONG_PAIR ((long) (to >> 32), (long) to),
- flags);
-#endif
-}
-#define my_sync_file_range manual_sync_file_range
-#endif
-
-/**
- * public static native void sync_file_range(
- * int fd, long offset, long len, int flags);
- *
- * The "00024" in the function name is an artifact of how JNI encodes
- * special characters. U+0024 is '$'.
- */
-JNIEXPORT jint JNICALL
-Java_org_apache_bookkeeper_util_NativeIO_sync_1file_1range(
- JNIEnv *env, jclass clazz,
- jint fd, jlong offset, jlong len, jint flags)
-{
-#ifndef my_sync_file_range
- errno = ENOSYS;
- return -1;
-#else
- return my_sync_file_range(fd, (off_t)offset, (off_t)len, flags);
-#endif
-}
-
#if defined(HAVE_FALLOCATE)
# define my_fallocate fallocate
#elif defined(__NR_fallocate)
[7/7] bookkeeper git commit: Added missing license header
Posted by si...@apache.org.
Added missing license header
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/95028b9d
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/95028b9d
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/95028b9d
Branch: refs/heads/sijie/bookkeeper_fallocate
Commit: 95028b9d85f6e0dbc932f44b440798ea4fb49d02
Parents: 49081ae
Author: Sijie Guo <si...@apache.org>
Authored: Thu Apr 20 10:33:04 2017 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Thu Apr 20 10:33:04 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/bookkeeper/util/Errno.java | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95028b9d/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java
index e5d8ae8..86d1e28 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Errno.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.bookkeeper.util;
import com.sun.jna.Library;
[6/7] bookkeeper git commit: Address the compilation issue on mac
Posted by si...@apache.org.
Address the compilation issue on mac
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/49081aeb
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/49081aeb
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/49081aeb
Branch: refs/heads/sijie/bookkeeper_fallocate
Commit: 49081aebea8437bfbcc18c85aea7c99080427068
Parents: 74113b5
Author: Sijie Guo <si...@apache.org>
Authored: Thu Apr 20 10:31:11 2017 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Thu Apr 20 10:31:11 2017 -0700
----------------------------------------------------------------------
.../main/native/src/org/apache/bookkeeper/util/NativeIO.c | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/49081aeb/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c b/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c
index 9ba2854..789f10f 100644
--- a/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c
+++ b/bookkeeper-server/src/main/native/src/org/apache/bookkeeper/util/NativeIO.c
@@ -22,9 +22,17 @@
#include <fcntl.h>
#include <sys/syscall.h>
#include <sys/types.h>
-#include <asm-x86_64/unistd.h>
+#if !defined(_WIN32) && (defined(__unix__) || defined(__unix) || (defined(__APPLE__) && defined(__MACH__)))
+ /* UNIX-style OS. ------------------------------------------- */
+ #include <unistd.h>
+ #if (defined(__unit__) || defined(__unix)) && defined(__x86_64__)
+ #include <asm-x86_64/unistd.h>
+ #endif
+#endif
#include "config.h"
+
+
#if defined(HAVE_FALLOCATE)
# define my_fallocate fallocate
#elif defined(__NR_fallocate)
[4/7] bookkeeper git commit: Merge branch 'master' into sijie/bookkeeper_fallocate
Posted by si...@apache.org.
Merge branch 'master' into sijie/bookkeeper_fallocate
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/42922828
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/42922828
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/42922828
Branch: refs/heads/sijie/bookkeeper_fallocate
Commit: 42922828e48e9541bb2ad7fdc6d98966ead1c7c3
Parents: 6077ef8 de59bd2
Author: Sijie Guo <si...@apache.org>
Authored: Tue Apr 18 11:02:30 2017 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Tue Apr 18 11:02:30 2017 -0700
----------------------------------------------------------------------
bookkeeper-benchmark/pom.xml | 6 +
bookkeeper-server/conf/bk_server.conf | 14 +
bookkeeper-server/pom.xml | 13 +-
.../apache/bookkeeper/auth/AuthCallbacks.java | 28 +
.../auth/AuthProviderFactoryFactory.java | 52 +-
.../org/apache/bookkeeper/auth/AuthToken.java | 44 +
.../bookkeeper/auth/BookKeeperPrincipal.java | 72 +
.../bookkeeper/auth/BookieAuthProvider.java | 26 +-
.../bookkeeper/auth/ClientAuthProvider.java | 29 +-
.../bookie/BookKeeperServerStats.java | 3 +
.../org/apache/bookkeeper/bookie/Bookie.java | 447 +-
.../apache/bookkeeper/bookie/BookieBean.java | 7 +-
.../bookkeeper/bookie/BookieConnectionPeer.java | 30 +
.../apache/bookkeeper/bookie/BookieShell.java | 573 +-
.../bookkeeper/bookie/CheckpointSourceList.java | 112 +
.../org/apache/bookkeeper/bookie/Cookie.java | 106 +-
.../apache/bookkeeper/bookie/EntryLogger.java | 33 +-
.../apache/bookkeeper/bookie/EntryMemTable.java | 33 +-
.../org/apache/bookkeeper/bookie/FileInfo.java | 34 +
.../bookkeeper/bookie/FileSystemUpgrade.java | 3 +-
.../bookie/GarbageCollectorThread.java | 138 +-
.../bookkeeper/bookie/HandleFactoryImpl.java | 55 +-
.../bookkeeper/bookie/IndexPersistenceMgr.java | 28 +
.../bookie/InterleavedLedgerStorage.java | 21 +
.../org/apache/bookkeeper/bookie/Journal.java | 13 +-
.../apache/bookkeeper/bookie/LedgerCache.java | 3 +
.../bookkeeper/bookie/LedgerCacheImpl.java | 9 +
.../bookkeeper/bookie/LedgerDescriptor.java | 5 +
.../bookkeeper/bookie/LedgerDescriptorImpl.java | 9 +
.../bookkeeper/bookie/LedgerDirsManager.java | 72 +
.../apache/bookkeeper/bookie/LedgerStorage.java | 16 +
.../LocalBookieEnsemblePlacementPolicy.java | 12 +-
.../bookie/ScanAndCompareGarbageCollector.java | 6 +-
.../bookkeeper/bookie/SortedLedgerStorage.java | 2 +-
.../apache/bookkeeper/client/AsyncCallback.java | 14 +
.../apache/bookkeeper/client/BKException.java | 2 +-
.../apache/bookkeeper/client/BookKeeper.java | 174 +-
.../bookkeeper/client/BookKeeperAdmin.java | 22 +-
.../client/BookKeeperClientStats.java | 8 +
.../bookkeeper/client/BookieInfoReader.java | 261 +
.../apache/bookkeeper/client/BookieWatcher.java | 11 +-
.../bookkeeper/client/ClientConnectionPeer.java | 30 +
.../client/DefaultEnsemblePlacementPolicy.java | 112 +-
.../apache/bookkeeper/client/DigestManager.java | 55 +
.../client/EnsemblePlacementPolicy.java | 10 +
.../client/ExplicitLacFlushPolicy.java | 152 +
.../bookkeeper/client/LedgerCreateOp.java | 26 +-
.../apache/bookkeeper/client/LedgerHandle.java | 142 +-
.../bookkeeper/client/PendingReadLacOp.java | 145 +
.../bookkeeper/client/PendingWriteLacOp.java | 114 +
.../RackawareEnsemblePlacementPolicy.java | 15 +-
.../RackawareEnsemblePlacementPolicyImpl.java | 177 +-
.../bookkeeper/client/ReadOnlyLedgerHandle.java | 12 +-
.../RegionAwareEnsemblePlacementPolicy.java | 7 +-
.../bookkeeper/client/SynchCallbackUtils.java | 14 +-
.../client/WeightedRandomSelection.java | 156 +
.../bookkeeper/conf/AbstractConfiguration.java | 75 +
.../bookkeeper/conf/ClientConfiguration.java | 182 +-
.../bookkeeper/conf/ServerConfiguration.java | 106 +-
.../meta/AbstractZkLedgerManager.java | 8 +-
.../meta/FlatLedgerManagerFactory.java | 5 +-
.../meta/HierarchicalLedgerManager.java | 4 +-
.../meta/HierarchicalLedgerManagerFactory.java | 5 +-
.../apache/bookkeeper/meta/LedgerLayout.java | 6 +-
.../bookkeeper/meta/LedgerManagerFactory.java | 6 +-
.../meta/LongHierarchicalLedgerManager.java | 334 +
.../LongHierarchicalLedgerManagerFactory.java | 29 +
.../bookkeeper/meta/MSLedgerManagerFactory.java | 6 +-
.../bookkeeper/meta/ZkLedgerIdGenerator.java | 9 +-
.../meta/ZkLedgerUnderreplicationManager.java | 28 +-
.../apache/bookkeeper/proto/AuthHandler.java | 147 +-
.../apache/bookkeeper/proto/BookieClient.java | 111 +-
.../bookkeeper/proto/BookieNettyServer.java | 83 +-
.../apache/bookkeeper/proto/BookieProtocol.java | 3 +
.../proto/BookieRequestProcessor.java | 65 +
.../apache/bookkeeper/proto/BookieServer.java | 83 +-
.../proto/BookkeeperInternalCallbacks.java | 13 +
.../bookkeeper/proto/BookkeeperProtocol.java | 8003 ++++++++++++++----
.../apache/bookkeeper/proto/ChannelManager.java | 14 +-
.../apache/bookkeeper/proto/ConnectionPeer.java | 63 +
.../proto/GetBookieInfoProcessorV3.java | 90 +
.../bookkeeper/proto/LocalBookiesRegistry.java | 4 +-
.../proto/NioServerSocketChannelManager.java | 14 +-
.../proto/PerChannelBookieClient.java | 841 +-
.../bookkeeper/proto/ReadLacProcessorV3.java | 108 +
.../bookkeeper/proto/VMLocalChannelManager.java | 14 +-
.../bookkeeper/proto/WriteLacProcessorV3.java | 113 +
.../apache/bookkeeper/replication/Auditor.java | 5 +-
.../bookkeeper/replication/AuditorElector.java | 10 +-
.../replication/ReplicationWorker.java | 110 +-
.../bookkeeper/util/HexDumpEntryFormatter.java | 53 +
.../apache/bookkeeper/util/LocalBookKeeper.java | 1 +
.../bookkeeper/util/OrderedSafeExecutor.java | 117 +-
.../org/apache/bookkeeper/util/StringUtils.java | 49 +
.../org/apache/bookkeeper/util/ZkUtils.java | 11 +
.../util/collections/ConcurrentLongHashMap.java | 494 ++
.../util/collections/ConcurrentLongHashSet.java | 421 +
.../collections/ConcurrentLongLongHashMap.java | 723 ++
.../ConcurrentLongLongPairHashMap.java | 550 ++
.../util/collections/ConcurrentOpenHashMap.java | 493 ++
.../util/collections/ConcurrentOpenHashSet.java | 416 +
.../bookkeeper/zookeeper/ZooKeeperClient.java | 235 +-
.../src/main/proto/BookkeeperProtocol.proto | 63 +-
.../src/main/resources/findbugsExclude.xml | 9 +
.../org/apache/bookkeeper/auth/TestAuth.java | 381 +-
.../bookkeeper/bookie/BookieAccessor.java | 6 +-
.../bookkeeper/bookie/CompactionTest.java | 13 +-
.../apache/bookkeeper/bookie/CookieTest.java | 145 +-
.../bookkeeper/bookie/CreateNewLogTest.java | 32 +-
.../bookie/EnableZkSecurityBasicTest.java | 126 +
.../bookkeeper/bookie/LedgerCacheTest.java | 89 +
.../bookkeeper/bookie/LedgerStorageTest.java | 53 +
.../bookie/TestGcOverreplicatedLedger.java | 3 +-
.../bookie/TestLedgerDirsManager.java | 30 +
.../bookkeeper/bookie/TestSyncThread.java | 13 +
.../bookkeeper/bookie/UpdateCookieCmdTest.java | 8 +-
.../bookkeeper/client/BookKeeperCloseTest.java | 103 +-
...perDiskSpaceWeightedLedgerPlacementTest.java | 452 +
.../bookkeeper/client/BookKeeperTest.java | 124 +
.../bookkeeper/client/BookieRecoveryTest.java | 1 +
.../client/BookieWriteLedgerTest.java | 103 +-
.../client/TestGetBookieInfoTimeout.java | 141 +
.../TestRackawareEnsemblePlacementPolicy.java | 248 +
...awareEnsemblePlacementPolicyUsingScript.java | 291 +
.../client/TestWatchEnsembleChange.java | 4 +-
.../client/TestWeightedRandomSelection.java | 280 +
.../NoSystemPropertiesConfigurationTest.java | 14 +-
.../conf/SystemPropertiesConfigurationTest.java | 14 +-
.../apache/bookkeeper/meta/GcLedgersTest.java | 126 +-
.../bookkeeper/meta/LedgerLayoutTest.java | 5 +-
.../bookkeeper/meta/LedgerManagerTestCase.java | 20 +-
.../bookkeeper/meta/TestLedgerManager.java | 7 +-
.../meta/TestZkLedgerIdGenerator.java | 3 +-
.../proto/TestBackwardCompatCMS42.java | 45 +-
.../bookkeeper/proto/TestDataFormats.java | 149 -
.../proto/TestPerChannelBookieClient.java | 2 +-
.../replication/AuditorLedgerCheckerTest.java | 3 +
.../replication/AuthAutoRecoveryTest.java | 111 +
.../bookkeeper/test/BookieClientTest.java | 66 +-
.../test/LocalBookiesRegistryTest.java | 59 +
.../MultiLedgerManagerMultiDigestTestCase.java | 1 +
.../test/MultiLedgerManagerTestCase.java | 3 +-
.../apache/bookkeeper/util/TestDiskChecker.java | 33 +-
.../collections/ConcurrentLongHashMapTest.java | 435 +
.../collections/ConcurrentLongHashSetTest.java | 275 +
.../ConcurrentLongLongHashMapTest.java | 473 ++
.../ConcurrentLongLongPairHashMapTest.java | 343 +
.../collections/ConcurrentOpenHashMapTest.java | 488 ++
.../collections/ConcurrentOpenHashSetTest.java | 318 +
.../bookkeeper/zookeeper/TestRetryPolicy.java | 54 +
.../zookeeper/TestZooKeeperClient.java | 29 +-
.../src/test/proto/TestDataFormats.proto | 34 -
.../src/test/resources/networkmappingscript.sh | 35 +
.../codahale-metrics-provider/pom.xml | 16 +-
.../stats/CodahaleMetricsProvider.java | 4 +-
dev/bk-merge-pr.py | 33 +-
pom.xml | 2 +-
157 files changed, 21276 insertions(+), 2615 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42922828/bookkeeper-server/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42922828/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42922828/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
----------------------------------------------------------------------
diff --cc bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 942a93f,1483e36..e3531f6
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@@ -819,9 -787,10 +823,10 @@@ class Journal extends BookieCriticalThr
*/
@Override
public void run() {
+ LOG.info("Starting journal on {}", journalDirectory);
LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
ByteBuffer lenBuff = ByteBuffer.allocate(4);
- ByteBuffer paddingBuff = ByteBuffer.allocate(2 * conf.getJournalAlignmentSize());
+ ByteBuffer paddingBuff = ByteBuffer.allocate(2 * journalAlignmentSize);
ZeroBuffer.put(paddingBuff);
JournalChannel logFile = null;
forceWriteThread.start();