You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sa...@apache.org on 2019/06/05 13:41:40 UTC
[hadoop] branch trunk updated: HDFS-14356. Implement HDFS cache on
SCM with native PMDK libs. Contributed by Feilong He.
This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new d1aad44 HDFS-14356. Implement HDFS cache on SCM with native PMDK libs. Contributed by Feilong He.
d1aad44 is described below
commit d1aad444907e1fc5314e8e64529e57c51ed7561c
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Wed Jun 5 21:33:00 2019 +0800
HDFS-14356. Implement HDFS cache on SCM with native PMDK libs. Contributed by Feilong He.
---
BUILDING.txt | 28 +++
dev-support/bin/dist-copynativelibs | 8 +
hadoop-common-project/hadoop-common/pom.xml | 2 +
.../hadoop-common/src/CMakeLists.txt | 21 ++
.../hadoop-common/src/config.h.cmake | 1 +
.../org/apache/hadoop/io/nativeio/NativeIO.java | 135 ++++++++++-
.../src/org/apache/hadoop/io/nativeio/NativeIO.c | 252 +++++++++++++++++++++
.../src/org/apache/hadoop/io/nativeio/pmdk_load.c | 106 +++++++++
.../src/org/apache/hadoop/io/nativeio/pmdk_load.h | 95 ++++++++
.../apache/hadoop/io/nativeio/TestNativeIO.java | 153 +++++++++++++
.../datanode/fsdataset/impl/FsDatasetCache.java | 22 ++
.../datanode/fsdataset/impl/FsDatasetImpl.java | 8 +
.../datanode/fsdataset/impl/FsDatasetUtil.java | 22 ++
.../datanode/fsdataset/impl/MappableBlock.java | 6 +
.../fsdataset/impl/MappableBlockLoader.java | 11 +-
.../fsdataset/impl/MappableBlockLoaderFactory.java | 4 +
.../fsdataset/impl/MemoryMappableBlockLoader.java | 8 +-
.../datanode/fsdataset/impl/MemoryMappedBlock.java | 5 +
...der.java => NativePmemMappableBlockLoader.java} | 166 +++++++-------
...MappedBlock.java => NativePmemMappedBlock.java} | 49 ++--
.../fsdataset/impl/PmemMappableBlockLoader.java | 10 +-
.../datanode/fsdataset/impl/PmemMappedBlock.java | 5 +
22 files changed, 1009 insertions(+), 108 deletions(-)
diff --git a/BUILDING.txt b/BUILDING.txt
index cc9ac17..8c57a1d 100644
--- a/BUILDING.txt
+++ b/BUILDING.txt
@@ -78,6 +78,8 @@ Optional packages:
$ sudo apt-get install fuse libfuse-dev
* ZStandard compression
$ sudo apt-get install zstd
+* PMDK library for storage class memory (SCM) as HDFS cache backend
+ Please refer to http://pmem.io/ and https://github.com/pmem/pmdk
----------------------------------------------------------------------------------
Maven main modules:
@@ -262,6 +264,32 @@ Maven build goals:
invoke, run 'mvn dependency-check:aggregate'. Note that this plugin
requires maven 3.1.1 or greater.
+ PMDK library build options:
+
+ The Persistent Memory Development Kit (PMDK), formerly known as NVML, is a growing
+ collection of libraries which have been developed for various use cases, tuned,
+ validated to production quality, and thoroughly documented. These libraries are built
+ on the Direct Access (DAX) feature available in both Linux and Windows, which allows
+ applications direct load/store access to persistent memory by memory-mapping files
+ on a persistent memory aware file system.
+
+ It is currently an optional component, meaning that Hadoop can be built without
+ this dependency. Please note that the library is loaded as a dynamic module. For getting
+ more details please refer to the official sites:
+ http://pmem.io/ and https://github.com/pmem/pmdk.
+
+ * -Drequire.pmdk is used to build the project with PMDK libraries forcibly. With this
+ option provided, the build will fail if libpmem library is not found. If this option
+ is not given, the build will generate a version of Hadoop with libhadoop.so.
+ And storage class memory(SCM) backed HDFS cache is still supported without PMDK involved.
+ Because PMDK can bring better caching write/read performance, it is recommended to build
+ the project with this option if user plans to use SCM backed HDFS cache.
+ * -Dpmdk.lib is used to specify a nonstandard location for PMDK libraries if they are not
+ under /usr/lib or /usr/lib64.
+ * -Dbundle.pmdk is used to copy the specified libpmem libraries into the distribution tar
+ package. This option requires that -Dpmdk.lib is specified. With -Dbundle.pmdk provided,
+ the build will fail if -Dpmdk.lib is not specified.
+
----------------------------------------------------------------------------------
Building components separately
diff --git a/dev-support/bin/dist-copynativelibs b/dev-support/bin/dist-copynativelibs
index 67d2edf..4a783f0 100755
--- a/dev-support/bin/dist-copynativelibs
+++ b/dev-support/bin/dist-copynativelibs
@@ -96,6 +96,12 @@ for i in "$@"; do
--isalbundle=*)
ISALBUNDLE=${i#*=}
;;
+ --pmdklib=*)
+ PMDKLIB=${i#*=}
+ ;;
+ --pmdkbundle=*)
+ PMDKBUNDLE=${i#*=}
+ ;;
--opensslbinbundle=*)
OPENSSLBINBUNDLE=${i#*=}
;;
@@ -153,6 +159,8 @@ if [[ -d "${LIB_DIR}" ]]; then
bundle_native_lib "${OPENSSLLIBBUNDLE}" "openssl.lib" "crypto" "${OPENSSLLIB}"
bundle_native_lib "${ISALBUNDLE}" "isal.lib" "isa" "${ISALLIB}"
+
+ bundle_native_lib "${PMDKBUNDLE}" "pmdk.lib" "pmdk" "${PMDKLIB}"
fi
# Windows
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 64e4d04..5b60053 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -682,6 +682,8 @@
<REQUIRE_ISAL>${require.isal} </REQUIRE_ISAL>
<CUSTOM_ISAL_PREFIX>${isal.prefix} </CUSTOM_ISAL_PREFIX>
<CUSTOM_ISAL_LIB>${isal.lib} </CUSTOM_ISAL_LIB>
+ <REQUIRE_PMDK>${require.pmdk}</REQUIRE_PMDK>
+ <CUSTOM_PMDK_LIB>${pmdk.lib}</CUSTOM_PMDK_LIB>
<REQUIRE_OPENSSL>${require.openssl} </REQUIRE_OPENSSL>
<CUSTOM_OPENSSL_PREFIX>${openssl.prefix} </CUSTOM_OPENSSL_PREFIX>
<CUSTOM_OPENSSL_LIB>${openssl.lib} </CUSTOM_OPENSSL_LIB>
diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
index b9287c0..771c685 100644
--- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt
+++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
@@ -121,6 +121,7 @@ else ()
ENDIF(REQUIRE_ZSTD)
endif ()
+#Require ISA-L
set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
hadoop_set_find_shared_library_version("2")
find_library(ISAL_LIBRARY
@@ -159,6 +160,25 @@ else (ISAL_LIBRARY)
ENDIF(REQUIRE_ISAL)
endif (ISAL_LIBRARY)
+# Build with PMDK library if -Drequire.pmdk option is specified.
+if(REQUIRE_PMDK)
+ set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
+ hadoop_set_find_shared_library_version("1")
+ find_library(PMDK_LIBRARY
+ NAMES pmem
+ PATHS ${CUSTOM_PMDK_LIB} /usr/lib /usr/lib64)
+ set(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
+
+ if(PMDK_LIBRARY)
+ GET_FILENAME_COMPONENT(HADOOP_PMDK_LIBRARY ${PMDK_LIBRARY} NAME)
+ set(PMDK_SOURCE_FILES ${SRC}/io/nativeio/pmdk_load.c)
+ else(PMDK_LIBRARY)
+ MESSAGE(FATAL_ERROR "The required PMDK library is NOT found. PMDK_LIBRARY=${PMDK_LIBRARY}")
+ endif(PMDK_LIBRARY)
+else(REQUIRE_PMDK)
+ MESSAGE(STATUS "Build without PMDK support.")
+endif(REQUIRE_PMDK)
+
# Build hardware CRC32 acceleration, if supported on the platform.
if(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
set(BULK_CRC_ARCH_SOURCE_FIlE "${SRC}/util/bulk_crc32_x86.c")
@@ -256,6 +276,7 @@ hadoop_add_dual_library(hadoop
${SRC}/io/compress/zlib/ZlibDecompressor.c
${BZIP2_SOURCE_FILES}
${SRC}/io/nativeio/NativeIO.c
+ ${PMDK_SOURCE_FILES}
${SRC}/io/nativeio/errno_enum.c
${SRC}/io/nativeio/file_descriptor.c
${SRC}/io/nativeio/SharedFileDescriptorFactory.c
diff --git a/hadoop-common-project/hadoop-common/src/config.h.cmake b/hadoop-common-project/hadoop-common/src/config.h.cmake
index 40aa467..7e23a5d 100644
--- a/hadoop-common-project/hadoop-common/src/config.h.cmake
+++ b/hadoop-common-project/hadoop-common/src/config.h.cmake
@@ -24,6 +24,7 @@
#cmakedefine HADOOP_ZSTD_LIBRARY "@HADOOP_ZSTD_LIBRARY@"
#cmakedefine HADOOP_OPENSSL_LIBRARY "@HADOOP_OPENSSL_LIBRARY@"
#cmakedefine HADOOP_ISAL_LIBRARY "@HADOOP_ISAL_LIBRARY@"
+#cmakedefine HADOOP_PMDK_LIBRARY "@HADOOP_PMDK_LIBRARY@"
#cmakedefine HAVE_SYNC_FILE_RANGE
#cmakedefine HAVE_POSIX_FADVISE
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index 4e0cd8f..1d0eab7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -100,6 +100,48 @@ public class NativeIO {
write. */
public static int SYNC_FILE_RANGE_WAIT_AFTER = 4;
+ /**
+ * Keeps the support state of PMDK.
+ */
+ public enum SupportState {
+ UNSUPPORTED(-1),
+ PMDK_LIB_NOT_FOUND(1),
+ SUPPORTED(0);
+
+ private byte stateCode;
+ SupportState(int stateCode) {
+ this.stateCode = (byte) stateCode;
+ }
+
+ public int getStateCode() {
+ return stateCode;
+ }
+
+ public String getMessage() {
+ String msg;
+ switch (stateCode) {
+ case -1:
+ msg = "The native code is built without PMDK support.";
+ break;
+ case 1:
+ msg = "The native code is built with PMDK support, but PMDK libs " +
+ "are NOT found in execution environment or failed to be loaded.";
+ break;
+ case 0:
+ msg = "The native code is built with PMDK support, and PMDK libs " +
+ "are loaded successfully.";
+ break;
+ default:
+ msg = "The state code: " + stateCode + " is unrecognized!";
+ }
+ return msg;
+ }
+ }
+
+ // Denotes the state of supporting PMDK. The value is set by JNI.
+ private static SupportState pmdkSupportState =
+ SupportState.PMDK_LIB_NOT_FOUND;
+
private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class);
// Set to true via JNI if possible
@@ -124,6 +166,93 @@ public class NativeIO {
POSIX.cacheManipulator = cacheManipulator;
}
+ // This method is invoked by JNI.
+ public static void setPmdkSupportState(int stateCode) {
+ for (SupportState state : SupportState.values()) {
+ if (state.getStateCode() == stateCode) {
+ pmdkSupportState = state;
+ return;
+ }
+ }
+ LOG.error("The state code: " + stateCode + " is unrecognized!");
+ }
+
+ public static boolean isPmdkAvailable() {
+ LOG.info(pmdkSupportState.getMessage());
+ return pmdkSupportState == SupportState.SUPPORTED;
+ }
+
+ /**
+ * Denote memory region for a file mapped.
+ */
+ public static class PmemMappedRegion {
+ private long address;
+ private long length;
+ private boolean isPmem;
+
+ public PmemMappedRegion(long address, long length, boolean isPmem) {
+ this.address = address;
+ this.length = length;
+ this.isPmem = isPmem;
+ }
+
+ public boolean isPmem() {
+ return this.isPmem;
+ }
+
+ public long getAddress() {
+ return this.address;
+ }
+
+ public long getLength() {
+ return this.length;
+ }
+ }
+
+ /**
+ * JNI wrapper of persist memory operations.
+ */
+ public static class Pmem {
+ // check whether the address is a Pmem address or DIMM address
+ public static boolean isPmem(long address, long length) {
+ return NativeIO.POSIX.isPmemCheck(address, length);
+ }
+
+ // create a pmem file and memory map it
+ public static PmemMappedRegion mapBlock(String path, long length) {
+ return NativeIO.POSIX.pmemCreateMapFile(path, length);
+ }
+
+ // unmap a pmem file
+ public static boolean unmapBlock(long address, long length) {
+ return NativeIO.POSIX.pmemUnMap(address, length);
+ }
+
+ // copy data from disk file(src) to pmem file(dest), without flush
+ public static void memCopy(byte[] src, long dest, boolean isPmem,
+ long length) {
+ NativeIO.POSIX.pmemCopy(src, dest, isPmem, length);
+ }
+
+ // flush the memory content to persistent storage
+ public static void memSync(PmemMappedRegion region) {
+ if (region.isPmem()) {
+ NativeIO.POSIX.pmemDrain();
+ } else {
+ NativeIO.POSIX.pmemSync(region.getAddress(), region.getLength());
+ }
+ }
+ }
+
+ private static native boolean isPmemCheck(long address, long length);
+ private static native PmemMappedRegion pmemCreateMapFile(String path,
+ long length);
+ private static native boolean pmemUnMap(long address, long length);
+ private static native void pmemCopy(byte[] src, long dest, boolean isPmem,
+ long length);
+ private static native void pmemDrain();
+ private static native void pmemSync(long address, long length);
+
/**
* Used to manipulate the operating system cache.
*/
@@ -143,8 +272,8 @@ public class NativeIO {
}
public void posixFadviseIfPossible(String identifier,
- FileDescriptor fd, long offset, long len, int flags)
- throws NativeIOException {
+ FileDescriptor fd, long offset, long len, int flags)
+ throws NativeIOException {
NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, offset,
len, flags);
}
@@ -748,7 +877,7 @@ public class NativeIO {
* user account name, of the format DOMAIN\UserName. This method
* will remove the domain part of the full logon name.
*
- * @param Fthe full principal name containing the domain
+ * @param name the full principal name containing the domain
* @return name with domain removed
*/
private static String stripDomain(String name) {
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
index 2274d57..3a0641b 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
@@ -36,6 +36,10 @@
#include <sys/resource.h>
#include <sys/stat.h>
#include <sys/syscall.h>
+#ifdef HADOOP_PMDK_LIBRARY
+#include <libpmem.h>
+#include "pmdk_load.h"
+#endif
#if !(defined(__FreeBSD__) || defined(__MACH__))
#include <sys/sendfile.h>
#endif
@@ -60,6 +64,7 @@
#define NATIVE_IO_POSIX_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX"
#define NATIVE_IO_STAT_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX$Stat"
+#define NATIVE_IO_POSIX_PMEMREGION_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX$PmemMappedRegion"
#define SET_INT_OR_RETURN(E, C, F) \
{ \
@@ -81,6 +86,12 @@ static jmethodID nioe_ctor;
// Please see HADOOP-7156 for details.
jobject pw_lock_object;
+#ifdef HADOOP_PMDK_LIBRARY
+// the NativeIO$POSIX$PmemMappedRegion inner class and its constructor
+static jclass pmem_region_clazz = NULL;
+static jmethodID pmem_region_ctor = NULL;
+#endif
+
/*
* Throw a java.IO.IOException, generating the message from errno.
* NB. this is also used form windows_secure_container_executor.c
@@ -269,6 +280,63 @@ static void nioe_deinit(JNIEnv *env) {
nioe_ctor = NULL;
}
+#ifdef HADOOP_PMDK_LIBRARY
+static int loadPmdkLib(JNIEnv *env) {
+ char errMsg[1024];
+ jclass clazz = (*env)->FindClass(env, NATIVE_IO_POSIX_CLASS);
+ if (clazz == NULL) {
+ return 0; // exception has been raised
+ }
+ load_pmdk_lib(errMsg, sizeof(errMsg));
+ jmethodID mid = (*env)->GetStaticMethodID(env, clazz, "setPmdkSupportState", "(I)V");
+ if (mid == 0) {
+ return 0;
+ }
+ if (strlen(errMsg) > 0) {
+ (*env)->CallStaticVoidMethod(env, clazz, mid, 1);
+ return 0;
+ }
+ (*env)->CallStaticVoidMethod(env, clazz, mid, 0);
+ return 1;
+}
+
+static void pmem_region_init(JNIEnv *env, jclass nativeio_class) {
+
+ jclass clazz = NULL;
+ // Init Stat
+ clazz = (*env)->FindClass(env, NATIVE_IO_POSIX_PMEMREGION_CLASS);
+ if (!clazz) {
+ THROW(env, "java/io/IOException", "Failed to get PmemMappedRegion class");
+ return; // exception has been raised
+ }
+
+ // Init PmemMappedRegion class
+ pmem_region_clazz = (*env)->NewGlobalRef(env, clazz);
+ if (!pmem_region_clazz) {
+ THROW(env, "java/io/IOException", "Failed to new global reference of PmemMappedRegion class");
+ return; // exception has been raised
+ }
+
+ pmem_region_ctor = (*env)->GetMethodID(env, pmem_region_clazz, "<init>", "(JJZ)V");
+ if (!pmem_region_ctor) {
+ THROW(env, "java/io/IOException", "Failed to get PmemMappedRegion constructor");
+ return; // exception has been raised
+ }
+}
+
+static void pmem_region_deinit(JNIEnv *env) {
+ if (pmem_region_ctor != NULL) {
+ (*env)->DeleteGlobalRef(env, pmem_region_ctor);
+ pmem_region_ctor = NULL;
+ }
+
+ if (pmem_region_clazz != NULL) {
+ (*env)->DeleteGlobalRef(env, pmem_region_clazz);
+ pmem_region_clazz = NULL;
+ }
+ }
+#endif
+
/*
* private static native void initNative();
*
@@ -292,6 +360,11 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_initNative(
#ifdef UNIX
errno_enum_init(env);
PASS_EXCEPTIONS_GOTO(env, error);
+#ifdef HADOOP_PMDK_LIBRARY
+ if (loadPmdkLib(env)) {
+ pmem_region_init(env, clazz);
+ }
+#endif
#endif
return;
error:
@@ -299,6 +372,9 @@ error:
// class wasn't initted yet
#ifdef UNIX
stat_deinit(env);
+#ifdef HADOOP_PMDK_LIBRARY
+ pmem_region_deinit(env);
+#endif
#endif
nioe_deinit(env);
fd_deinit(env);
@@ -1383,3 +1459,179 @@ cleanup:
/**
* vim: sw=2: ts=2: et:
*/
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/*
+ * Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method: isPmemCheck
+ * Signature: (JJ)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_isPmemCheck(
+JNIEnv *env, jclass thisClass, jlong address, jlong length) {
+ #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+ jint is_pmem = pmdkLoader->pmem_is_pmem(address, length);
+ return (is_pmem) ? JNI_TRUE : JNI_FALSE;
+ #else
+ THROW(env, "java/lang/UnsupportedOperationException",
+ "The function isPmemCheck is not supported.");
+ return JNI_FALSE;
+ #endif
+ }
+
+/*
+ * Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method: pmemCreateMapFile
+ * Signature: (Ljava/lang/String;J)Lorg/apache/hadoop/io/nativeio/NativeIO/POSIX/PmemMappedRegion;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemCreateMapFile(
+JNIEnv *env, jclass thisClass, jstring filePath, jlong fileLength) {
+ #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+ /* create a pmem file and memory map it */
+ const char * path = NULL;
+ void * pmemaddr = NULL;
+ size_t mapped_len = 0;
+ int is_pmem = 1;
+ char msg[1000];
+
+ path = (*env)->GetStringUTFChars(env, filePath, NULL);
+ if (!path) {
+ THROW(env, "java/lang/IllegalArgumentException", "File Path cannot be null");
+ return NULL;
+ }
+
+ if (fileLength <= 0) {
+ (*env)->ReleaseStringUTFChars(env, filePath, path);
+ THROW(env, "java/lang/IllegalArgumentException", "File length should be positive");
+ return NULL;
+ }
+
+ pmemaddr = pmdkLoader->pmem_map_file(path, fileLength, PMEM_FILE_CREATE|PMEM_FILE_EXCL,
+ 0666, &mapped_len, &is_pmem);
+
+ if (!pmemaddr) {
+ snprintf(msg, sizeof(msg), "Failed to create pmem file. file: %s, length: %x, error msg: %s", path, fileLength, pmem_errormsg());
+ THROW(env, "java/io/IOException", msg);
+ (*env)->ReleaseStringUTFChars(env, filePath, path);
+ return NULL;
+ }
+
+ if (fileLength != mapped_len) {
+ snprintf(msg, sizeof(msg), "Mapped length doesn't match the request length. file :%s, request length:%x, returned length:%x, error msg:%s", path, fileLength, mapped_len, pmem_errormsg());
+ THROW(env, "java/io/IOException", msg);
+ (*env)->ReleaseStringUTFChars(env, filePath, path);
+ return NULL;
+ }
+
+ (*env)->ReleaseStringUTFChars(env, filePath, path);
+
+ if ((!pmem_region_clazz) || (!pmem_region_ctor)) {
+ THROW(env, "java/io/IOException", "PmemMappedRegion class or constructor is NULL");
+ return NULL;
+ }
+
+ jobject ret = (*env)->NewObject(env, pmem_region_clazz, pmem_region_ctor, pmemaddr, mapped_len, (jboolean)is_pmem);
+ return ret;
+
+ #else
+ THROW(env, "java/lang/UnsupportedOperationException",
+ "The function pmemCreateMapFile is not supported.");
+ return NULL;
+ #endif
+ }
+
+/*
+ * Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method: pmemUnMap
+ * Signature: (JJ)V
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemUnMap(
+JNIEnv *env, jclass thisClass, jlong address, jlong length) {
+ #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+ int succeed = 0;
+ char msg[1000];
+ succeed = pmdkLoader->pmem_unmap(address, length);
+ // succeed = -1 failure; succeed = 0 success
+ if (succeed != 0) {
+ snprintf(msg, sizeof(msg), "Failed to unmap region. address: %x, length: %x, error msg: %s", address, length, pmem_errormsg());
+ THROW(env, "java/io/IOException", msg);
+ return JNI_FALSE;
+ } else {
+ return JNI_TRUE;
+ }
+ #else
+ THROW(env, "java/lang/UnsupportedOperationException",
+ "The function pmemUnMap is not supported.");
+ return JNI_FALSE;
+ #endif
+ }
+
+/*
+ * Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method: pmemCopy
+ * Signature: ([BJJ)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemCopy(
+JNIEnv *env, jclass thisClass, jbyteArray buf, jlong address, jboolean is_pmem, jlong length) {
+ #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+ char msg[1000];
+ jbyte* srcBuf = (*env)->GetByteArrayElements(env, buf, 0);
+ snprintf(msg, sizeof(msg), "Pmem copy content. dest: %x, length: %x, src: %x ", address, length, srcBuf);
+ if (is_pmem) {
+ pmdkLoader->pmem_memcpy_nodrain(address, srcBuf, length);
+ } else {
+ memcpy(address, srcBuf, length);
+ }
+ (*env)->ReleaseByteArrayElements(env, buf, srcBuf, 0);
+ return;
+ #else
+ THROW(env, "java/lang/UnsupportedOperationException",
+ "The function pmemCopy is not supported.");
+ #endif
+ }
+
+/*
+ * Class: org_apache_hadoop_io_nativeio_NativeIO
+ * Method: pmemDrain
+ * Signature: ()V
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemDrain(
+JNIEnv *env, jclass thisClass) {
+ #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+ pmdkLoader->pmem_drain();
+ #else
+ THROW(env, "java/lang/UnsupportedOperationException",
+ "The function pmemDrain is not supported.");
+ #endif
+ }
+
+ /*
+ * Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method: pmemSync
+ * Signature: (JJ)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemSync
+ (JNIEnv * env, jclass thisClass, jlong address, jlong length) {
+
+ #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+ int succeed = 0;
+ char msg[1000];
+ succeed = pmdkLoader->pmem_msync(address, length);
+ // succeed = -1 failure
+ if (succeed = -1) {
+ snprintf(msg, sizeof(msg), "Failed to msync region. address: %x, length: %x, error msg: %s", address, length, pmem_errormsg());
+ THROW(env, "java/io/IOException", msg);
+ return;
+ }
+ #else
+ THROW(env, "java/lang/UnsupportedOperationException",
+ "The function pmemSync is not supported.");
+ #endif
+ }
+
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c
new file mode 100644
index 0000000..f7d6cfb
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "org_apache_hadoop.h"
+#include "pmdk_load.h"
+#include "org_apache_hadoop_io_nativeio_NativeIO.h"
+#include "org_apache_hadoop_io_nativeio_NativeIO_POSIX.h"
+
+#ifdef UNIX
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dlfcn.h>
+
+#include "config.h"
+#endif
+
+PmdkLibLoader * pmdkLoader;
+
+/**
+ * pmdk_load.c
+ * Utility of loading the libpmem library and the required functions.
+ * Building of this codes won't rely on any libpmem source codes, but running
+ * into this will rely on successfully loading of the dynamic library.
+ *
+ */
+
+static const char* load_functions() {
+#ifdef UNIX
+ PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_map_file), "pmem_map_file");
+ PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_unmap), "pmem_unmap");
+ PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_is_pmem), "pmem_is_pmem");
+ PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_drain), "pmem_drain");
+ PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_memcpy_nodrain), "pmem_memcpy_nodrain");
+ PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_msync), "pmem_msync");
+#endif
+ return NULL;
+}
+
+void load_pmdk_lib(char* err, size_t err_len) {
+ const char* errMsg;
+ const char* library = NULL;
+#ifdef UNIX
+ Dl_info dl_info;
+#else
+ LPTSTR filename = NULL;
+#endif
+
+ err[0] = '\0';
+
+ if (pmdkLoader != NULL) {
+ return;
+ }
+ pmdkLoader = calloc(1, sizeof(PmdkLibLoader));
+
+ // Load PMDK library
+ #ifdef UNIX
+ pmdkLoader->libec = dlopen(HADOOP_PMDK_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
+ if (pmdkLoader->libec == NULL) {
+ snprintf(err, err_len, "Failed to load %s (%s)",
+ HADOOP_PMDK_LIBRARY, dlerror());
+ return;
+ }
+ // Clear any existing error
+ dlerror();
+ #endif
+ errMsg = load_functions(pmdkLoader->libec);
+ if (errMsg != NULL) {
+ snprintf(err, err_len, "Loading functions from PMDK failed: %s", errMsg);
+ }
+
+#ifdef UNIX
+ if(dladdr(pmdkLoader->pmem_map_file, &dl_info)) {
+ library = dl_info.dli_fname;
+ }
+#else
+ if (GetModuleFileName(pmdkLoader->libec, filename, 256) > 0) {
+ library = filename;
+ }
+#endif
+
+ if (library == NULL) {
+ library = HADOOP_PMDK_LIBRARY;
+ }
+
+ pmdkLoader->libname = strdup(library);
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h
new file mode 100644
index 0000000..c93a076
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "org_apache_hadoop.h"
+
+#ifdef UNIX
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dlfcn.h>
+#endif
+
+#ifndef _PMDK_LOAD_H_
+#define _PMDK_LOAD_H_
+
+
+#ifdef UNIX
+// For libpmem.h
+typedef void * (*__d_pmem_map_file)(const char *path, size_t len, int flags, mode_t mode,
+ size_t *mapped_lenp, int *is_pmemp);
+typedef int (* __d_pmem_unmap)(void *addr, size_t len);
+typedef int (*__d_pmem_is_pmem)(const void *addr, size_t len);
+typedef void (*__d_pmem_drain)(void);
+typedef void * (*__d_pmem_memcpy_nodrain)(void *pmemdest, const void *src, size_t len);
+typedef int (* __d_pmem_msync)(const void *addr, size_t len);
+
+#endif
+
+typedef struct __PmdkLibLoader {
+ // The loaded library handle
+ void* libec;
+ char* libname;
+ __d_pmem_map_file pmem_map_file;
+ __d_pmem_unmap pmem_unmap;
+ __d_pmem_is_pmem pmem_is_pmem;
+ __d_pmem_drain pmem_drain;
+ __d_pmem_memcpy_nodrain pmem_memcpy_nodrain;
+ __d_pmem_msync pmem_msync;
+} PmdkLibLoader;
+
+extern PmdkLibLoader * pmdkLoader;
+
+/**
+ * A helper function to dlsym a 'symbol' from a given library-handle.
+ */
+
+#ifdef UNIX
+
+static __attribute__ ((unused))
+void *myDlsym(void *handle, const char *symbol) {
+ void *func_ptr = dlsym(handle, symbol);
+ return func_ptr;
+}
+
+/* A helper macro to dlsym the requisite dynamic symbol in NON-JNI env. */
+#define PMDK_LOAD_DYNAMIC_SYMBOL(func_ptr, symbol) \
+ if ((func_ptr = myDlsym(pmdkLoader->libec, symbol)) == NULL) { \
+ return "Failed to load symbol" symbol; \
+ }
+
+#endif
+
+/**
+ * Return 0 if not support, 1 otherwise.
+ */
+int build_support_pmdk();
+
+/**
+ * Initialize and load PMDK library, returning error message if any.
+ *
+ * @param err The err message buffer.
+ * @param err_len The length of the message buffer.
+ */
+void load_pmdk_lib(char* err, size_t err_len);
+
+#endif //_PMDK_LOAD_H_
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
index 6b3c232..a14928c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
@@ -25,6 +25,8 @@ import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
@@ -782,4 +784,155 @@ public class TestNativeIO {
assertTrue("Native POSIX_FADV_NOREUSE const not set",
POSIX_FADV_NOREUSE >= 0);
}
+
+
+ @Test (timeout=10000)
+ public void testPmemCheckParameters() {
+ assumeNotWindows("Native PMDK not supported on Windows");
+ // Skip testing while the build or environment does not support PMDK
+ assumeTrue(NativeIO.POSIX.isPmdkAvailable());
+
+ // Please make sure /mnt/pmem0 is a persistent memory device with total
+ // volume size 'volumeSize'
+ String filePath = "/$:";
+ long length = 0;
+ long volumeSize = 16 * 1024 * 1024 * 1024L;
+
+ // Incorrect file length
+ try {
+ NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+ fail("Illegal length parameter should be detected");
+ } catch (Exception e) {
+ LOG.info(e.getMessage());
+ }
+
+ // Incorrect file length
+ filePath = "/mnt/pmem0/test_native_io";
+ length = -1L;
+ try {
+ NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+ fail("Illegal length parameter should be detected");
+ } catch (Exception e) {
+ LOG.info(e.getMessage());
+ }
+ }
+
+ @Test (timeout=10000)
+ public void testPmemMapMultipleFiles() {
+ assumeNotWindows("Native PMDK not supported on Windows");
+ // Skip testing while the build or environment does not support PMDK
+ assumeTrue(NativeIO.POSIX.isPmdkAvailable());
+
+ // Please make sure /mnt/pmem0 is a persistent memory device with total
+ // volume size 'volumeSize'
+ String filePath = "/mnt/pmem0/test_native_io";
+ long length = 0;
+ long volumeSize = 16 * 1024 * 1024 * 1024L;
+
+ // Multiple files, each with 128MB size, aggregated size exceeds volume
+ // limit 16GB
+ length = 128 * 1024 * 1024L;
+ long fileNumber = volumeSize / length;
+ LOG.info("File number = " + fileNumber);
+ for (int i = 0; i < fileNumber; i++) {
+ String path = filePath + i;
+ LOG.info("File path = " + path);
+ NativeIO.POSIX.Pmem.mapBlock(path, length);
+ }
+ try {
+ NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+ fail("Should fail to map an extra file when persistent memory is all occupied");
+ } catch (Exception e) {
+ LOG.info(e.getMessage());
+ }
+ }
+
+ @Test (timeout=10000)
+ public void testPmemMapBigFile() {
+ assumeNotWindows("Native PMDK not supported on Windows");
+ // Skip testing while the build or environment does not support PMDK
+ assumeTrue(NativeIO.POSIX.isPmdkAvailable());
+
+ // Please make sure /mnt/pmem0 is a persistent memory device with total
+ // volume size 'volumeSize'
+ String filePath = "/mnt/pmem0/test_native_io_big";
+ long length = 0;
+ long volumeSize = 16 * 1024 * 1024 * 1024L;
+
+ // One file length exceeds persistent memory volume 16GB.
+ length = volumeSize + 1024L;
+ try {
+ LOG.info("File length = " + length);
+ NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+ fail("File length exceeds persistent memory total volume size");
+ } catch (Exception e) {
+ LOG.info(e.getMessage());
+ deletePmemMappedFile(filePath);
+ }
+ }
+
+ @Test (timeout=10000)
+ public void testPmemCopy() throws IOException {
+ assumeNotWindows("Native PMDK not supported on Windows");
+ // Skip testing while the build or environment does not support PMDK
+ assumeTrue(NativeIO.POSIX.isPmdkAvailable());
+
+ // Create and map a block file. Please make sure /mnt/pmem0 is a persistent
+ // memory device.
+ String filePath = "/mnt/pmem0/copy";
+ long length = 4096;
+ PmemMappedRegion region = NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+ assertTrue(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length));
+ assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length + 100));
+ assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() + 100, length));
+ assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() - 100, length));
+
+ // Copy content to mapped file
+ byte[] data = generateSequentialBytes(0, (int) length);
+ NativeIO.POSIX.Pmem.memCopy(data, region.getAddress(), region.isPmem(),
+ length);
+
+ // Read content before pmemSync
+ byte[] readBuf1 = new byte[(int)length];
+ IOUtils.readFully(new FileInputStream(filePath), readBuf1, 0, (int)length);
+ assertArrayEquals(data, readBuf1);
+
+ byte[] readBuf2 = new byte[(int)length];
+ // Sync content to persistent memory twice
+ NativeIO.POSIX.Pmem.memSync(region);
+ NativeIO.POSIX.Pmem.memSync(region);
+ // Read content after pmemSync twice
+ IOUtils.readFully(new FileInputStream(filePath), readBuf2, 0, (int)length);
+ assertArrayEquals(data, readBuf2);
+
+ // Read content after unmap twice
+ NativeIO.POSIX.Pmem.unmapBlock(region.getAddress(), length);
+ NativeIO.POSIX.Pmem.unmapBlock(region.getAddress(), length);
+ byte[] readBuf3 = new byte[(int)length];
+ IOUtils.readFully(new FileInputStream(filePath), readBuf3, 0, (int)length);
+ assertArrayEquals(data, readBuf3);
+ }
+
+ private static byte[] generateSequentialBytes(int start, int length) {
+ byte[] result = new byte[length];
+
+ for (int i = 0; i < length; i++) {
+ result[i] = (byte) ((start + i) % 127);
+ }
+ return result;
+ }
+
+ private static void deletePmemMappedFile(String filePath) {
+ try {
+ if (filePath != null) {
+ boolean result = Files.deleteIfExists(Paths.get(filePath));
+ if (!result) {
+ throw new IOException("Failed to delete mapped file " + filePath);
+ }
+ }
+ } catch (Throwable e) {
+ LOG.error("Failed to delete the mapped file " + filePath +
+ " from persistent memory", e);
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 4fab214..37e548e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -215,6 +215,28 @@ public class FsDatasetCache {
}
/**
+ * Get cache address on persistent memory for read operation.
+ * The cache address comes from PMDK lib function when mapping
+ * block to persistent memory.
+ *
+ * @param bpid blockPoolId
+ * @param blockId blockId
+ * @return address
+ */
+ long getCacheAddress(String bpid, long blockId) {
+ if (cacheLoader.isTransientCache() ||
+ !isCached(bpid, blockId)) {
+ return -1;
+ }
+ if (!(cacheLoader.isNativeLoader())) {
+ return -1;
+ }
+ ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
+ MappableBlock mappableBlock = mappableBlockMap.get(key).mappableBlock;
+ return mappableBlock.getAddress();
+ }
+
+ /**
* @return List of cached blocks suitable for translation into a
* {@link BlockListAsLongs} for a cache report.
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 80738d3..76110d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -803,6 +803,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
String cachePath = cacheManager.getReplicaCachePath(
b.getBlockPoolId(), b.getBlockId());
if (cachePath != null) {
+ long addr = cacheManager.getCacheAddress(
+ b.getBlockPoolId(), b.getBlockId());
+ if (addr != -1) {
+ LOG.debug("Get InputStream by cache address.");
+ return FsDatasetUtil.getDirectInputStream(
+ addr + seekOffset, info.getBlockDataLength() - seekOffset);
+ }
+ LOG.debug("Get InputStream by cache file path.");
return FsDatasetUtil.getInputStreamAndSeek(
new File(cachePath), seekOffset);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index 5308b60..fbd02c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -25,7 +25,10 @@ import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.URI;
+import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -42,6 +45,7 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.htrace.shaded.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
/** Utility methods. */
@InterfaceAudience.Private
@@ -131,6 +135,24 @@ public class FsDatasetUtil {
}
}
+ public static InputStream getDirectInputStream(long addr, long length)
+ throws IOException {
+ try {
+ Class<?> directByteBufferClass =
+ Class.forName("java.nio.DirectByteBuffer");
+ Constructor<?> constructor =
+ directByteBufferClass.getDeclaredConstructor(long.class, int.class);
+ constructor.setAccessible(true);
+ ByteBuffer byteBuffer =
+ (ByteBuffer) constructor.newInstance(addr, (int)length);
+ return new ByteBufferBackedInputStream(byteBuffer);
+ } catch (ClassNotFoundException | NoSuchMethodException |
+ IllegalAccessException | InvocationTargetException |
+ InstantiationException e) {
+ throw new IOException(e);
+ }
+ }
+
/**
* Find the meta-file for the specified block file and then return the
* generation stamp from the name of the meta-file. Generally meta file will
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
index 0fff327..a00c442 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
@@ -35,4 +35,10 @@ public interface MappableBlock extends Closeable {
* @return the number of bytes that have been cached.
*/
long getLength();
+
+ /**
+ * Get cache address if applicable.
+ * Return -1 if not applicable.
+ */
+ long getAddress();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
index 3ec8416..5b9ba3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -64,8 +64,7 @@ public abstract class MappableBlockLoader {
* @return The Mappable block.
*/
abstract MappableBlock load(long length, FileInputStream blockIn,
- FileInputStream metaIn, String blockFileName,
- ExtendedBlockId key)
+ FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
throws IOException;
/**
@@ -107,6 +106,11 @@ public abstract class MappableBlockLoader {
abstract boolean isTransientCache();
/**
+ * Check whether this is a native pmem cache loader.
+ */
+ abstract boolean isNativeLoader();
+
+ /**
* Clean up cache, can be used during DataNode shutdown.
*/
void shutdown() {
@@ -117,8 +121,7 @@ public abstract class MappableBlockLoader {
* Verifies the block's checksum. This is an I/O intensive operation.
*/
protected void verifyChecksum(long length, FileInputStream metaIn,
- FileChannel blockChannel, String blockFileName)
- throws IOException {
+ FileChannel blockChannel, String blockFileName) throws IOException {
// Verify the checksum from the block's meta file
// Get the DataChecksum from the meta file header
BlockMetadataHeader header =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java
index 43b1b53..6569373 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
+import org.apache.hadoop.io.nativeio.NativeIO;
/**
* Creates MappableBlockLoader.
@@ -42,6 +43,9 @@ public final class MappableBlockLoaderFactory {
if (conf.getPmemVolumes() == null || conf.getPmemVolumes().length == 0) {
return new MemoryMappableBlockLoader();
}
+ if (NativeIO.isAvailable() && NativeIO.POSIX.isPmdkAvailable()) {
+ return new NativePmemMappableBlockLoader();
+ }
return new PmemMappableBlockLoader();
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index 52d8d93..dd4188c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -66,8 +66,7 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
*/
@Override
MappableBlock load(long length, FileInputStream blockIn,
- FileInputStream metaIn, String blockFileName,
- ExtendedBlockId key)
+ FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
throws IOException {
MemoryMappedBlock mappableBlock = null;
MappedByteBuffer mmap = null;
@@ -116,4 +115,9 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
public boolean isTransientCache() {
return true;
}
+
+ @Override
+ public boolean isNativeLoader() {
+ return false;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
index c09ad1a..47dfeae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
@@ -45,6 +45,11 @@ public class MemoryMappedBlock implements MappableBlock {
}
@Override
+ public long getAddress() {
+ return -1L;
+ }
+
+ @Override
public void close() {
if (mmap != null) {
NativeIO.POSIX.munmap(mmap);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
similarity index 53%
copy from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
copy to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
index 3ec8416..09e9454 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
@@ -24,7 +24,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
@@ -34,21 +38,26 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
- * Maps block to DataNode cache region.
+ * Map block to persistent memory with native PMDK libs.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public abstract class MappableBlockLoader {
+public class NativePmemMappableBlockLoader extends PmemMappableBlockLoader {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(NativePmemMappableBlockLoader.class);
- /**
- * Initialize a specific MappableBlockLoader.
- */
- abstract void initialize(FsDatasetCache cacheManager) throws IOException;
+ @Override
+ void initialize(FsDatasetCache cacheManager) throws IOException {
+ super.initialize(cacheManager);
+ }
/**
* Load the block.
*
- * Map the block, and then verify its checksum.
+ * Map the block and verify its checksum.
+ *
+ * The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir
+ * is a persistent memory volume chosen by PmemVolumeManager.
*
* @param length The current length of the block.
* @param blockIn The block input stream. Should be positioned at the
@@ -58,67 +67,62 @@ public abstract class MappableBlockLoader {
* @param blockFileName The block file name, for logging purposes.
* @param key The extended block ID.
*
- * @throws IOException If mapping block to cache region fails or checksum
- * fails.
+ * @throws IOException If mapping block to persistent memory fails or
+ * checksum fails.
*
* @return The Mappable block.
*/
- abstract MappableBlock load(long length, FileInputStream blockIn,
- FileInputStream metaIn, String blockFileName,
- ExtendedBlockId key)
- throws IOException;
-
- /**
- * Try to reserve some given bytes.
- *
- * @param key The ExtendedBlockId for a block.
- *
- * @param bytesCount The number of bytes to add.
- *
- * @return The new number of usedBytes if we succeeded;
- * -1 if we failed.
- */
- abstract long reserve(ExtendedBlockId key, long bytesCount);
-
- /**
- * Release some bytes that we're using.
- *
- * @param key The ExtendedBlockId for a block.
- *
- * @param bytesCount The number of bytes to release.
- *
- * @return The new number of usedBytes.
- */
- abstract long release(ExtendedBlockId key, long bytesCount);
-
- /**
- * Get the approximate amount of cache space used.
- */
- abstract long getCacheUsed();
-
- /**
- * Get the maximum amount of cache bytes.
- */
- abstract long getCacheCapacity();
+ @Override
+ public MappableBlock load(long length, FileInputStream blockIn,
+ FileInputStream metaIn, String blockFileName,
+ ExtendedBlockId key)
+ throws IOException {
+ NativePmemMappedBlock mappableBlock = null;
+ POSIX.PmemMappedRegion region = null;
+ String filePath = null;
- /**
- * Check whether the cache is non-volatile.
- */
- abstract boolean isTransientCache();
+ FileChannel blockChannel = null;
+ try {
+ blockChannel = blockIn.getChannel();
+ if (blockChannel == null) {
+ throw new IOException("Block InputStream has no FileChannel.");
+ }
- /**
- * Clean up cache, can be used during DataNode shutdown.
- */
- void shutdown() {
- // Do nothing.
+ assert NativeIO.isAvailable();
+ filePath = PmemVolumeManager.getInstance().getCachePath(key);
+ region = POSIX.Pmem.mapBlock(filePath, length);
+ if (region == null) {
+ throw new IOException("Failed to map the block " + blockFileName +
+ " to persistent storage.");
+ }
+ verifyChecksumAndMapBlock(region, length, metaIn, blockChannel,
+ blockFileName);
+ mappableBlock = new NativePmemMappedBlock(region.getAddress(),
+ region.getLength(), key);
+ LOG.info("Successfully cached one replica:{} into persistent memory"
+ + ", [cached path={}, address={}, length={}]", key, filePath,
+ region.getAddress(), length);
+ } finally {
+ IOUtils.closeQuietly(blockChannel);
+ if (mappableBlock == null) {
+ if (region != null) {
+ // unmap content from persistent memory
+ POSIX.Pmem.unmapBlock(region.getAddress(),
+ region.getLength());
+ FsDatasetUtil.deleteMappedFile(filePath);
+ }
+ }
+ }
+ return mappableBlock;
}
/**
- * Verifies the block's checksum. This is an I/O intensive operation.
+ * Verifies the block's checksum meanwhile map block to persistent memory.
+ * This is an I/O intensive operation.
*/
- protected void verifyChecksum(long length, FileInputStream metaIn,
- FileChannel blockChannel, String blockFileName)
- throws IOException {
+ private void verifyChecksumAndMapBlock(POSIX.PmemMappedRegion region,
+ long length, FileInputStream metaIn, FileChannel blockChannel,
+ String blockFileName) throws IOException {
// Verify the checksum from the block's meta file
// Get the DataChecksum from the meta file header
BlockMetadataHeader header =
@@ -129,8 +133,8 @@ public abstract class MappableBlockLoader {
try {
metaChannel = metaIn.getChannel();
if (metaChannel == null) {
- throw new IOException(
- "Block InputStream meta file has no FileChannel.");
+ throw new IOException("Cannot get FileChannel" +
+ " from Block InputStream meta file.");
}
DataChecksum checksum = header.getChecksum();
final int bytesPerChecksum = checksum.getBytesPerChecksum();
@@ -140,13 +144,19 @@ public abstract class MappableBlockLoader {
ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
// Verify the checksum
int bytesVerified = 0;
+ long mappedAddress = -1L;
+ if (region != null) {
+ mappedAddress = region.getAddress();
+ }
while (bytesVerified < length) {
Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
- "Unexpected partial chunk before EOF");
+ "Unexpected partial chunk before EOF.");
assert bytesVerified % bytesPerChecksum == 0;
int bytesRead = fillBuffer(blockChannel, blockBuf);
if (bytesRead == -1) {
- throw new IOException("checksum verification failed: premature EOF");
+ throw new IOException(
+ "Checksum verification failed for the block " + blockFileName +
+ ": premature EOF");
}
blockBuf.flip();
// Number of read chunks, including partial chunk at end
@@ -158,32 +168,24 @@ public abstract class MappableBlockLoader {
bytesVerified);
// Success
bytesVerified += bytesRead;
+ // Copy data to persistent file
+ POSIX.Pmem.memCopy(blockBuf.array(), mappedAddress,
+ region.isPmem(), bytesRead);
+ mappedAddress += bytesRead;
+ // Clear buffer
blockBuf.clear();
checksumBuf.clear();
}
+ if (region != null) {
+ POSIX.Pmem.memSync(region);
+ }
} finally {
IOUtils.closeQuietly(metaChannel);
}
}
- /**
- * Reads bytes into a buffer until EOF or the buffer's limit is reached.
- */
- protected int fillBuffer(FileChannel channel, ByteBuffer buf)
- throws IOException {
- int bytesRead = channel.read(buf);
- if (bytesRead < 0) {
- //EOF
- return bytesRead;
- }
- while (buf.remaining() > 0) {
- int n = channel.read(buf);
- if (n < 0) {
- //EOF
- return bytesRead;
- }
- bytesRead += n;
- }
- return bytesRead;
+ @Override
+ public boolean isNativeLoader() {
+ return true;
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java
similarity index 52%
copy from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
copy to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java
index 25c3d40..92012b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java
@@ -21,25 +21,29 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.io.nativeio.NativeIO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
- * Represents an HDFS block that is mapped to persistent memory by DataNode
- * with mapped byte buffer. PMDK is NOT involved in this implementation.
+ * Represents an HDFS block that is mapped to persistent memory by the DataNode.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class PmemMappedBlock implements MappableBlock {
+public class NativePmemMappedBlock implements MappableBlock {
private static final Logger LOG =
- LoggerFactory.getLogger(PmemMappedBlock.class);
+ LoggerFactory.getLogger(NativePmemMappedBlock.class);
+
+ private long pmemMappedAddress = -1L;
private long length;
private ExtendedBlockId key;
- PmemMappedBlock(long length, ExtendedBlockId key) {
+ NativePmemMappedBlock(long pmemMappedAddress, long length,
+ ExtendedBlockId key) {
assert length > 0;
+ this.pmemMappedAddress = pmemMappedAddress;
this.length = length;
this.key = key;
}
@@ -50,15 +54,32 @@ public class PmemMappedBlock implements MappableBlock {
}
@Override
+ public long getAddress() {
+ return pmemMappedAddress;
+ }
+
+ @Override
public void close() {
- String cacheFilePath =
- PmemVolumeManager.getInstance().getCachePath(key);
- try {
- FsDatasetUtil.deleteMappedFile(cacheFilePath);
- LOG.info("Successfully uncached one replica:{} from persistent memory"
- + ", [cached path={}, length={}]", key, cacheFilePath, length);
- } catch (IOException e) {
- LOG.warn("Failed to delete the mapped File: {}!", cacheFilePath, e);
+ if (pmemMappedAddress != -1L) {
+ String cacheFilePath =
+ PmemVolumeManager.getInstance().getCachePath(key);
+ try {
+ // Current libpmem will report error when pmem_unmap is called with
+ // length not aligned with page size, although the length is returned
+ // by pmem_map_file.
+ boolean success =
+ NativeIO.POSIX.Pmem.unmapBlock(pmemMappedAddress, length);
+ if (!success) {
+ throw new IOException("Failed to unmap the mapped file from " +
+ "pmem address: " + pmemMappedAddress);
+ }
+ pmemMappedAddress = -1L;
+ FsDatasetUtil.deleteMappedFile(cacheFilePath);
+ LOG.info("Successfully uncached one replica:{} from persistent memory"
+ + ", [cached path={}, length={}]", key, cacheFilePath, length);
+ } catch (IOException e) {
+ LOG.warn("IOException occurred for block {}!", key, e);
+ }
}
}
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
index 239fff8..70a42c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
@@ -43,7 +43,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
@Override
void initialize(FsDatasetCache cacheManager) throws IOException {
- LOG.info("Initializing cache loader: PmemMappableBlockLoader.");
+ LOG.info("Initializing cache loader: " + this.getClass().getName());
DNConf dnConf = cacheManager.getDnConf();
PmemVolumeManager.init(dnConf.getPmemVolumes());
pmemVolumeManager = PmemVolumeManager.getInstance();
@@ -71,8 +71,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
*/
@Override
MappableBlock load(long length, FileInputStream blockIn,
- FileInputStream metaIn, String blockFileName,
- ExtendedBlockId key)
+ FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
throws IOException {
PmemMappedBlock mappableBlock = null;
String cachePath = null;
@@ -133,6 +132,11 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
}
@Override
+ public boolean isNativeLoader() {
+ return false;
+ }
+
+ @Override
void shutdown() {
LOG.info("Clean up cache on persistent memory during shutdown.");
PmemVolumeManager.getInstance().cleanup();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
index 25c3d40..a49626a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
@@ -50,6 +50,11 @@ public class PmemMappedBlock implements MappableBlock {
}
@Override
+ public long getAddress() {
+ return -1L;
+ }
+
+ @Override
public void close() {
String cacheFilePath =
PmemVolumeManager.getInstance().getCachePath(key);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org