You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sa...@apache.org on 2019/06/05 13:41:40 UTC

[hadoop] branch trunk updated: HDFS-14356. Implement HDFS cache on SCM with native PMDK libs. Contributed by Feilong He.

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d1aad44  HDFS-14356. Implement HDFS cache on SCM with native PMDK libs. Contributed by Feilong He.
d1aad44 is described below

commit d1aad444907e1fc5314e8e64529e57c51ed7561c
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Wed Jun 5 21:33:00 2019 +0800

    HDFS-14356. Implement HDFS cache on SCM with native PMDK libs. Contributed by Feilong He.
---
 BUILDING.txt                                       |  28 +++
 dev-support/bin/dist-copynativelibs                |   8 +
 hadoop-common-project/hadoop-common/pom.xml        |   2 +
 .../hadoop-common/src/CMakeLists.txt               |  21 ++
 .../hadoop-common/src/config.h.cmake               |   1 +
 .../org/apache/hadoop/io/nativeio/NativeIO.java    | 135 ++++++++++-
 .../src/org/apache/hadoop/io/nativeio/NativeIO.c   | 252 +++++++++++++++++++++
 .../src/org/apache/hadoop/io/nativeio/pmdk_load.c  | 106 +++++++++
 .../src/org/apache/hadoop/io/nativeio/pmdk_load.h  |  95 ++++++++
 .../apache/hadoop/io/nativeio/TestNativeIO.java    | 153 +++++++++++++
 .../datanode/fsdataset/impl/FsDatasetCache.java    |  22 ++
 .../datanode/fsdataset/impl/FsDatasetImpl.java     |   8 +
 .../datanode/fsdataset/impl/FsDatasetUtil.java     |  22 ++
 .../datanode/fsdataset/impl/MappableBlock.java     |   6 +
 .../fsdataset/impl/MappableBlockLoader.java        |  11 +-
 .../fsdataset/impl/MappableBlockLoaderFactory.java |   4 +
 .../fsdataset/impl/MemoryMappableBlockLoader.java  |   8 +-
 .../datanode/fsdataset/impl/MemoryMappedBlock.java |   5 +
 ...der.java => NativePmemMappableBlockLoader.java} | 166 +++++++-------
 ...MappedBlock.java => NativePmemMappedBlock.java} |  49 ++--
 .../fsdataset/impl/PmemMappableBlockLoader.java    |  10 +-
 .../datanode/fsdataset/impl/PmemMappedBlock.java   |   5 +
 22 files changed, 1009 insertions(+), 108 deletions(-)

diff --git a/BUILDING.txt b/BUILDING.txt
index cc9ac17..8c57a1d 100644
--- a/BUILDING.txt
+++ b/BUILDING.txt
@@ -78,6 +78,8 @@ Optional packages:
   $ sudo apt-get install fuse libfuse-dev
 * ZStandard compression
     $ sudo apt-get install zstd
+* PMDK library for storage class memory(SCM) as HDFS cache backend
+  Please refer to http://pmem.io/ and https://github.com/pmem/pmdk
 
 ----------------------------------------------------------------------------------
 Maven main modules:
@@ -262,6 +264,32 @@ Maven build goals:
    invoke, run 'mvn dependency-check:aggregate'. Note that this plugin
    requires maven 3.1.1 or greater.
 
+ PMDK library build options:
+
+   The Persistent Memory Development Kit (PMDK), formerly known as NVML, is a growing
+   collection of libraries which have been developed for various use cases, tuned,
+   validated to production quality, and thoroughly documented. These libraries are built
+   on the Direct Access (DAX) feature available in both Linux and Windows, which allows
+   applications direct load/store access to persistent memory by memory-mapping files
+   on a persistent memory aware file system.
+
+   It is currently an optional component, meaning that Hadoop can be built without
+   this dependency. Please note that the library is loaded dynamically at runtime. For
+   more details, please refer to the official sites:
+   http://pmem.io/ and https://github.com/pmem/pmdk.
+
+  * -Drequire.pmdk is used to build the project with PMDK libraries forcibly. With this
+    option provided, the build will fail if libpmem library is not found. If this option
+    is not given, the build will generate a libhadoop.so without PMDK support.
+    Storage class memory (SCM) backed HDFS cache is still supported without PMDK involved.
+    Because PMDK can bring better caching write/read performance, it is recommended to build
+    the project with this option if user plans to use SCM backed HDFS cache.
+  * -Dpmdk.lib is used to specify a nonstandard location for PMDK libraries if they are not
+    under /usr/lib or /usr/lib64.
+  * -Dbundle.pmdk is used to copy the specified libpmem libraries into the distribution tar
+    package. This option requires that -Dpmdk.lib is specified. With -Dbundle.pmdk provided,
+    the build will fail if -Dpmdk.lib is not specified.
+
 ----------------------------------------------------------------------------------
 Building components separately
 
diff --git a/dev-support/bin/dist-copynativelibs b/dev-support/bin/dist-copynativelibs
index 67d2edf..4a783f0 100755
--- a/dev-support/bin/dist-copynativelibs
+++ b/dev-support/bin/dist-copynativelibs
@@ -96,6 +96,12 @@ for i in "$@"; do
     --isalbundle=*)
       ISALBUNDLE=${i#*=}
     ;;
+    --pmdklib=*)
+      PMDKLIB=${i#*=}
+    ;;
+    --pmdkbundle=*)
+      PMDKBUNDLE=${i#*=}
+    ;;
     --opensslbinbundle=*)
       OPENSSLBINBUNDLE=${i#*=}
     ;;
@@ -153,6 +159,8 @@ if [[ -d "${LIB_DIR}" ]]; then
   bundle_native_lib "${OPENSSLLIBBUNDLE}" "openssl.lib" "crypto" "${OPENSSLLIB}"
 
   bundle_native_lib "${ISALBUNDLE}" "isal.lib" "isa" "${ISALLIB}"
+
+  # The PMDK shared objects are named libpmem.so*, so the file pattern is
+  # "pmem" (as with "crypto" for openssl and "isa" for isal); "pmdk" would
+  # presumably match nothing -- verify against bundle_native_lib.
+  bundle_native_lib "${PMDKBUNDLE}" "pmdk.lib" "pmem" "${PMDKLIB}"
 fi
 
 # Windows
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 64e4d04..5b60053 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -682,6 +682,8 @@
                     <REQUIRE_ISAL>${require.isal} </REQUIRE_ISAL>
                     <CUSTOM_ISAL_PREFIX>${isal.prefix} </CUSTOM_ISAL_PREFIX>
                     <CUSTOM_ISAL_LIB>${isal.lib} </CUSTOM_ISAL_LIB>
+                    <REQUIRE_PMDK>${require.pmdk}</REQUIRE_PMDK>
+                    <CUSTOM_PMDK_LIB>${pmdk.lib}</CUSTOM_PMDK_LIB>
                     <REQUIRE_OPENSSL>${require.openssl} </REQUIRE_OPENSSL>
                     <CUSTOM_OPENSSL_PREFIX>${openssl.prefix} </CUSTOM_OPENSSL_PREFIX>
                     <CUSTOM_OPENSSL_LIB>${openssl.lib} </CUSTOM_OPENSSL_LIB>
diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
index b9287c0..771c685 100644
--- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt
+++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
@@ -121,6 +121,7 @@ else ()
     ENDIF(REQUIRE_ZSTD)
 endif ()
 
+#Require ISA-L
 set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
 hadoop_set_find_shared_library_version("2")
 find_library(ISAL_LIBRARY
@@ -159,6 +160,25 @@ else (ISAL_LIBRARY)
     ENDIF(REQUIRE_ISAL)
 endif (ISAL_LIBRARY)
 
+# Build with PMDK library if -Drequire.pmdk option is specified.
+if(REQUIRE_PMDK)
+    set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
+    hadoop_set_find_shared_library_version("1")
+    find_library(PMDK_LIBRARY
+        NAMES pmem
+        PATHS ${CUSTOM_PMDK_LIB} /usr/lib /usr/lib64)
+    set(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
+
+    if(PMDK_LIBRARY)
+        GET_FILENAME_COMPONENT(HADOOP_PMDK_LIBRARY ${PMDK_LIBRARY} NAME)
+        set(PMDK_SOURCE_FILES ${SRC}/io/nativeio/pmdk_load.c)
+    else(PMDK_LIBRARY)
+        MESSAGE(FATAL_ERROR "The required PMDK library is NOT found. PMDK_LIBRARY=${PMDK_LIBRARY}")
+    endif(PMDK_LIBRARY)
+else(REQUIRE_PMDK)
+    MESSAGE(STATUS "Build without PMDK support.")
+endif(REQUIRE_PMDK)
+
 # Build hardware CRC32 acceleration, if supported on the platform.
 if(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
   set(BULK_CRC_ARCH_SOURCE_FIlE "${SRC}/util/bulk_crc32_x86.c")
@@ -256,6 +276,7 @@ hadoop_add_dual_library(hadoop
     ${SRC}/io/compress/zlib/ZlibDecompressor.c
     ${BZIP2_SOURCE_FILES}
     ${SRC}/io/nativeio/NativeIO.c
+    ${PMDK_SOURCE_FILES}
     ${SRC}/io/nativeio/errno_enum.c
     ${SRC}/io/nativeio/file_descriptor.c
     ${SRC}/io/nativeio/SharedFileDescriptorFactory.c
diff --git a/hadoop-common-project/hadoop-common/src/config.h.cmake b/hadoop-common-project/hadoop-common/src/config.h.cmake
index 40aa467..7e23a5d 100644
--- a/hadoop-common-project/hadoop-common/src/config.h.cmake
+++ b/hadoop-common-project/hadoop-common/src/config.h.cmake
@@ -24,6 +24,7 @@
 #cmakedefine HADOOP_ZSTD_LIBRARY "@HADOOP_ZSTD_LIBRARY@"
 #cmakedefine HADOOP_OPENSSL_LIBRARY "@HADOOP_OPENSSL_LIBRARY@"
 #cmakedefine HADOOP_ISAL_LIBRARY "@HADOOP_ISAL_LIBRARY@"
+#cmakedefine HADOOP_PMDK_LIBRARY "@HADOOP_PMDK_LIBRARY@"
 #cmakedefine HAVE_SYNC_FILE_RANGE
 #cmakedefine HAVE_POSIX_FADVISE
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index 4e0cd8f..1d0eab7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -100,6 +100,48 @@ public class NativeIO {
        write.  */
     public static int SYNC_FILE_RANGE_WAIT_AFTER = 4;
 
+    /**
+     * Keeps the support state of PMDK.
+     */
+    public enum SupportState {
+      UNSUPPORTED(-1),
+      PMDK_LIB_NOT_FOUND(1),
+      SUPPORTED(0);
+
+      private byte stateCode;
+      SupportState(int stateCode) {
+        this.stateCode = (byte) stateCode;
+      }
+
+      public int getStateCode() {
+        return stateCode;
+      }
+
+      public String getMessage() {
+        String msg;
+        switch (stateCode) {
+        case -1:
+          msg = "The native code is built without PMDK support.";
+          break;
+        case 1:
+          msg = "The native code is built with PMDK support, but PMDK libs " +
+              "are NOT found in execution environment or failed to be loaded.";
+          break;
+        case 0:
+          msg = "The native code is built with PMDK support, and PMDK libs " +
+              "are loaded successfully.";
+          break;
+        default:
+          msg = "The state code: " + stateCode + " is unrecognized!";
+        }
+        return msg;
+      }
+    }
+
+    // Denotes the state of supporting PMDK. Native code calls
+    // setPmdkSupportState() only when it was compiled with PMDK support, so
+    // the default must describe a build without PMDK.
+    private static SupportState pmdkSupportState =
+        SupportState.UNSUPPORTED;
+
     private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class);
 
     // Set to true via JNI if possible
@@ -124,6 +166,93 @@ public class NativeIO {
       POSIX.cacheManipulator = cacheManipulator;
     }
 
+    // This method is invoked by JNI.
+    public static void setPmdkSupportState(int stateCode) {
+      for (SupportState state : SupportState.values()) {
+        if (state.getStateCode() == stateCode) {
+          pmdkSupportState = state;
+          return;
+        }
+      }
+      LOG.error("The state code: " + stateCode + " is unrecognized!");
+    }
+
+    public static boolean isPmdkAvailable() {
+      LOG.info(pmdkSupportState.getMessage());
+      return pmdkSupportState == SupportState.SUPPORTED;
+    }
+
+    /**
+     * Denote memory region for a file mapped.
+     */
+    public static class PmemMappedRegion {
+      private long address;
+      private long length;
+      private boolean isPmem;
+
+      public PmemMappedRegion(long address, long length, boolean isPmem) {
+        this.address = address;
+        this.length = length;
+        this.isPmem = isPmem;
+      }
+
+      public boolean isPmem() {
+        return this.isPmem;
+      }
+
+      public long getAddress() {
+        return this.address;
+      }
+
+      public long getLength() {
+        return this.length;
+      }
+    }
+
+    /**
+     * JNI wrapper of persist memory operations.
+     */
+    public static class Pmem {
+      // check whether the address is a Pmem address or DIMM address
+      public static boolean isPmem(long address, long length) {
+        return NativeIO.POSIX.isPmemCheck(address, length);
+      }
+
+      // create a pmem file and memory map it
+      public static PmemMappedRegion mapBlock(String path, long length) {
+        return NativeIO.POSIX.pmemCreateMapFile(path, length);
+      }
+
+      // unmap a pmem file
+      public static boolean unmapBlock(long address, long length) {
+        return NativeIO.POSIX.pmemUnMap(address, length);
+      }
+
+      // copy data from disk file(src) to pmem file(dest), without flush
+      public static void memCopy(byte[] src, long dest, boolean isPmem,
+          long length) {
+        NativeIO.POSIX.pmemCopy(src, dest, isPmem, length);
+      }
+
+      // flush the memory content to persistent storage
+      public static void memSync(PmemMappedRegion region) {
+        if (region.isPmem()) {
+          NativeIO.POSIX.pmemDrain();
+        } else {
+          NativeIO.POSIX.pmemSync(region.getAddress(), region.getLength());
+        }
+      }
+    }
+
+    private static native boolean isPmemCheck(long address, long length);
+    private static native PmemMappedRegion pmemCreateMapFile(String path,
+        long length);
+    private static native boolean pmemUnMap(long address, long length);
+    private static native void pmemCopy(byte[] src, long dest, boolean isPmem,
+        long length);
+    private static native void pmemDrain();
+    private static native void pmemSync(long address, long length);
+
     /**
      * Used to manipulate the operating system cache.
      */
@@ -143,8 +272,8 @@ public class NativeIO {
       }
 
       public void posixFadviseIfPossible(String identifier,
-        FileDescriptor fd, long offset, long len, int flags)
-            throws NativeIOException {
+          FileDescriptor fd, long offset, long len, int flags)
+          throws NativeIOException {
         NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, offset,
             len, flags);
       }
@@ -748,7 +877,7 @@ public class NativeIO {
    * user account name, of the format DOMAIN\UserName. This method
    * will remove the domain part of the full logon name.
    *
-   * @param Fthe full principal name containing the domain
+   * @param name the full principal name containing the domain
    * @return name with domain removed
    */
   private static String stripDomain(String name) {
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
index 2274d57..3a0641b 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
@@ -36,6 +36,10 @@
 #include <sys/resource.h>
 #include <sys/stat.h>
 #include <sys/syscall.h>
+#ifdef HADOOP_PMDK_LIBRARY
+#include <libpmem.h>
+#include "pmdk_load.h"
+#endif
 #if !(defined(__FreeBSD__) || defined(__MACH__))
 #include <sys/sendfile.h>
 #endif
@@ -60,6 +64,7 @@
 
 #define NATIVE_IO_POSIX_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX"
 #define NATIVE_IO_STAT_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX$Stat"
+#define NATIVE_IO_POSIX_PMEMREGION_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX$PmemMappedRegion"
 
 #define SET_INT_OR_RETURN(E, C, F) \
   { \
@@ -81,6 +86,12 @@ static jmethodID nioe_ctor;
 // Please see HADOOP-7156 for details.
 jobject pw_lock_object;
 
+#ifdef HADOOP_PMDK_LIBRARY
+// the NativeIO$POSIX$PmemMappedRegion inner class and its constructor
+static jclass pmem_region_clazz = NULL;
+static jmethodID pmem_region_ctor = NULL;
+#endif
+
 /*
  * Throw a java.IO.IOException, generating the message from errno.
  * NB. this is also used form windows_secure_container_executor.c
@@ -269,6 +280,63 @@ static void nioe_deinit(JNIEnv *env) {
   nioe_ctor = NULL;
 }
 
+#ifdef HADOOP_PMDK_LIBRARY
+static int loadPmdkLib(JNIEnv *env) {
+  char errMsg[1024];
+  jclass clazz = (*env)->FindClass(env, NATIVE_IO_POSIX_CLASS);
+  if (clazz == NULL) {
+    return 0; // exception has been raised
+  }
+  load_pmdk_lib(errMsg, sizeof(errMsg));
+  jmethodID mid = (*env)->GetStaticMethodID(env, clazz, "setPmdkSupportState", "(I)V");
+  if (mid == 0) {
+    return 0;
+  }
+  if (strlen(errMsg) > 0) {
+    (*env)->CallStaticVoidMethod(env, clazz, mid, 1);
+    return 0;
+  }
+  (*env)->CallStaticVoidMethod(env, clazz, mid, 0);
+  return 1;
+}
+
+/*
+ * Cache a global reference to the PmemMappedRegion class and the ID of its
+ * (long, long, boolean) constructor for later use by pmemCreateMapFile.
+ * On failure a Java exception is left pending and neither value is cached.
+ */
+static void pmem_region_init(JNIEnv *env, jclass nativeio_class) {
+
+  jclass clazz = NULL;
+  // Look up the PmemMappedRegion class
+  clazz = (*env)->FindClass(env, NATIVE_IO_POSIX_PMEMREGION_CLASS);
+  if (!clazz) {
+    // FindClass has already raised an exception; throwing another one on top
+    // of it would require further JNI calls with an exception pending.
+    return;
+  }
+
+  // Promote to a global reference so the class outlives this native call
+  pmem_region_clazz = (*env)->NewGlobalRef(env, clazz);
+  (*env)->DeleteLocalRef(env, clazz);
+  if (!pmem_region_clazz) {
+    THROW(env, "java/io/IOException", "Failed to new global reference of PmemMappedRegion class");
+    return; // exception has been raised
+  }
+
+  pmem_region_ctor = (*env)->GetMethodID(env, pmem_region_clazz, "<init>", "(JJZ)V");
+  if (!pmem_region_ctor) {
+    // GetMethodID has already raised an exception. Drop the class reference
+    // so the cache is never left half-initialized.
+    (*env)->DeleteGlobalRef(env, pmem_region_clazz);
+    pmem_region_clazz = NULL;
+    return;
+  }
+}
+
+/*
+ * Release the cached PmemMappedRegion class reference and forget the cached
+ * constructor ID.
+ */
+static void pmem_region_deinit(JNIEnv *env) {
+  // A jmethodID is not a JNI object reference, so it must not be passed to
+  // DeleteGlobalRef; clearing the cached value is all that is needed.
+  pmem_region_ctor = NULL;
+
+  if (pmem_region_clazz != NULL) {
+    (*env)->DeleteGlobalRef(env, pmem_region_clazz);
+    pmem_region_clazz = NULL;
+  }
+}
+#endif
+
 /*
  * private static native void initNative();
  *
@@ -292,6 +360,11 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_initNative(
 #ifdef UNIX
   errno_enum_init(env);
   PASS_EXCEPTIONS_GOTO(env, error);
+#ifdef HADOOP_PMDK_LIBRARY
+  if (loadPmdkLib(env)) {
+    pmem_region_init(env, clazz);
+  }
+#endif
 #endif
   return;
 error:
@@ -299,6 +372,9 @@ error:
   // class wasn't initted yet
 #ifdef UNIX
   stat_deinit(env);
+#ifdef HADOOP_PMDK_LIBRARY
+  pmem_region_deinit(env);
+#endif
 #endif
   nioe_deinit(env);
   fd_deinit(env);
@@ -1383,3 +1459,179 @@ cleanup:
 /**
  * vim: sw=2: ts=2: et:
  */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/*
+ * Class:     org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method:    isPmemCheck
+ * Signature: (JJ)Z
+ *
+ * Returns JNI_TRUE when pmem_is_pmem reports a non-zero value for the range
+ * [address, address + length), JNI_FALSE otherwise.
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_isPmemCheck(
+JNIEnv *env, jclass thisClass, jlong address, jlong length) {
+  #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+    // The jlong carries a raw native address; convert it explicitly instead
+    // of relying on an implicit integer-to-pointer conversion.
+    jint is_pmem = pmdkLoader->pmem_is_pmem((const void *)(size_t)address, (size_t)length);
+    return (is_pmem) ? JNI_TRUE : JNI_FALSE;
+  #else
+    THROW(env, "java/lang/UnsupportedOperationException",
+        "The function isPmemCheck is not supported.");
+    return JNI_FALSE;
+  #endif
+  }
+
+/*
+ * Class:     org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method:    pmemCreateMapFile
+ * Signature: (Ljava/lang/String;J)Lorg/apache/hadoop/io/nativeio/NativeIO$POSIX$PmemMappedRegion;
+ *
+ * Creates a new file of fileLength bytes at filePath through pmem_map_file
+ * (PMEM_FILE_CREATE|PMEM_FILE_EXCL) and returns a PmemMappedRegion holding the
+ * mapped address, the mapped length and the is_pmem flag.
+ * Throws IllegalArgumentException for a null path or a non-positive length,
+ * and IOException when the mapping fails, has an unexpected length, or the
+ * PmemMappedRegion class/constructor was not cached.
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemCreateMapFile(
+JNIEnv *env, jclass thisClass, jstring filePath, jlong fileLength) {
+  #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+    /* create a pmem file and memory map it */
+    const char * path = NULL;
+    void * pmemaddr = NULL;
+    size_t mapped_len = 0;
+    int is_pmem = 1;
+    char msg[1000];
+
+    // Check everything that can fail without the mapping first, so that no
+    // early return can leave a mapped region behind.
+    if (filePath == NULL) {
+      THROW(env, "java/lang/IllegalArgumentException", "File Path cannot be null");
+      return NULL;
+    }
+
+    if (fileLength <= 0) {
+      THROW(env, "java/lang/IllegalArgumentException", "File length should be positive");
+      return NULL;
+    }
+
+    if ((!pmem_region_clazz) || (!pmem_region_ctor)) {
+      THROW(env, "java/io/IOException", "PmemMappedRegion class or constructor is NULL");
+      return NULL;
+    }
+
+    path = (*env)->GetStringUTFChars(env, filePath, NULL);
+    if (!path) {
+      // Only raise our own exception if the JVM has not raised one already.
+      if (!(*env)->ExceptionCheck(env)) {
+        THROW(env, "java/lang/IllegalArgumentException", "File Path cannot be null");
+      }
+      return NULL;
+    }
+
+    pmemaddr = pmdkLoader->pmem_map_file(path, (size_t)fileLength, PMEM_FILE_CREATE|PMEM_FILE_EXCL,
+        0666, &mapped_len, &is_pmem);
+
+    if (!pmemaddr) {
+      // jlong is 64 bits wide, so it needs an ll length modifier
+      snprintf(msg, sizeof(msg), "Failed to create pmem file. file: %s, length: %llx, error msg: %s", path, (unsigned long long)fileLength, pmem_errormsg());
+      (*env)->ReleaseStringUTFChars(env, filePath, path);
+      THROW(env, "java/io/IOException", msg);
+      return NULL;
+    }
+
+    if ((size_t)fileLength != mapped_len) {
+      snprintf(msg, sizeof(msg), "Mapped length doesn't match the request length. file :%s, request length:%llx, returned length:%zx, error msg:%s", path, (unsigned long long)fileLength, mapped_len, pmem_errormsg());
+      (*env)->ReleaseStringUTFChars(env, filePath, path);
+      // The region is never handed to Java on this path, so unmap it here
+      pmdkLoader->pmem_unmap(pmemaddr, mapped_len);
+      THROW(env, "java/io/IOException", msg);
+      return NULL;
+    }
+
+    (*env)->ReleaseStringUTFChars(env, filePath, path);
+
+    // Variadic JNI arguments must have exactly the types named by the
+    // constructor signature (JJZ)V, hence the explicit jlong casts.
+    jobject ret = (*env)->NewObject(env, pmem_region_clazz, pmem_region_ctor,
+        (jlong)(size_t)pmemaddr, (jlong)mapped_len, (jboolean)is_pmem);
+    return ret;
+
+  #else
+    THROW(env, "java/lang/UnsupportedOperationException",
+        "The function pmemCreateMapFile is not supported.");
+    return NULL;
+  #endif
+  }
+
+/*
+ * Class:     org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method:    pmemUnMap
+ * Signature: (JJ)Z
+ *
+ * Unmaps the region [address, address + length) through pmem_unmap.
+ * Returns JNI_TRUE on success; on failure throws IOException and returns
+ * JNI_FALSE.
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemUnMap(
+JNIEnv *env, jclass thisClass, jlong address, jlong length) {
+  #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+    int succeed = 0;
+    char msg[1000];
+    // The jlong carries a raw native address; convert it explicitly
+    succeed = pmdkLoader->pmem_unmap((void *)(size_t)address, (size_t)length);
+    // succeed = -1 failure; succeed = 0 success
+    if (succeed != 0) {
+      // jlong is 64 bits wide, so it needs an ll length modifier
+      snprintf(msg, sizeof(msg), "Failed to unmap region. address: %llx, length: %llx, error msg: %s", (unsigned long long)address, (unsigned long long)length, pmem_errormsg());
+      THROW(env, "java/io/IOException", msg);
+      return JNI_FALSE;
+    } else {
+      return JNI_TRUE;
+    }
+  #else
+    THROW(env, "java/lang/UnsupportedOperationException",
+        "The function pmemUnMap is not supported.");
+    return JNI_FALSE;
+  #endif
+  }
+
+/*
+ * Class:     org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method:    pmemCopy
+ * Signature: ([BJZJ)V
+ *
+ * Copies the first 'length' bytes of the Java byte array 'buf' to the native
+ * address 'address', using pmem_memcpy_nodrain when is_pmem is set and plain
+ * memcpy otherwise. Throws IllegalArgumentException when 'buf' is null or
+ * 'length' does not fit inside it.
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemCopy(
+JNIEnv *env, jclass thisClass, jbyteArray buf, jlong address, jboolean is_pmem, jlong length) {
+  #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+    jbyte* srcBuf = NULL;
+
+    if (buf == NULL) {
+      THROW(env, "java/lang/IllegalArgumentException", "Source buffer cannot be null");
+      return;
+    }
+    // Reading past the end of the Java array would be a native out-of-bounds read
+    if (length < 0 || length > (jlong)(*env)->GetArrayLength(env, buf)) {
+      THROW(env, "java/lang/IllegalArgumentException", "Copy length is out of the source buffer range");
+      return;
+    }
+
+    srcBuf = (*env)->GetByteArrayElements(env, buf, 0);
+    if (srcBuf == NULL) {
+      return; // the array could not be pinned/copied; nothing to copy from
+    }
+    if (is_pmem) {
+      pmdkLoader->pmem_memcpy_nodrain((void *)(size_t)address, srcBuf, (size_t)length);
+    } else {
+      memcpy((void *)(size_t)address, srcBuf, (size_t)length);
+    }
+    // The source array is only read, so release it without copying it back
+    (*env)->ReleaseByteArrayElements(env, buf, srcBuf, JNI_ABORT);
+    return;
+  #else
+    THROW(env, "java/lang/UnsupportedOperationException",
+        "The function pmemCopy is not supported.");
+  #endif
+  }
+
+/*
+ * Class:     org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method:    pmemDrain
+ * Signature: ()V
+ *
+ * Calls pmem_drain through the dynamically loaded PMDK function table.
+ * Throws UnsupportedOperationException when built without PMDK support.
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemDrain(
+JNIEnv *env, jclass thisClass) {
+  #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+    pmdkLoader->pmem_drain();
+  #else
+    THROW(env, "java/lang/UnsupportedOperationException",
+        "The function pmemDrain is not supported.");
+  #endif
+  }
+
+/*
+ * Class:     org_apache_hadoop_io_nativeio_NativeIO_POSIX
+ * Method:    pmemSync
+ * Signature: (JJ)V
+ *
+ * Flushes the region [address, address + length) through pmem_msync and
+ * throws IOException when that call reports a failure.
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemSync
+  (JNIEnv * env, jclass thisClass, jlong address, jlong length) {
+
+  #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY)
+    int succeed = 0;
+    char msg[1000];
+    // The jlong carries a raw native address; convert it explicitly
+    succeed = pmdkLoader->pmem_msync((const void *)(size_t)address, (size_t)length);
+    // succeed = -1 failure; succeed = 0 success.
+    // This must be a comparison: an assignment here would always be true and
+    // raise an IOException even after a successful sync.
+    if (succeed != 0) {
+      // jlong is 64 bits wide, so it needs an ll length modifier
+      snprintf(msg, sizeof(msg), "Failed to msync region. address: %llx, length: %llx, error msg: %s", (unsigned long long)address, (unsigned long long)length, pmem_errormsg());
+      THROW(env, "java/io/IOException", msg);
+      return;
+    }
+  #else
+    THROW(env, "java/lang/UnsupportedOperationException",
+        "The function pmemSync is not supported.");
+  #endif
+  }
+
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c
new file mode 100644
index 0000000..f7d6cfb
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c
@@ -0,0 +1,106 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "org_apache_hadoop.h"
+#include "pmdk_load.h"
+#include "org_apache_hadoop_io_nativeio_NativeIO.h"
+#include "org_apache_hadoop_io_nativeio_NativeIO_POSIX.h"
+
+#ifdef UNIX
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dlfcn.h>
+
+#include "config.h"
+#endif
+
+PmdkLibLoader * pmdkLoader;
+
+/**
+ *  pmdk_load.c
+ *  Utility of loading the libpmem library and the required functions.
+ *  Building of this codes won't rely on any libpmem source codes, but running
+ *  into this will rely on successfully loading of the dynamic library.
+ *
+ */
+
+/*
+ * Resolve every libpmem entry point used by NativeIO into pmdkLoader.
+ * Each PMDK_LOAD_DYNAMIC_SYMBOL expansion returns from this function with an
+ * error string as soon as one symbol cannot be resolved.
+ * Returns NULL when all symbols were resolved (always NULL on non-UNIX builds,
+ * where nothing is loaded).
+ */
+static const char* load_functions() {
+#ifdef UNIX
+  PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_map_file), "pmem_map_file");
+  PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_unmap), "pmem_unmap");
+  PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_is_pmem), "pmem_is_pmem");
+  PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_drain), "pmem_drain");
+  PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_memcpy_nodrain), "pmem_memcpy_nodrain");
+  PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_msync), "pmem_msync");
+#endif
+  return NULL;
+}
+
+/**
+ * Load the PMDK shared library named by HADOOP_PMDK_LIBRARY and resolve the
+ * functions used by NativeIO into the global pmdkLoader.
+ *
+ * On success err is left empty and pmdkLoader is populated. On failure a
+ * message is written to err and pmdkLoader is reset to NULL, so a later call
+ * retries instead of returning early with an empty error.
+ *
+ * @param err     The err message buffer.
+ * @param err_len The length of the message buffer.
+ */
+void load_pmdk_lib(char* err, size_t err_len) {
+  const char* errMsg;
+  const char* library = NULL;
+#ifdef UNIX
+  Dl_info dl_info;
+#else
+  // NOTE(review): this buffer pointer is NULL when handed to
+  // GetModuleFileName below -- confirm before enabling a non-UNIX build.
+  LPTSTR filename = NULL;
+#endif
+
+  err[0] = '\0';
+
+  if (pmdkLoader != NULL) {
+    return;
+  }
+  pmdkLoader = calloc(1, sizeof(PmdkLibLoader));
+  if (pmdkLoader == NULL) {
+    snprintf(err, err_len, "Failed to allocate memory for the PMDK loader");
+    return;
+  }
+
+  // Load PMDK library
+  #ifdef UNIX
+  pmdkLoader->libec = dlopen(HADOOP_PMDK_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
+  if (pmdkLoader->libec == NULL) {
+    snprintf(err, err_len, "Failed to load %s (%s)",
+        HADOOP_PMDK_LIBRARY, dlerror());
+    // Do not leave a half-initialized loader installed
+    free(pmdkLoader);
+    pmdkLoader = NULL;
+    return;
+  }
+  // Clear any existing error
+  dlerror();
+  #endif
+  errMsg = load_functions();
+  if (errMsg != NULL) {
+    snprintf(err, err_len, "Loading functions from PMDK failed: %s", errMsg);
+  #ifdef UNIX
+    dlclose(pmdkLoader->libec);
+  #endif
+    free(pmdkLoader);
+    pmdkLoader = NULL;
+    return;
+  }
+
+#ifdef UNIX
+  if(dladdr(pmdkLoader->pmem_map_file, &dl_info)) {
+    library = dl_info.dli_fname;
+  }
+#else
+  if (GetModuleFileName(pmdkLoader->libec, filename, 256) > 0) {
+    library = filename;
+  }
+#endif
+
+  if (library == NULL) {
+    library = HADOOP_PMDK_LIBRARY;
+  }
+
+  pmdkLoader->libname = strdup(library);
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h
new file mode 100644
index 0000000..c93a076
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h
@@ -0,0 +1,95 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "org_apache_hadoop.h"
+
+#ifdef UNIX
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dlfcn.h>
+#endif
+
+#ifndef _PMDK_LOAD_H_
+#define _PMDK_LOAD_H_
+
+
+#ifdef UNIX
+// For libpmem.h
+typedef void * (*__d_pmem_map_file)(const char *path, size_t len, int flags, mode_t mode,
+    size_t *mapped_lenp, int *is_pmemp);
+typedef int (* __d_pmem_unmap)(void *addr, size_t len);
+typedef int (*__d_pmem_is_pmem)(const void *addr, size_t len);
+typedef void (*__d_pmem_drain)(void);
+typedef void * (*__d_pmem_memcpy_nodrain)(void *pmemdest, const void *src, size_t len);
+typedef int (* __d_pmem_msync)(const void *addr, size_t len);
+
+#endif
+
+typedef struct __PmdkLibLoader {
+  // The loaded library handle
+  void* libec;
+  char* libname;
+  __d_pmem_map_file pmem_map_file;
+  __d_pmem_unmap pmem_unmap;
+  __d_pmem_is_pmem pmem_is_pmem;
+  __d_pmem_drain pmem_drain;
+  __d_pmem_memcpy_nodrain pmem_memcpy_nodrain;
+  __d_pmem_msync pmem_msync;
+} PmdkLibLoader;
+
+extern PmdkLibLoader * pmdkLoader;
+
+/**
+ * A helper function to dlsym a 'symbol' from a given library-handle.
+ */
+
+#ifdef UNIX
+
+static __attribute__ ((unused))
+void *myDlsym(void *handle, const char *symbol) {
+  void *func_ptr = dlsym(handle, symbol);
+  return func_ptr;
+}
+
+/*
+ * A helper macro to dlsym the requisite dynamic symbol in NON-JNI env.
+ * On failure it RETURNS from the enclosing function with a string literal
+ * naming the missing symbol, so it may only be used inside a function that
+ * returns const char*.
+ */
+#define PMDK_LOAD_DYNAMIC_SYMBOL(func_ptr, symbol) \
+  if ((func_ptr = myDlsym(pmdkLoader->libec, symbol)) == NULL) { \
+    return "Failed to load symbol " symbol; \
+  }
+
+#endif
+
+/**
+ * Return 0 if not support, 1 otherwise.
+ */
+int build_support_pmdk();
+
+/**
+ * Initialize and load PMDK library, returning error message if any.
+ *
+ * @param err     The err message buffer.
+ * @param err_len The length of the message buffer.
+ */
+void load_pmdk_lib(char* err, size_t err_len);
+
+#endif //_PMDK_LOAD_H_
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
index 6b3c232..a14928c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
@@ -25,6 +25,8 @@ import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel.MapMode;
@@ -782,4 +784,155 @@ public class TestNativeIO {
     assertTrue("Native POSIX_FADV_NOREUSE const not set",
       POSIX_FADV_NOREUSE >= 0);
   }
+
+
+  /**
+   * Expects Pmem.mapBlock to throw an exception when it is asked to map a
+   * zero-length or a negative-length block.
+   */
+  @Test (timeout=10000)
+  public void testPmemCheckParameters() {
+    assumeNotWindows("Native PMDK not supported on Windows");
+    // Skip testing while the build or environment does not support PMDK
+    assumeTrue(NativeIO.POSIX.isPmdkAvailable());
+
+    // Incorrect file length: zero
+    String filePath = "/$:";
+    long length = 0;
+    try {
+      NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+      fail("Illegal length parameter should be detected");
+    } catch (Exception e) {
+      LOG.info(e.getMessage());
+    }
+
+    // Incorrect file length: negative
+    filePath = "/mnt/pmem0/test_native_io";
+    length = -1L;
+    try {
+      NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+      fail("Illegal length parameter should be detected");
+    } catch (Exception e) {
+      LOG.info(e.getMessage());
+    }
+  }
+
+  @Test (timeout=10000)
+  public void testPmemMapMultipleFiles() {
+    assumeNotWindows("Native PMDK not supported on Windows");
+    // Skip testing while the build or environment does not support PMDK
+    assumeTrue(NativeIO.POSIX.isPmdkAvailable());
+
+    // /mnt/pmem0 is expected to be a persistent memory device whose total
+    // volume size equals 'volumeSize' (16GB).
+    final String pathPrefix = "/mnt/pmem0/test_native_io";
+    final long volumeSize = 16 * 1024 * 1024 * 1024L;
+    final long fileLength = 128 * 1024 * 1024L;
+
+    // Map 128MB files until their aggregated size reaches the volume size.
+    final long fileNumber = volumeSize / fileLength;
+    LOG.info("File number = " + fileNumber);
+    for (int index = 0; index < fileNumber; index++) {
+      final String path = pathPrefix + index;
+      LOG.info("File path = " + path);
+      NativeIO.POSIX.Pmem.mapBlock(path, fileLength);
+    }
+    // With the volume fully occupied, one more mapping is expected to fail.
+    // NOTE(review): nothing here unmaps or deletes the files mapped above.
+    try {
+      NativeIO.POSIX.Pmem.mapBlock(pathPrefix, fileLength);
+      fail("Request map extra file when persistent memory is all occupied");
+    } catch (Exception e) {
+      LOG.info(e.getMessage());
+    }
+  }
+
+  @Test (timeout=10000)
+  public void testPmemMapBigFile() {
+    assumeNotWindows("Native PMDK not supported on Windows");
+    // Skip testing while the build or environment does not support PMDK
+    assumeTrue(NativeIO.POSIX.isPmdkAvailable());
+
+    // /mnt/pmem0 is expected to be a persistent memory device whose total
+    // volume size equals 'volumeSize' (16GB).
+    String filePath = "/mnt/pmem0/test_native_io_big";
+    long volumeSize = 16 * 1024 * 1024 * 1024L;
+    // A single file whose length exceeds the whole volume must not be mapped.
+    long length = volumeSize + 1024L;
+    try {
+      LOG.info("File length = " + length);
+      NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+      fail("File length exceeds persistent memory total volume size");
+    } catch (Exception e) {
+      LOG.info(e.getMessage());
+    } finally {
+      // Runs on every path, so a file left by an unexpected success is removed
+      deletePmemMappedFile(filePath);
+    }
+  }
+
+  @Test (timeout=10000)
+  public void testPmemCopy() throws IOException {
+    assumeNotWindows("Native PMDK not supported on Windows");
+    // Skip testing while the build or environment does not support PMDK
+    assumeTrue(NativeIO.POSIX.isPmdkAvailable());
+
+    // Create and map a block file; /mnt/pmem0 must be persistent memory.
+    String filePath = "/mnt/pmem0/copy";
+    long length = 4096;
+    PmemMappedRegion region = NativeIO.POSIX.Pmem.mapBlock(filePath, length);
+    assertTrue(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length));
+    assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length + 100));
+    assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() + 100, length));
+    assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() - 100, length));
+    // Copy content to mapped file
+    byte[] data = generateSequentialBytes(0, (int) length);
+    NativeIO.POSIX.Pmem.memCopy(data, region.getAddress(), region.isPmem(),
+        length);
+    // Read content before pmemSync; every stream is closed after its read
+    byte[] readBuf1 = new byte[(int)length];
+    try (FileInputStream in = new FileInputStream(filePath)) {
+      IOUtils.readFully(in, readBuf1, 0, (int)length);
+    }
+    assertArrayEquals(data, readBuf1);
+    // Sync content to persistent memory twice, then read it again
+    NativeIO.POSIX.Pmem.memSync(region);
+    NativeIO.POSIX.Pmem.memSync(region);
+    byte[] readBuf2 = new byte[(int)length];
+    try (FileInputStream in = new FileInputStream(filePath)) {
+      IOUtils.readFully(in, readBuf2, 0, (int)length);
+    }
+    assertArrayEquals(data, readBuf2);
+    // Unmap twice, then read the content once more
+    NativeIO.POSIX.Pmem.unmapBlock(region.getAddress(), length);
+    NativeIO.POSIX.Pmem.unmapBlock(region.getAddress(), length);
+    byte[] readBuf3 = new byte[(int)length];
+    try (FileInputStream in = new FileInputStream(filePath)) {
+      IOUtils.readFully(in, readBuf3, 0, (int)length);
+    }
+    assertArrayEquals(data, readBuf3);
+  }
+
+  private static byte[] generateSequentialBytes(int start, int length) {
+    // Element k is (start + k) % 127, narrowed to a byte.
+    final byte[] bytes = new byte[length];
+    for (int k = 0, value = start; k < length; k++, value++) {
+      bytes[k] = (byte) (value % 127);
+    }
+    return bytes;
+  }
+
+  private static void deletePmemMappedFile(String filePath) {
+    if (filePath == null) {
+      return;  // no path given: nothing to delete
+    }
+    try {
+      if (!Files.deleteIfExists(Paths.get(filePath))) {
+        throw new IOException();  // nothing was deleted: logged below
+      }
+    } catch (Throwable e) {
+      LOG.error("Failed to delete the mapped file " + filePath +
+          " from persistent memory", e);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 4fab214..37e548e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -215,6 +215,28 @@ public class FsDatasetCache {
   }
 
   /**
+   * Get cache address on persistent memory for read operation.
+   * The address is the one reported by the block's MappableBlock entry.
+   * NOTE(review): the entry is dereferenced without a null check, which
+   * presumes it is still present after isCached() - confirm.
+   *
+   * @param bpid    blockPoolId
+   * @param blockId blockId
+   * @return the cache address; -1 if the cache is transient, the block is
+   *         not cached, or the loader is not a native loader
+   */
+  long getCacheAddress(String bpid, long blockId) {
+    // Short-circuits in the order: transient cache, cached state, loader kind.
+    boolean addressable = !cacheLoader.isTransientCache()
+        && isCached(bpid, blockId) && cacheLoader.isNativeLoader();
+    if (!addressable) {
+      return -1;
+    }
+    ExtendedBlockId blockKey = new ExtendedBlockId(blockId, bpid);
+    return mappableBlockMap.get(blockKey).mappableBlock.getAddress();
+  }
+
+  /**
    * @return List of cached blocks suitable for translation into a
    * {@link BlockListAsLongs} for a cache report.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 80738d3..76110d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -803,6 +803,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     String cachePath = cacheManager.getReplicaCachePath(
         b.getBlockPoolId(), b.getBlockId());
     if (cachePath != null) {
+      long addr = cacheManager.getCacheAddress(
+          b.getBlockPoolId(), b.getBlockId());
+      if (addr != -1) {
+        LOG.debug("Get InputStream by cache address.");
+        return FsDatasetUtil.getDirectInputStream(
+            addr, info.getBlockDataLength());
+      }
+      LOG.debug("Get InputStream by cache file path.");
       return FsDatasetUtil.getInputStreamAndSeek(
           new File(cachePath), seekOffset);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index 5308b60..fbd02c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -25,7 +25,10 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -42,6 +45,7 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.htrace.shaded.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
 
 /** Utility methods. */
 @InterfaceAudience.Private
@@ -131,6 +135,39 @@ public class FsDatasetUtil {
     }
   }
 
+  /**
+   * Wrap a region of native memory in an InputStream, using a reflectively
+   * constructed java.nio.DirectByteBuffer over the given address and length.
+   *
+   * @param addr   start address of the region
+   * @param length size of the region in bytes; must be within [0, 2^31 - 1]
+   * @return an InputStream backed by the wrapped buffer
+   * @throws IOException if length is out of range or the buffer cannot be
+   *                     created through reflection
+   */
+  public static InputStream getDirectInputStream(long addr, long length)
+      throws IOException {
+    // The buffer capacity is an int: refuse to truncate a long silently.
+    if (length < 0 || length > Integer.MAX_VALUE) {
+      throw new IOException("Invalid length " + length
+          + " for direct buffer at address " + addr);
+    }
+    try {
+      Class<?> directByteBufferClass =
+          Class.forName("java.nio.DirectByteBuffer");
+      Constructor<?> constructor =
+          directByteBufferClass.getDeclaredConstructor(long.class, int.class);
+      constructor.setAccessible(true);
+      ByteBuffer byteBuffer =
+          (ByteBuffer) constructor.newInstance(addr, (int) length);
+      return new ByteBufferBackedInputStream(byteBuffer);
+    } catch (ClassNotFoundException | NoSuchMethodException |
+        IllegalAccessException | InvocationTargetException |
+        InstantiationException e) {
+      throw new IOException(e);
+    }
+  }
+
   /**
    * Find the meta-file for the specified block file and then return the
    * generation stamp from the name of the meta-file. Generally meta file will
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
index 0fff327..a00c442 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
@@ -35,4 +35,10 @@ public interface MappableBlock extends Closeable {
    * @return the number of bytes that have been cached.
    */
   long getLength();
+
+  /**
+   * Get the cache address of this block, if applicable.
+   * @return the cache address, or -1 if not applicable.
+   */
+  long getAddress();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
index 3ec8416..5b9ba3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -64,8 +64,7 @@ public abstract class MappableBlockLoader {
    * @return               The Mappable block.
    */
   abstract MappableBlock load(long length, FileInputStream blockIn,
-                              FileInputStream metaIn, String blockFileName,
-                              ExtendedBlockId key)
+      FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
       throws IOException;
 
   /**
@@ -107,6 +106,11 @@ public abstract class MappableBlockLoader {
   abstract boolean isTransientCache();
 
   /**
+   * Check whether this is a native pmem cache loader.
+   */
+  abstract boolean isNativeLoader();
+
+  /**
    * Clean up cache, can be used during DataNode shutdown.
    */
   void shutdown() {
@@ -117,8 +121,7 @@ public abstract class MappableBlockLoader {
    * Verifies the block's checksum. This is an I/O intensive operation.
    */
   protected void verifyChecksum(long length, FileInputStream metaIn,
-                                FileChannel blockChannel, String blockFileName)
-      throws IOException {
+      FileChannel blockChannel, String blockFileName) throws IOException {
     // Verify the checksum from the block's meta file
     // Get the DataChecksum from the meta file header
     BlockMetadataHeader header =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java
index 43b1b53..6569373 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
+import org.apache.hadoop.io.nativeio.NativeIO;
 
 /**
  * Creates MappableBlockLoader.
@@ -42,6 +43,9 @@ public final class MappableBlockLoaderFactory {
     if (conf.getPmemVolumes() == null || conf.getPmemVolumes().length == 0) {
       return new MemoryMappableBlockLoader();
     }
+    if (NativeIO.isAvailable() && NativeIO.POSIX.isPmdkAvailable()) {
+      return new NativePmemMappableBlockLoader();
+    }
     return new PmemMappableBlockLoader();
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index 52d8d93..dd4188c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -66,8 +66,7 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
    */
   @Override
   MappableBlock load(long length, FileInputStream blockIn,
-                            FileInputStream metaIn, String blockFileName,
-                            ExtendedBlockId key)
+      FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
       throws IOException {
     MemoryMappedBlock mappableBlock = null;
     MappedByteBuffer mmap = null;
@@ -116,4 +115,9 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
   public boolean isTransientCache() {
     return true;
   }
+
+  @Override
+  public boolean isNativeLoader() {
+    return false;  // this loader is not a native pmem cache loader
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
index c09ad1a..47dfeae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java
@@ -45,6 +45,11 @@ public class MemoryMappedBlock implements MappableBlock {
   }
 
   @Override
+  public long getAddress() {
+    return -1L;  // -1 means "not applicable", per MappableBlock#getAddress
+  }
+
+  @Override
   public void close() {
     if (mmap != null) {
       NativeIO.POSIX.munmap(mmap);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
similarity index 53%
copy from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
copy to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
index 3ec8416..09e9454 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java
@@ -24,7 +24,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
 import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
@@ -34,21 +38,26 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
 /**
- * Maps block to DataNode cache region.
+ * Map block to persistent memory with native PMDK libs.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public abstract class MappableBlockLoader {
+public class NativePmemMappableBlockLoader extends PmemMappableBlockLoader {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NativePmemMappableBlockLoader.class);
 
-  /**
-   * Initialize a specific MappableBlockLoader.
-   */
-  abstract void initialize(FsDatasetCache cacheManager) throws IOException;
+  @Override
+  void initialize(FsDatasetCache cacheManager) throws IOException {
+    super.initialize(cacheManager);  // only delegates to the parent loader
+  }
 
   /**
    * Load the block.
    *
-   * Map the block, and then verify its checksum.
+   * Map the block and verify its checksum.
+   *
+   * The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir
+   * is a persistent memory volume chosen by PmemVolumeManager.
    *
    * @param length         The current length of the block.
    * @param blockIn        The block input stream. Should be positioned at the
@@ -58,67 +67,62 @@ public abstract class MappableBlockLoader {
    * @param blockFileName  The block file name, for logging purposes.
    * @param key            The extended block ID.
    *
-   * @throws IOException   If mapping block to cache region fails or checksum
-   *                       fails.
+   * @throws IOException   If mapping block to persistent memory fails or
+   *                       checksum fails.
    *
    * @return               The Mappable block.
    */
-  abstract MappableBlock load(long length, FileInputStream blockIn,
-                              FileInputStream metaIn, String blockFileName,
-                              ExtendedBlockId key)
-      throws IOException;
-
-  /**
-   * Try to reserve some given bytes.
-   *
-   * @param key           The ExtendedBlockId for a block.
-   *
-   * @param bytesCount    The number of bytes to add.
-   *
-   * @return              The new number of usedBytes if we succeeded;
-   *                      -1 if we failed.
-   */
-  abstract long reserve(ExtendedBlockId key, long bytesCount);
-
-  /**
-   * Release some bytes that we're using.
-   *
-   * @param key           The ExtendedBlockId for a block.
-   *
-   * @param bytesCount    The number of bytes to release.
-   *
-   * @return              The new number of usedBytes.
-   */
-  abstract long release(ExtendedBlockId key, long bytesCount);
-
-  /**
-   * Get the approximate amount of cache space used.
-   */
-  abstract long getCacheUsed();
-
-  /**
-   * Get the maximum amount of cache bytes.
-   */
-  abstract long getCacheCapacity();
+  @Override
+  public MappableBlock load(long length, FileInputStream blockIn,
+      FileInputStream metaIn, String blockFileName,
+      ExtendedBlockId key)
+      throws IOException {
+    NativePmemMappedBlock mappableBlock = null;
+    POSIX.PmemMappedRegion region = null;
+    String filePath = null;
 
-  /**
-   * Check whether the cache is non-volatile.
-   */
-  abstract boolean isTransientCache();
+    FileChannel blockChannel = null;
+    try {
+      blockChannel = blockIn.getChannel();
+      if (blockChannel == null) {
+        throw new IOException("Block InputStream has no FileChannel.");
+      }
 
-  /**
-   * Clean up cache, can be used during DataNode shutdown.
-   */
-  void shutdown() {
-    // Do nothing.
+      assert NativeIO.isAvailable();
+      filePath = PmemVolumeManager.getInstance().getCachePath(key);
+      region = POSIX.Pmem.mapBlock(filePath, length);
+      if (region == null) {
+        throw new IOException("Failed to map the block " + blockFileName +
+            " to persistent storage.");
+      }
+      verifyChecksumAndMapBlock(region, length, metaIn, blockChannel,
+          blockFileName);
+      mappableBlock = new NativePmemMappedBlock(region.getAddress(),
+          region.getLength(), key);
+      LOG.info("Successfully cached one replica:{} into persistent memory"
+          + ", [cached path={}, address={}, length={}]", key, filePath,
+          region.getAddress(), length);
+    } finally {
+      IOUtils.closeQuietly(blockChannel);
+      if (mappableBlock == null) {
+        if (region != null) {
+          // unmap content from persistent memory
+          POSIX.Pmem.unmapBlock(region.getAddress(),
+              region.getLength());
+          FsDatasetUtil.deleteMappedFile(filePath);
+        }
+      }
+    }
+    return mappableBlock;
   }
 
   /**
-   * Verifies the block's checksum. This is an I/O intensive operation.
+   * Verifies the block's checksum while copying the block to persistent memory.
+   * This is an I/O intensive operation.
    */
-  protected void verifyChecksum(long length, FileInputStream metaIn,
-                                FileChannel blockChannel, String blockFileName)
-      throws IOException {
+  private void verifyChecksumAndMapBlock(POSIX.PmemMappedRegion region,
+      long length, FileInputStream metaIn, FileChannel blockChannel,
+      String blockFileName) throws IOException {
     // Verify the checksum from the block's meta file
     // Get the DataChecksum from the meta file header
     BlockMetadataHeader header =
@@ -129,8 +133,8 @@ public abstract class MappableBlockLoader {
     try {
       metaChannel = metaIn.getChannel();
       if (metaChannel == null) {
-        throw new IOException(
-            "Block InputStream meta file has no FileChannel.");
+        throw new IOException("Cannot get FileChannel" +
+            " from Block InputStream meta file.");
       }
       DataChecksum checksum = header.getChecksum();
       final int bytesPerChecksum = checksum.getBytesPerChecksum();
@@ -140,13 +144,19 @@ public abstract class MappableBlockLoader {
       ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize);
       // Verify the checksum
       int bytesVerified = 0;
+      long mappedAddress = -1L;
+      if (region != null) {
+        mappedAddress = region.getAddress();
+      }
       while (bytesVerified < length) {
         Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
-            "Unexpected partial chunk before EOF");
+            "Unexpected partial chunk before EOF.");
         assert bytesVerified % bytesPerChecksum == 0;
         int bytesRead = fillBuffer(blockChannel, blockBuf);
         if (bytesRead == -1) {
-          throw new IOException("checksum verification failed: premature EOF");
+          throw new IOException(
+              "Checksum verification failed for the block " + blockFileName +
+                  ": premature EOF");
         }
         blockBuf.flip();
         // Number of read chunks, including partial chunk at end
@@ -158,32 +168,24 @@ public abstract class MappableBlockLoader {
             bytesVerified);
         // Success
         bytesVerified += bytesRead;
+        // Copy data to persistent file
+        POSIX.Pmem.memCopy(blockBuf.array(), mappedAddress,
+            region.isPmem(), bytesRead);
+        mappedAddress += bytesRead;
+        // Clear buffer
         blockBuf.clear();
         checksumBuf.clear();
       }
+      if (region != null) {
+        POSIX.Pmem.memSync(region);
+      }
     } finally {
       IOUtils.closeQuietly(metaChannel);
     }
   }
 
-  /**
-   * Reads bytes into a buffer until EOF or the buffer's limit is reached.
-   */
-  protected int fillBuffer(FileChannel channel, ByteBuffer buf)
-      throws IOException {
-    int bytesRead = channel.read(buf);
-    if (bytesRead < 0) {
-      //EOF
-      return bytesRead;
-    }
-    while (buf.remaining() > 0) {
-      int n = channel.read(buf);
-      if (n < 0) {
-        //EOF
-        return bytesRead;
-      }
-      bytesRead += n;
-    }
-    return bytesRead;
+  @Override
+  public boolean isNativeLoader() {
+    return true;  // blocks are mapped through the native POSIX.Pmem calls
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java
similarity index 52%
copy from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
copy to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java
index 25c3d40..92012b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java
@@ -21,25 +21,29 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
 /**
- * Represents an HDFS block that is mapped to persistent memory by DataNode
- * with mapped byte buffer. PMDK is NOT involved in this implementation.
+ * Represents an HDFS block that is mapped to persistent memory by the DataNode.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class PmemMappedBlock implements MappableBlock {
+public class NativePmemMappedBlock implements MappableBlock {
   private static final Logger LOG =
-      LoggerFactory.getLogger(PmemMappedBlock.class);
+      LoggerFactory.getLogger(NativePmemMappedBlock.class);
+
+  private long pmemMappedAddress = -1L;
   private long length;
   private ExtendedBlockId key;
 
-  PmemMappedBlock(long length, ExtendedBlockId key) {
+  NativePmemMappedBlock(long pmemMappedAddress, long length,
+      ExtendedBlockId key) {
     assert length > 0;
+    this.pmemMappedAddress = pmemMappedAddress;
     this.length = length;
     this.key = key;
   }
@@ -50,15 +54,32 @@ public class PmemMappedBlock implements MappableBlock {
   }
 
   @Override
+  public long getAddress() {
+    return pmemMappedAddress;
+  }
+
+  @Override
   public void close() {
-    String cacheFilePath =
-        PmemVolumeManager.getInstance().getCachePath(key);
-    try {
-      FsDatasetUtil.deleteMappedFile(cacheFilePath);
-      LOG.info("Successfully uncached one replica:{} from persistent memory"
-          + ", [cached path={}, length={}]", key, cacheFilePath, length);
-    } catch (IOException e) {
-      LOG.warn("Failed to delete the mapped File: {}!", cacheFilePath, e);
+    if (pmemMappedAddress != -1L) {
+      String cacheFilePath =
+          PmemVolumeManager.getInstance().getCachePath(key);
+      try {
+        // Current libpmem will report error when pmem_unmap is called with
+        // length not aligned with page size, although the length is returned
+        // by pmem_map_file.
+        boolean success =
+            NativeIO.POSIX.Pmem.unmapBlock(pmemMappedAddress, length);
+        if (!success) {
+          throw new IOException("Failed to unmap the mapped file from " +
+              "pmem address: " + pmemMappedAddress);
+        }
+        pmemMappedAddress = -1L;
+        FsDatasetUtil.deleteMappedFile(cacheFilePath);
+        LOG.info("Successfully uncached one replica:{} from persistent memory"
+            + ", [cached path={}, length={}]", key, cacheFilePath, length);
+      } catch (IOException e) {
+        LOG.warn("IOException occurred for block {}!", key, e);
+      }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
index 239fff8..70a42c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
@@ -43,7 +43,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
 
   @Override
   void initialize(FsDatasetCache cacheManager) throws IOException {
-    LOG.info("Initializing cache loader: PmemMappableBlockLoader.");
+    LOG.info("Initializing cache loader: " + this.getClass().getName());
     DNConf dnConf = cacheManager.getDnConf();
     PmemVolumeManager.init(dnConf.getPmemVolumes());
     pmemVolumeManager = PmemVolumeManager.getInstance();
@@ -71,8 +71,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
    */
   @Override
   MappableBlock load(long length, FileInputStream blockIn,
-                     FileInputStream metaIn, String blockFileName,
-                     ExtendedBlockId key)
+      FileInputStream metaIn, String blockFileName, ExtendedBlockId key)
       throws IOException {
     PmemMappedBlock mappableBlock = null;
     String cachePath = null;
@@ -133,6 +132,11 @@ public class PmemMappableBlockLoader extends MappableBlockLoader {
   }
 
   @Override
+  public boolean isNativeLoader() {
+    return false;  // this loader is not a native pmem cache loader
+  }
+
+  @Override
   void shutdown() {
     LOG.info("Clean up cache on persistent memory during shutdown.");
     PmemVolumeManager.getInstance().cleanup();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
index 25c3d40..a49626a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
@@ -50,6 +50,11 @@ public class PmemMappedBlock implements MappableBlock {
   }
 
   @Override
+  public long getAddress() {
+    return -1L;  // -1 means "not applicable", per MappableBlock#getAddress
+  }
+
+  @Override
   public void close() {
     String cacheFilePath =
         PmemVolumeManager.getInstance().getCachePath(key);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org